#!/usr/bin/python3 -u # coding=utf-8 import logging import re import socket import sys import typing from gi.repository import GLib as glib import signals import config as cfg from dbus.mainloop.glib import DBusGMainLoop from pymodbus.client.sync import ModbusSerialClient as Modbus from pymodbus.exceptions import ModbusException, ModbusIOException from pymodbus.other_message import ReportSlaveIdRequest from pymodbus.pdu import ExceptionResponse from pymodbus.register_read_message import ReadInputRegistersResponse from data import BatteryStatus, BatterySignal, Battery, ServiceSignal from python_libs.ie_dbus.dbus_service import DBusService # trick the pycharm type-checker into thinking Callable is in scope, not used at runtime # noinspection PyUnreachableCode if False: from typing import Callable, List, Iterable, NoReturn RESET_REGISTER = 0x2087 SETTINGS_SERVICE_PREFIX = 'com.victronenergy.settings' INVERTER_SERVICE_PREFIX = 'com.victronenergy.vebus.' def init_modbus(tty): # type: (str) -> Modbus logging.debug('initializing Modbus') return Modbus( port='/dev/' + tty, method=cfg.MODE, baudrate=cfg.BAUD_RATE, stopbits=cfg.STOP_BITS, bytesize=cfg.BYTE_SIZE, timeout=cfg.TIMEOUT, parity=cfg.PARITY) def init_udp_socket(): # type: () -> socket s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.setblocking(False) return s def report_slave_id(modbus, slave_address): # type: (Modbus, int) -> str slave = str(slave_address) logging.debug('requesting slave id from node ' + slave) with modbus: request = ReportSlaveIdRequest(unit=slave_address) response = modbus.execute(request) if response is ExceptionResponse or issubclass(type(response), ModbusException): raise Exception('failed to get slave id from ' + slave + ' : ' + str(response)) return response.identifier def identify_battery(modbus, slave_address): # type: (Modbus, int) -> Battery logging.info('identifying battery...') hardware_version, bms_version, ampere_hours = parse_slave_id(modbus, slave_address) firmware_version = read_firmware_version(modbus, slave_address) specs = Battery( slave_address=slave_address, hardware_version=hardware_version, firmware_version=firmware_version, bms_version=bms_version, ampere_hours=ampere_hours) logging.info('battery identified:\n{0}'.format(str(specs))) return specs def identify_batteries(modbus): # type: (Modbus) -> List[Battery] def _identify_batteries(): slave_address = 0 n_missing = -255 while n_missing < 3: slave_address += 1 try: yield identify_battery(modbus, slave_address) n_missing = 0 except Exception as e: logging.info('failed to identify battery at {0} : {1}'.format(str(slave_address), str(e))) n_missing += 1 logging.info('giving up searching for further batteries') batteries = list(_identify_batteries()) # dont be lazy! n = len(batteries) logging.info('found ' + str(n) + (' battery' if n == 1 else ' batteries')) return batteries def parse_slave_id(modbus, slave_address): # type: (Modbus, int) -> (str, str, int) slave_id = report_slave_id(modbus, slave_address) sid = re.sub(r'[^\x20-\x7E]', '', slave_id) # remove weird special chars match = re.match('(?P48TL(?P[0-9]+)) *(?P.*)', sid) if match is None: raise Exception('no known battery found') return match.group('hw').strip(), match.group('bms').strip(), int(match.group('ah').strip()) def read_firmware_version(modbus, slave_address): # type: (Modbus, int) -> str logging.debug('reading firmware version') with modbus: response = read_modbus_registers(modbus, slave_address, base_address=1054, count=1) register = response.registers[0] return '{0:0>4X}'.format(register) def read_modbus_registers(modbus, slave_address, base_address=cfg.BASE_ADDRESS, count=cfg.NO_OF_REGISTERS): # type: (Modbus, int, int, int) -> ReadInputRegistersResponse logging.debug('requesting modbus registers {0}-{1}'.format(base_address, base_address + count)) return modbus.read_input_registers( address=base_address, count=count, unit=slave_address) def read_battery_status(modbus, battery): # type: (Modbus, Battery) -> BatteryStatus """ Read the modbus registers containing the battery's status info. """ logging.debug('reading battery status') with modbus: data = read_modbus_registers(modbus, battery.slave_address) return BatteryStatus(battery, data.registers) def publish_values_on_dbus(service, battery_signals, battery_statuses): # type: (DBusService, Iterable[BatterySignal], Iterable[BatteryStatus]) -> () publish_individuals(service, battery_signals, battery_statuses) publish_aggregates(service, battery_signals, battery_statuses) def publish_aggregates(service, signals, battery_statuses): # type: (DBusService, Iterable[BatterySignal], Iterable[BatteryStatus]) -> () for s in signals: if s.aggregate is None: continue values = [s.get_value(battery_status) for battery_status in battery_statuses] value = s.aggregate(values) service.own_properties.set(s.dbus_path, value, s.unit) def publish_individuals(service, signals, battery_statuses): # type: (DBusService, Iterable[BatterySignal], Iterable[BatteryStatus]) -> () for signal in signals: for battery_status in battery_statuses: address = battery_status.battery.slave_address dbus_path = '/_Battery/' + str(address) + signal.dbus_path value = signal.get_value(battery_status) service.own_properties.set(dbus_path, value, signal.unit) def publish_service_signals(service, signals): # type: (DBusService, Iterable[ServiceSignal]) -> NoReturn for signal in signals: service.own_properties.set(signal.dbus_path, signal.value, signal.unit) def upload_status_to_innovenergy(sock, statuses): # type: (socket, Iterable[BatteryStatus]) -> bool logging.debug('upload status') try: for s in statuses: sock.sendto(s.serialize(), (cfg.INNOVENERGY_SERVER_IP, cfg.INNOVENERGY_SERVER_PORT)) except: logging.debug('FAILED') return False else: return True def print_usage(): print ('Usage: ' + __file__ + ' ') print ('Example: ' + __file__ + ' ttyUSB0') def parse_cmdline_args(argv): # type: (List[str]) -> str if len(argv) == 0: logging.info('missing command line argument for tty device') print_usage() sys.exit(1) return argv[0] def reset_batteries(modbus, batteries): # type: (Modbus, Iterable[Battery]) -> NoReturn logging.info('Resetting batteries...') for battery in batteries: result = modbus.write_registers(RESET_REGISTER, [1], unit=battery.slave_address) # expecting a ModbusIOException (timeout) # BMS can no longer reply because it is already reset success = isinstance(result, ModbusIOException) outcome = 'successfully' if success else 'FAILED to' logging.info('Battery {0} {1} reset'.format(str(battery.slave_address), outcome)) logging.info('Shutting down fz-sonick driver') exit(0) alive = True # global alive flag, watchdog_task clears it, update_task sets it def create_update_task(modbus, service, batteries): # type: (Modbus, DBusService, Iterable[Battery]) -> Callable[[],bool] """ Creates an update task which runs the main update function and resets the alive flag """ _socket = init_udp_socket() _signals = signals.init_battery_signals() def update_task(): # type: () -> bool global alive logging.debug('starting update cycle') # Checking if we have excess power and if so charge batteries more target = service.remote_properties.get(get_service(SETTINGS_SERVICE_PREFIX) + '/Settings/CGwacs/AcPowerSetPoint').value or 0 actual = service.remote_properties.get(get_service(INVERTER_SERVICE_PREFIX) + '/Ac/Out/P').value or 0 if actual>target: service.own_properties.set('/Info/MaxChargeCurrent').value = min([battery.i_max for battery in batteries]) if service.own_properties.get('/ResetBatteries').value == 1: reset_batteries(modbus, batteries) statuses = [read_battery_status(modbus, battery) for battery in batteries] publish_values_on_dbus(service, _signals, statuses) upload_status_to_innovenergy(_socket, statuses) logging.debug('finished update cycle\n') alive = True return True return update_task def create_watchdog_task(main_loop): # type: (DBusGMainLoop) -> Callable[[],bool] """ Creates a Watchdog task that monitors the alive flag. The watchdog kills the main loop if the alive flag is not periodically reset by the update task. Who watches the watchdog? """ def watchdog_task(): # type: () -> bool global alive if alive: logging.debug('watchdog_task: update_task is alive') alive = False return True else: logging.info('watchdog_task: killing main loop because update_task is no longer alive') main_loop.quit() return False return watchdog_task def get_service(self, prefix: str) -> Optional[unicode]: service = next((s for s in self.available_services if s.startswith(prefix)), None) if service is None: raise Exception('no service matching ' + prefix + '* available') return service def main(argv): # type: (List[str]) -> () logging.basicConfig(level=cfg.LOG_LEVEL) logging.info('starting ' + __file__) tty = parse_cmdline_args(argv) modbus = init_modbus(tty) batteries = identify_batteries(modbus) if len(batteries) <= 0: sys.exit(2) service = DBusService(service_name=cfg.SERVICE_NAME_PREFIX + tty) service.own_properties.set('/ResetBatteries', value=False, writable=True) # initial value = False main_loop = GLib.MainLoop() service_signals = signals.init_service_signals(batteries) publish_service_signals(service, service_signals) update_task = create_update_task(modbus, service, batteries) update_task() # run it right away, so that all props are initialized before anyone can ask watchdog_task = create_watchdog_task(main_loop) GLib.timeout_add(cfg.UPDATE_INTERVAL * 2, watchdog_task, priority = GLib.PRIORITY_LOW) # add watchdog first GLib.timeout_add(cfg.UPDATE_INTERVAL, update_task, priority = GLib.PRIORITY_LOW) # call update once every update_interval logging.info('starting gobject.MainLoop') main_loop.run() logging.info('gobject.MainLoop was shut down') sys.exit(0xFF) # reaches this only on error main(sys.argv[1:])