#!/usr/bin/env python3
"""Modbus/TCP server script with a custom pyModbusTCP data handler.

The handler reacts to writes on a handful of fixed addresses (reset coil,
dimming / time-of-day / forced-off holding registers) by updating other
registers in the same data bank.  The main loop restores a set of default
register values whenever the forced-off register has not been written for
more than one second, and optionally shuts the server down after a period of
inactivity.
"""
import sys
import traceback
import argparse
from pyModbusTCP.server import ModbusServer, DataBank, DataHandler
from time import sleep, time

# Addresses used by the handler and the main loop.
FORCED_OFF = 20210   # holding register; writes are timestamped
RESET = 20001        # coil; writing True triggers the reset sequence
FULL_STATUS = 19000  # input register
SELF_STATUS = 19005  # input register
TIME_OF_DAY = 20200  # holding register on write, mirrored to input register
PHOTOCELL = 20202    # input register
DIMMING = 20203      # holding register on write, mirrored to input register

# Written holding-register value -> value mirrored into the input register.
# Values that are not listed leave the input register untouched.
_DIMMING_INPUT_VALUES = {0: 0, 128: 0, 129: 1, 130: 2}
_TIME_OF_DAY_INPUT_VALUES = {0: 0, 128: 0, 129: 1, 130: 2, 131: 3, 132: 4}

# Main-loop timing, in seconds.
_MIN_UPTIME_BEFORE_TIMEOUT = 60 * 15
_FORCED_OFF_IDLE_TIMEOUT = 60 * 2
_DEFAULTS_RESTORE_AFTER = 1
_POLL_INTERVAL = 1


class MaHandler(DataHandler):
    """Data handler that adds side effects to selected Modbus writes."""

    def __init__(self, data_bank):
        """Attach to *data_bank* and initialise the forced-off timestamp.

        Args:
            data_bank: the ``DataBank`` served by this handler.
        """
        super().__init__(data_bank=data_bank)
        # Use the bank stored by the base class so the timestamp always lives
        # on the object the handler and the server actually share.
        self.data_bank.last_forcedoff_write = 0

    def read_coils(self, address, count, srv_info):
        """Read coils; plain pass-through to the base handler."""
        return super().read_coils(address, count, srv_info)

    def read_d_inputs(self, address, count, srv_info):
        """Read discrete inputs; plain pass-through to the base handler."""
        return super().read_d_inputs(address, count, srv_info)

    def read_h_regs(self, address, count, srv_info):
        """Read holding registers; plain pass-through to the base handler."""
        return super().read_h_regs(address, count, srv_info)

    def read_i_regs(self, address, count, srv_info):
        """Read input registers; plain pass-through to the base handler."""
        return super().read_i_regs(address, count, srv_info)

    def write_coils(self, address, bits_l, srv_info):
        """Write coils and run the reset sequence when RESET is set.

        Args:
            address: first coil address of the write request.
            bits_l: list of coil values to write.
            srv_info: request metadata supplied by the server.

        Returns:
            The result of the base handler's ``write_coils``.
        """
        ret = super().write_coils(address, bits_l, srv_info)
        if address == RESET and bits_l[0]:
            # Operate on our own data bank rather than on a module-level
            # server object, so the handler also works when imported.
            bank = self.data_bank
            bank.set_holding_registers(FORCED_OFF, [0])
            bank.set_input_registers(FULL_STATUS, [4])
            bank.set_input_registers(SELF_STATUS, [4])
            bank.set_input_registers(TIME_OF_DAY, [255])
            bank.set_input_registers(PHOTOCELL, [255])
            bank.set_holding_registers(DIMMING, [0])
            # Clear the coil again so the reset acts as a one-shot trigger.
            bank.set_coils(RESET, [False])
        return ret

    def write_h_regs(self, address, words_l, srv_info):
        """Write holding registers and apply address-specific side effects.

        * DIMMING / TIME_OF_DAY: a recognised value is translated and stored
          in the input register at the same address.
        * FORCED_OFF: the time of the write is recorded on the data bank.

        Args:
            address: first register address of the write request.
            words_l: list of register values to write.
            srv_info: request metadata supplied by the server.

        Returns:
            The result of the base handler's ``write_h_regs``.
        """
        ret = super().write_h_regs(address, words_l, srv_info)
        if address == DIMMING:
            print(f"write Dimming {address=} -> {words_l[0]}")
            self._mirror_to_input(DIMMING, _DIMMING_INPUT_VALUES, words_l[0])
        elif address == TIME_OF_DAY:
            print(f"write ToD {address=} -> {words_l[0]}")
            self._mirror_to_input(
                TIME_OF_DAY, _TIME_OF_DAY_INPUT_VALUES, words_l[0]
            )
        elif address == FORCED_OFF:
            print(f"write Forced off {address=} -> {words_l[0]}")
            self.data_bank.last_forcedoff_write = time()
        return ret

    def _mirror_to_input(self, address, translation, written):
        """Store ``translation[written]`` in input register *address*.

        Values missing from *translation* are ignored.
        """
        mirrored = translation.get(written)
        if mirrored is not None:
            self.data_bank.set_input_registers(address, [mirrored])


def _parse_args():
    """Parse the command-line options of the script."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-H", "--host", type=str, default="localhost", help="Host (default: localhost)"
    )
    parser.add_argument(
        "-p", "--port", type=int, default=11800, help="TCP port (default: 11800)"
    )
    parser.add_argument(
        "-d",
        "--daemon",
        action="store_true",
        help="Run as daemon with no 15 min timeout",
    )
    return parser.parse_args()


def _restore_defaults(data_bank):
    """Write the default register values used while forced-off is idle."""
    data_bank.set_holding_registers(FORCED_OFF, [0])
    data_bank.set_input_registers(FULL_STATUS, [1])
    data_bank.set_input_registers(SELF_STATUS, [1])
    data_bank.set_input_registers(TIME_OF_DAY, [3])
    data_bank.set_input_registers(PHOTOCELL, [3])
    data_bank.set_holding_registers(DIMMING, [0])


def _serve(instance, daemon, start_time):
    """Run the polling loop until the inactivity timeout triggers.

    Without *daemon*, the loop ends once the script has been up for more than
    15 minutes and FORCED_OFF has not been written for more than 2 minutes.
    With *daemon* it never returns on its own.
    """
    data_bank = instance.data_bank
    while True:
        now = time()
        idle = now - data_bank.last_forcedoff_write
        if (
            not daemon
            and now - start_time > _MIN_UPTIME_BEFORE_TIMEOUT
            and idle > _FORCED_OFF_IDLE_TIMEOUT
        ):
            print("input timeout. stopping device")
            return
        if idle > _DEFAULTS_RESTORE_AFTER:
            _restore_defaults(data_bank)
        sleep(_POLL_INTERVAL)


def main():
    """Start the Modbus server and run until timeout or failure."""
    start_time = time()
    args = _parse_args()

    # Bound before the try block so cleanup is safe even when constructing
    # the server fails.
    instance = None
    try:
        instance = ModbusServer(
            host=args.host,
            port=args.port,
            no_block=True,
            data_hdl=MaHandler(data_bank=DataBank()),
        )
        instance.start()
        _serve(instance, args.daemon, start_time)
    except Exception as ex:
        # The caught exception already carries its traceback; no need for
        # sys.exception(), which only exists on Python >= 3.11.
        traceback.print_tb(ex.__traceback__, limit=3, file=sys.stdout)
        print(f"An epic failure just has happened! ({ex})")
    else:
        print("bye")
    finally:
        # Also runs on KeyboardInterrupt / SystemExit, which the handler
        # above does not catch.
        if instance is not None:
            instance.stop()


if __name__ == "__main__":
    main()