cip400/cip400.py

136 lines
4.8 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
import sys
import traceback
import argparse
from pyModbusTCP.server import ModbusServer, DataBank, DataHandler
from time import sleep, time
FORCED_OFF = 20210
RESET = 20001
FULL_STATUS = 19000
SELF_STATUS = 19005
TIME_OF_DAY = 20200
PHOTOCELL = 20202
DIMMING = 20203
class MaHandler(DataHandler):
def __init__(self, data_bank):
super().__init__(data_bank=data_bank)
data_bank.last_forcedoff_write = 0
def read_coils(self, address, count, srv_info):
ret = super().read_coils(address, count, srv_info)
return ret
def read_d_inputs(self, address, count, srv_info):
ret = super().read_d_inputs(address, count, srv_info)
return ret
def read_h_regs(self, address, count, srv_info):
ret = super().read_h_regs(address, count, srv_info)
return ret
def read_i_regs(self, address, count, srv_info):
ret = super().read_i_regs(address, count, srv_info)
return ret
def write_coils(self, address, bits_l, srv_info):
ret = super().write_coils(address, bits_l, srv_info)
if address == RESET and bits_l[0]:
instance.data_bank.set_holding_registers(FORCED_OFF, [0])
instance.data_bank.set_input_registers(FULL_STATUS, [4])
instance.data_bank.set_input_registers(SELF_STATUS, [4])
instance.data_bank.set_input_registers(TIME_OF_DAY, [255])
instance.data_bank.set_input_registers(PHOTOCELL, [255])
instance.data_bank.set_holding_registers(DIMMING, [0])
instance.data_bank.set_coils(RESET, [False])
else:
pass
return ret
def write_h_regs(self, address, words_l, srv_info):
ret = super().write_h_regs(address, words_l, srv_info)
if address == DIMMING:
print(f"write Dimming {address=} -> {words_l[0]}")
if words_l[0] in (0, 128):
self.data_bank.set_input_registers(DIMMING, [0])
elif words_l[0] == 129:
self.data_bank.set_input_registers(DIMMING, [1])
elif words_l[0] == 130:
self.data_bank.set_input_registers(DIMMING, [2])
else:
pass
elif address == TIME_OF_DAY:
print(f"write ToD {address=} -> {words_l[0]}")
if words_l[0] in (0, 128):
self.data_bank.set_input_registers(TIME_OF_DAY, [0])
elif words_l[0] == 129:
self.data_bank.set_input_registers(TIME_OF_DAY, [1])
elif words_l[0] == 130:
self.data_bank.set_input_registers(TIME_OF_DAY, [2])
elif words_l[0] == 131:
self.data_bank.set_input_registers(TIME_OF_DAY, [3])
elif words_l[0] == 132:
self.data_bank.set_input_registers(TIME_OF_DAY, [4])
else:
pass
elif address == FORCED_OFF:
print(f"write Forced off {address=} -> {words_l[0]}")
self.data_bank.last_forcedoff_write = time()
else:
pass
return ret
if __name__ == "__main__":
start_time = time()
parser = argparse.ArgumentParser()
parser.add_argument(
"-H", "--host", type=str, default="localhost", help="Host (default: localhost)"
)
parser.add_argument(
"-p", "--port", type=int, default=11800, help="TCP port (default: 11800)"
)
parser.add_argument(
"-d",
"--daemon",
action="store_true",
help="Run as daemon with no 15 min timeout",
)
args = parser.parse_args()
try:
instance = ModbusServer(
host=args.host,
port=args.port,
no_block=True,
data_hdl=MaHandler(data_bank=DataBank()),
)
instance.start()
while True:
if (
not args.daemon
and time() - start_time > 60 * 15
and time() - instance.data_bank.last_forcedoff_write > 60 * 2
):
print("input timeout. stopping device")
break
if time() - instance.data_bank.last_forcedoff_write > 1:
instance.data_bank.set_holding_registers(FORCED_OFF, [0])
instance.data_bank.set_input_registers(FULL_STATUS, [1])
instance.data_bank.set_input_registers(SELF_STATUS, [1])
instance.data_bank.set_input_registers(TIME_OF_DAY, [3])
instance.data_bank.set_input_registers(PHOTOCELL, [3])
instance.data_bank.set_holding_registers(DIMMING, [0])
sleep(1)
except Exception as ex:
instance.stop()
exc = sys.exception()
traceback.print_tb(exc.__traceback__, limit=3, file=sys.stdout)
print(f"An epic failure just has happened! ({ex})")
else:
instance.stop()
print("bye")