abibas/src/data/__init__.py

182 lines
5.7 KiB
Python

"""Common functions for manipulating with data source
Data source is the simple json files at the moment.
The main files with configuration are stored in the package itself,
files with overrides are stored in the tmp directory
"""
__controllers__ = 'controllers.json'
__models__ = 'controller-models.json'
import json
import os
import tempfile
import logging
import pkg_resources
from importlib.resources import files
def _get_overrides_fname(ct: str, port: int) -> str:
fname = os.path.join(tempfile.gettempdir(), f'{ct}@{port}.json')
return fname
def _get_controlers_fname() -> str:
fname = files(__name__).joinpath(__controllers__)
return fname
def _get_controller_models_fname() -> str:
fname = files(__name__).joinpath(__models__)
return fname
def _get_reg_value(controller: dict, reg: dict) -> int:
# get first value from type definition (as default)
value = None
if 'PossibleValues' not in reg or not reg['PossibleValues']:
value = 0
else:
k, v = next(iter(reg['PossibleValues'][0].items()), None)
value = v
# if there is a file with overriden value, take that
overrides = _get_overrides_fname(controller['Type'], controller['Port'])
if os.path.isfile(overrides):
with open(overrides, 'r') as file:
try:
overriden = json.load(file)
except json.JSONDecodeError:
logging.warning(
f'Failed to parse {overrides}, returning {value} for {reg["Name"]}'
)
return value
entry = [x for x in overriden if x['Name'] == reg['Name']]
if entry:
value = entry[0]['Value']
logging.debug(
f'{reg["Name"]} value gotten: {value} from {overrides}')
return value
def _save_overrides(fname: str, overrides: list):
with open(fname, 'w') as file:
json.dump(overrides, file, indent=2)
def get_controller(port: int, model: str) -> dict:
controllers = get_controllers()
models = get_controller_types()
controller = next(
filter(lambda x: x['Port'] == port and x['Type'] == model,
controllers), None)
if not controller:
return None
model = next(filter(lambda x: x['Name'] == controller['Type'], models),
None)
controller['Model'] = model
return controller
def get_controller_registers(port: int, model_name: str) -> list:
result = []
controller = get_controller(port, model_name)
for reg in controller['Model']['Registers']:
reg['CurrentValue'] = _get_reg_value(controller, reg)
result.append(reg)
result = sorted(result, key=lambda x: x['Address'])
return result
def set_controller_registers(ct: str, port: int, overriden: list):
fname = _get_overrides_fname(ct, port)
_save_overrides(fname, overriden)
def set_controller_register(ct: str, port: int, reg: str, val):
overrides = _get_overrides_fname(ct, port)
overriden = []
if os.path.isfile(overrides):
with open(overrides, 'r') as file:
overriden = json.load(file)
entry = next(filter(lambda x: x['Name'] == reg, overriden), None)
if not entry:
overriden.append({'Name': reg, 'Value': val})
_save_overrides(overrides, overriden)
elif entry['Value'] != val:
overriden.remove(entry)
overriden.append({'Name': reg, 'Value': val})
_save_overrides(overrides, overriden)
else:
pass
else:
overriden.append({'Name': reg, 'Value': val})
_save_overrides(overrides, overriden)
def get_controllers() -> dict:
"""Returns controllers along with their type represented as 'Model'."""
with open(_get_controlers_fname(), 'r') as file:
try:
controllers = json.load(file)
except (json.JSONDecodeError):
controllers = []
models = get_controller_types()
for c in controllers:
model = next(filter(lambda x: x['Name'] == c['Type'], models), None)
c['Model'] = model
return controllers
def set_controllers(data: list):
for c in data:
if 'Model' in c:
del c['Model']
with open(_get_controlers_fname(), 'w') as file:
json.dump(data, file, indent=2)
def get_controller_types() -> dict:
"""Returns controller types as they are in the json file."""
with open(_get_controller_models_fname(), 'r') as file:
try:
models = json.load(file)
for m in models:
registers = m.get('Registers', [])
for r in registers:
if 'Function' not in r:
r['Function'] = 'F16'
except (json.JSONDecodeError):
models = []
return models
def set_controller_types(data: list):
with open(_get_controller_models_fname(), 'w') as file:
json.dump(data, file, indent=2)
def remove_controller(port: int) -> bool:
controllers = get_controllers()
entry = next(filter(lambda x: x['Port'] == port, controllers), None)
if not entry:
logging.warning(f'Already removed at port {port}')
return False
controllers.remove(entry)
set_controllers(controllers)
overrides = _get_overrides_fname(entry['Type'], port)
if os.path.isfile(overrides):
logging.debug(f'Removing overrides file {overrides}')
os.remove(overrides)
else:
logging.debug(f'Overrides file {overrides} not found, skip removing')
return True
def get_info() -> dict:
distribution = pkg_resources.get_distribution('abibas')
return dict(version=distribution.version,
author='Taryel Hlontsi',
license='copyleft (GPLv3)')