136 lines
4.8 KiB
Python
136 lines
4.8 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
import sys
|
||
|
import traceback
|
||
|
import argparse
|
||
|
from pyModbusTCP.server import ModbusServer, DataBank, DataHandler
|
||
|
from time import sleep, time
|
||
|
|
||
|
FORCED_OFF = 20210
|
||
|
RESET = 20001
|
||
|
FULL_STATUS = 19000
|
||
|
SELF_STATUS = 19005
|
||
|
TIME_OF_DAY = 20200
|
||
|
PHOTOCELL = 20202
|
||
|
DIMMING = 20203
|
||
|
|
||
|
|
||
|
class MaHandler(DataHandler):
|
||
|
def __init__(self, data_bank):
|
||
|
super().__init__(data_bank=data_bank)
|
||
|
data_bank.last_forcedoff_write = 0
|
||
|
|
||
|
def read_coils(self, address, count, srv_info):
|
||
|
ret = super().read_coils(address, count, srv_info)
|
||
|
return ret
|
||
|
|
||
|
def read_d_inputs(self, address, count, srv_info):
|
||
|
ret = super().read_d_inputs(address, count, srv_info)
|
||
|
return ret
|
||
|
|
||
|
def read_h_regs(self, address, count, srv_info):
|
||
|
ret = super().read_h_regs(address, count, srv_info)
|
||
|
return ret
|
||
|
|
||
|
def read_i_regs(self, address, count, srv_info):
|
||
|
ret = super().read_i_regs(address, count, srv_info)
|
||
|
return ret
|
||
|
|
||
|
def write_coils(self, address, bits_l, srv_info):
|
||
|
ret = super().write_coils(address, bits_l, srv_info)
|
||
|
if address == RESET and bits_l[0]:
|
||
|
instance.data_bank.set_holding_registers(FORCED_OFF, [0])
|
||
|
instance.data_bank.set_input_registers(FULL_STATUS, [4])
|
||
|
instance.data_bank.set_input_registers(SELF_STATUS, [4])
|
||
|
instance.data_bank.set_input_registers(TIME_OF_DAY, [255])
|
||
|
instance.data_bank.set_input_registers(PHOTOCELL, [255])
|
||
|
instance.data_bank.set_holding_registers(DIMMING, [0])
|
||
|
instance.data_bank.set_coils(RESET, [False])
|
||
|
else:
|
||
|
pass
|
||
|
return ret
|
||
|
|
||
|
def write_h_regs(self, address, words_l, srv_info):
|
||
|
ret = super().write_h_regs(address, words_l, srv_info)
|
||
|
if address == DIMMING:
|
||
|
print(f"write Dimming {address=} -> {words_l[0]}")
|
||
|
if words_l[0] in (0, 128):
|
||
|
self.data_bank.set_input_registers(DIMMING, [0])
|
||
|
elif words_l[0] == 129:
|
||
|
self.data_bank.set_input_registers(DIMMING, [1])
|
||
|
elif words_l[0] == 130:
|
||
|
self.data_bank.set_input_registers(DIMMING, [2])
|
||
|
else:
|
||
|
pass
|
||
|
elif address == TIME_OF_DAY:
|
||
|
print(f"write ToD {address=} -> {words_l[0]}")
|
||
|
if words_l[0] in (0, 128):
|
||
|
self.data_bank.set_input_registers(TIME_OF_DAY, [0])
|
||
|
elif words_l[0] == 129:
|
||
|
self.data_bank.set_input_registers(TIME_OF_DAY, [1])
|
||
|
elif words_l[0] == 130:
|
||
|
self.data_bank.set_input_registers(TIME_OF_DAY, [2])
|
||
|
elif words_l[0] == 131:
|
||
|
self.data_bank.set_input_registers(TIME_OF_DAY, [3])
|
||
|
elif words_l[0] == 132:
|
||
|
self.data_bank.set_input_registers(TIME_OF_DAY, [4])
|
||
|
else:
|
||
|
pass
|
||
|
elif address == FORCED_OFF:
|
||
|
print(f"write Forced off {address=} -> {words_l[0]}")
|
||
|
self.data_bank.last_forcedoff_write = time()
|
||
|
else:
|
||
|
pass
|
||
|
return ret
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
start_time = time()
|
||
|
parser = argparse.ArgumentParser()
|
||
|
parser.add_argument(
|
||
|
"-H", "--host", type=str, default="localhost", help="Host (default: localhost)"
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
"-p", "--port", type=int, default=11800, help="TCP port (default: 11800)"
|
||
|
)
|
||
|
parser.add_argument(
|
||
|
"-d",
|
||
|
"--daemon",
|
||
|
action="store_true",
|
||
|
help="Run as daemon with no 15 min timeout",
|
||
|
)
|
||
|
args = parser.parse_args()
|
||
|
try:
|
||
|
instance = ModbusServer(
|
||
|
host=args.host,
|
||
|
port=args.port,
|
||
|
no_block=True,
|
||
|
data_hdl=MaHandler(data_bank=DataBank()),
|
||
|
)
|
||
|
instance.start()
|
||
|
while True:
|
||
|
if (
|
||
|
not args.daemon
|
||
|
and time() - start_time > 60 * 15
|
||
|
and time() - instance.data_bank.last_forcedoff_write > 60 * 2
|
||
|
):
|
||
|
print("input timeout. stopping device")
|
||
|
break
|
||
|
if time() - instance.data_bank.last_forcedoff_write > 1:
|
||
|
instance.data_bank.set_holding_registers(FORCED_OFF, [0])
|
||
|
instance.data_bank.set_input_registers(FULL_STATUS, [1])
|
||
|
instance.data_bank.set_input_registers(SELF_STATUS, [1])
|
||
|
instance.data_bank.set_input_registers(TIME_OF_DAY, [3])
|
||
|
instance.data_bank.set_input_registers(PHOTOCELL, [3])
|
||
|
instance.data_bank.set_holding_registers(DIMMING, [0])
|
||
|
sleep(1)
|
||
|
|
||
|
except Exception as ex:
|
||
|
instance.stop()
|
||
|
exc = sys.exception()
|
||
|
traceback.print_tb(exc.__traceback__, limit=3, file=sys.stdout)
|
||
|
print(f"An epic failure just has happened! ({ex})")
|
||
|
else:
|
||
|
instance.stop()
|
||
|
print("bye")
|