#!/usr/bin/python3 -u # coding=utf-8 import re import sys import logging from gi.repository import GLib import config as cfg import convert as c from pymodbus.register_read_message import ReadInputRegistersResponse from pymodbus.client.sync import ModbusSerialClient as Modbus from pymodbus.other_message import ReportSlaveIdRequest from pymodbus.exceptions import ModbusException from pymodbus.pdu import ExceptionResponse from dbus.mainloop.glib import DBusGMainLoop from data import BatteryStatus, Signal, Battery, LedColor, CsvSignal, LedState from collections import Iterable from os import path app_dir = path.dirname(path.realpath(__file__)) sys.path.insert(1, path.join(app_dir, 'ext', 'velib_python')) from vedbus import VeDbusService as DBus import time import os import csv import requests import hmac import hashlib import base64 from datetime import datetime import io import json import requests import hmac import hashlib import base64 from datetime import datetime import pika import time # zip-comp additions import zipfile import io import shutil def compress_csv_data(csv_data, file_name="data.csv"): memory_stream = io.BytesIO() # Create a zip archive in the memory buffer with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive: # Add CSV data to the ZIP archive with archive.open('data.csv', 'w') as entry_stream: entry_stream.write(csv_data.encode('utf-8')) # Get the compressed byte array from the memory buffer compressed_bytes = memory_stream.getvalue() # Encode the compressed byte array as a Base64 string base64_string = base64.b64encode(compressed_bytes).decode('utf-8') return base64_string class S3config: def __init__(self): self.bucket = cfg.S3BUCKET self.region = "sos-ch-dk-2" self.provider = "exo.io" self.key = cfg.S3KEY self.secret = cfg.S3SECRET self.content_type = "application/base64; charset=utf-8" @property def host(self): return f"{self.bucket}.{self.region}.{self.provider}" @property def url(self): return f"https://{self.host}" def create_put_request(self, s3_path, data): headers = self._create_request("PUT", s3_path) url = f"{self.url}/{s3_path}" response = requests.put(url, headers=headers, data=data) return response def _create_request(self, method, s3_path): date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') auth = self._create_authorization(method, self.bucket, s3_path, date, self.key, self.secret, self.content_type) headers = { "Host": self.host, "Date": date, "Authorization": auth, "Content-Type": self.content_type } return headers @staticmethod def _create_authorization(method, bucket, s3_path, date, s3_key, s3_secret, content_type="", md5_hash=""): payload = f"{method}\n{md5_hash}\n{content_type}\n{date}\n/{bucket.strip('/')}/{s3_path.strip('/')}" signature = base64.b64encode( hmac.new(s3_secret.encode(), payload.encode(), hashlib.sha1).digest() ).decode() return f"AWS {s3_key}:{signature}" def read_csv_as_string(file_path): """ Reads a CSV file from the given path and returns its content as a single string. """ try: with open(file_path, 'r', encoding='utf-8') as file: return file.read() except FileNotFoundError: print(f"Error: The file {file_path} does not exist.") return None except IOError as e: print(f"IO error occurred: {str(e)}") return None CSV_DIR = "/data/csv_files/" #CSV_DIR = "csv_files/" # Define the path to the file containing the installation name INSTALLATION_NAME_FILE = '/data/innovenergy/openvpn/installation-name' # trick the pycharm type-checker into thinking Callable is in scope, not used at runtime # noinspection PyUnreachableCode if False: from typing import Callable def interpret_limb_bitmap(bitmap_value): # The bit for string 1 also monitors all 5 strings: 0000 0000 means All 5 strings activated. 0000 0001 means string 1 disabled. string1_disabled = int((bitmap_value & 0b00001) != 0) string2_disabled = int((bitmap_value & 0b00010) != 0) string3_disabled = int((bitmap_value & 0b00100) != 0) string4_disabled = int((bitmap_value & 0b01000) != 0) string5_disabled = int((bitmap_value & 0b10000) != 0) n_limb_strings = string1_disabled+string2_disabled+string3_disabled+string4_disabled+string5_disabled return n_limb_strings def calc_power_limit_imposed_by_voltage_limit(v, i, v_limit, r_int): # type: (float, float, float, float) -> float dv = v_limit - v di = dv / r_int p_limit = v_limit * (i + di) return p_limit def calc_power_limit_imposed_by_current_limit(v, i, i_limit, r_int): # type: (float, float, float, float) -> float di = i_limit - i dv = di * r_int p_limit = i_limit * (v + dv) return p_limit def read_switch_closed(status): value = c.read_bool(register=1013, bit=0)(status) if value: return False return True def read_alarm_out_active(status): value = c.read_bool(register=1013, bit=1)(status) if value: return False return True def read_aux_relay(status): value = c.read_bool(register=1013, bit=4)(status) if value: return False return True def hex_string_to_ascii(hex_string): # Ensure the hex_string is correctly formatted without spaces hex_string = hex_string.replace(" ", "") # Convert every two characters (a byte) in the hex string to ASCII ascii_string = ''.join([chr(int(hex_string[i:i+2], 16)) for i in range(0, len(hex_string), 2)]) return ascii_string battery_status_reader = c.read_hex_string(1060,2) def read_eoc_reached(status): battery_status_string = battery_status_reader(status) return hex_string_to_ascii(battery_status_string) == "EOC_" def return_led_state(status, color): led_state = c.read_led_state(register=1004, led=color)(status) if led_state == LedState.blinking_fast or led_state == LedState.blinking_slow: return "Blinking" elif led_state == LedState.on: return "On" elif led_state == LedState.off: return "Off" return "Unknown" def return_led_state_blue(status): return return_led_state(status, LedColor.blue) def return_led_state_red(status): return return_led_state(status, LedColor.red) def return_led_state_green(status): return return_led_state(status, LedColor.green) def return_led_state_amber(status): return return_led_state(status, LedColor.amber) def read_serial_number(status): serial_regs = [1055, 1056, 1057, 1058] serial_parts = [] for reg in serial_regs: # reading each register as a single hex value hex_value_fun = c.read_hex_string(reg, 1) hex_value = hex_value_fun(status) # append without spaces and leading zeros stripped if any serial_parts.append(hex_value.replace(' ', '')) # concatenate all parts to form the full serial number serial_number = ''.join(serial_parts).rstrip('0') return serial_number def time_since_toc_in_time_format(status): time_in_minutes = c.read_float(register=1052)(status) # Convert minutes to total seconds total_seconds = int(time_in_minutes * 60) # Calculate days, hours, minutes, and seconds days = total_seconds // (24 * 3600) total_seconds = total_seconds % (24 * 3600) hours = total_seconds // 3600 total_seconds %= 3600 minutes = total_seconds // 60 seconds = total_seconds % 60 # Format the string to show days.hours:minutes:seconds return f"{days}.{hours:02}:{minutes:02}:{seconds:02}" def create_csv_signals(): read_voltage = c.read_float(register=999, scale_factor=0.01, offset=0, places=2) read_current = c.read_float(register=1000, scale_factor=0.01, offset=-10000, places=2) read_limb_bitmap = c.read_bitmap(1059) def read_power(status): return int(read_current(status) * read_voltage(status)) def string1_disabled(status): bitmap_value = read_limb_bitmap(status) return int((bitmap_value & 0b00001) != 0) def string2_disabled(status): bitmap_value = read_limb_bitmap(status) return int((bitmap_value & 0b00010) != 0) def string3_disabled(status): bitmap_value = read_limb_bitmap(status) return int((bitmap_value & 0b00100) != 0) def string4_disabled(status): bitmap_value = read_limb_bitmap(status) return int((bitmap_value & 0b01000) != 0) def string5_disabled(status): bitmap_value = read_limb_bitmap(status) return int((bitmap_value & 0b10000) != 0) def limp_strings_value(status): return interpret_limb_bitmap(read_limb_bitmap(status)) def calc_max_charge_power(status): # type: (BatteryStatus) -> int n_strings = cfg.NUM_OF_STRING_PER_BATTERY-limp_strings_value(status) i_max = n_strings * cfg.I_MAX_PER_STRING v_max = cfg.V_MAX r_int_min = cfg.R_STRING_MIN / n_strings r_int_max = cfg.R_STRING_MAX / n_strings v = read_voltage(status) i = read_current(status) p_limits = [ calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_min), calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_max), calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_min), calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_max), ] p_limit = min(p_limits) # p_limit is normally positive here (signed) p_limit = max(p_limit, 0) # charge power must not become negative return int(p_limit) def calc_max_discharge_power(status): n_strings = cfg.NUM_OF_STRING_PER_BATTERY-limp_strings_value(status) max_discharge_current = n_strings*cfg.I_MAX_PER_STRING return int(max_discharge_current*read_voltage(status)) total_current = c.read_float(register=1062, scale_factor=0.01, offset=-10000, places=1) def read_total_current(status): return total_current(status) def read_heating_current(status): return total_current(status) - read_current(status) def read_heating_power(status): return read_voltage(status) * read_heating_current(status) soc_ah = c.read_float(register=1002, scale_factor=0.1, offset=-10000, places=1) def read_soc_ah(status): return soc_ah(status) return [ CsvSignal('/Battery/Devices/FwVersion', lambda bs: bs.battery.firmware_version), CsvSignal('/Battery/Devices/Dc/Power', read_power, 'W'), CsvSignal('/Battery/Devices/Dc/Voltage', read_voltage, 'V'), CsvSignal('/Battery/Devices/Soc', c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), '%'), CsvSignal('/Battery/Devices/Temperatures/Cells/Average', c.read_float(register=1003, scale_factor=0.1, offset=-400, places=1), 'C'), CsvSignal('/Battery/Devices/Dc/Current', read_current, 'A'), CsvSignal('/Battery/Devices/BusCurrent', read_total_current, 'A'), CsvSignal('/Battery/Devices/CellsCurrent', read_current, 'A'), CsvSignal('/Battery/Devices/HeatingCurrent', read_heating_current, 'A'), CsvSignal('/Battery/Devices/HeatingPower', read_heating_power, 'W'), CsvSignal('/Battery/Devices/SOCAh', read_soc_ah), CsvSignal('/Battery/Devices/Leds/Blue', return_led_state_blue), CsvSignal('/Battery/Devices/Leds/Red', return_led_state_red), CsvSignal('/Battery/Devices/Leds/Green', return_led_state_green), CsvSignal('/Battery/Devices/Leds/Amber', return_led_state_amber), CsvSignal('/Battery/Devices/BatteryStrings/String1Active', string1_disabled), CsvSignal('/Battery/Devices/BatteryStrings/String2Active', string2_disabled), CsvSignal('/Battery/Devices/BatteryStrings/String3Active', string3_disabled), CsvSignal('/Battery/Devices/BatteryStrings/String4Active', string4_disabled), CsvSignal('/Battery/Devices/BatteryStrings/String5Active', string5_disabled), CsvSignal('/Battery/Devices/IoStatus/ConnectedToDcBus', read_switch_closed), CsvSignal('/Battery/Devices/IoStatus/AlarmOutActive', read_alarm_out_active), CsvSignal('/Battery/Devices/IoStatus/InternalFanActive', c.read_bool(register=1013, bit=2)), CsvSignal('/Battery/Devices/IoStatus/VoltMeasurementAllowed', c.read_bool(register=1013, bit=3)), CsvSignal('/Battery/Devices/IoStatus/AuxRelayBus', read_aux_relay), CsvSignal('/Battery/Devices/IoStatus/RemoteStateActive', c.read_bool(register=1013, bit=5)), CsvSignal('/Battery/Devices/IoStatus/RiscActive', c.read_bool(register=1013, bit=6)), CsvSignal('/Battery/Devices/Eoc', read_eoc_reached), CsvSignal('/Battery/Devices/SerialNumber', read_serial_number), CsvSignal('/Battery/Devices/TimeSinceTOC', time_since_toc_in_time_format), CsvSignal('/Battery/Devices/MaxChargePower', calc_max_charge_power), CsvSignal('/Battery/Devices/MaxDischargePower', calc_max_discharge_power), ] def init_signals(n_batteries): # type: (int) -> Iterable[Signal] """ A Signal holds all information necessary for the handling of a certain datum (e.g. voltage) published by the battery. Signal(dbus_path, aggregate, get_value, get_text = str) dbus_path: str object_path on DBus where the datum needs to be published aggregate: Iterable[object] -> object function that combines the values of multiple batteries into one. e.g. sum for currents, or mean for voltages get_value: (BatteryStatus) -> object [optional] function to extract the datum from the modbus record, alternatively: a constant get_text: (object) -> unicode [optional] function to render datum to text, needed by DBus alternatively: a constant The conversion functions use the same parameters (e.g scale_factor, offset) as described in the document 'T48TLxxx ModBus Protocol Rev.7.1' which can be found in the /doc folder """ product_id = cfg.PRODUCT_ID product_id_hex = '0x{0:04product_id}' read_voltage = c.read_float(register=999, scale_factor=0.01, offset=0, places=2) read_current = c.read_float(register=1000, scale_factor=0.01, offset=-10000, places=2) read_limb_bitmap = c.read_bitmap(1059) def read_battery_cold(status): return \ c.read_led_state(register=1004, led=LedColor.green)(status) >= LedState.blinking_slow and \ c.read_led_state(register=1004, led=LedColor.blue)(status) >= LedState.blinking_slow def read_power(status): return int(read_current(status) * read_voltage(status)) def limp_strings_value(status): return interpret_limb_bitmap(read_limb_bitmap(status)) def max_discharge_current(status): return (cfg.NUM_OF_STRING_PER_BATTERY-limp_strings_value(status))*cfg.I_MAX_PER_STRING def max_charge_current(status): return status.battery.ampere_hours/2 def calc_max_charge_power(status): # type: (BatteryStatus) -> int n_strings = cfg.NUM_OF_STRING_PER_BATTERY-limp_strings_value(status) i_max = n_strings * cfg.I_MAX_PER_STRING v_max = cfg.V_MAX r_int_min = cfg.R_STRING_MIN / n_strings r_int_max = cfg.R_STRING_MAX / n_strings v = read_voltage(status) i = read_current(status) p_limits = [ calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_min), calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_max), calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_min), calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_max), ] p_limit = min(p_limits) # p_limit is normally positive here (signed) p_limit = max(p_limit, 0) # charge power must not become negative return int(p_limit) product_name = cfg.PRODUCT_NAME if n_batteries > 1: product_name = cfg.PRODUCT_NAME + ' x' + str(n_batteries) return [ # Node Red related dbus paths Signal('/TimeToTOCRequest', max, c.read_float(register=1052)), Signal('/EOCReached', c.return_in_list, read_eoc_reached), Signal('/BatteryCold',c.return_in_list, read_battery_cold), Signal('/NumOfLimbStrings', c.return_in_list, get_value=limp_strings_value), Signal('/NumOfBatteries', max, get_value=n_batteries), Signal('/Dc/0/Voltage', c.mean, get_value=read_voltage, get_text=c.append_unit('V')), Signal('/Dc/0/Current', c.ssum, get_value=read_current, get_text=c.append_unit('A')), Signal('/Dc/0/Power', c.ssum, get_value=read_power, get_text=c.append_unit('W')), Signal('/BussVoltage', c.mean, c.read_float(register=1001, scale_factor=0.01, offset=0, places=2), c.append_unit('V')), Signal('/Soc', min, c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), c.append_unit('%')), Signal('/LowestSoc', min, c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), c.append_unit('%')), Signal('/Dc/0/Temperature', c.mean, c.read_float(register=1003, scale_factor=0.1, offset=-400, places=1), c.append_unit(u'°C')), # Charge/Discharge current, voltage and power Signal('/Info/MaxDischargeCurrent', c.ssum, max_discharge_current,c.append_unit('A')), Signal('/Info/MaxChargeCurrent', c.ssum, max_charge_current, c.append_unit('A')), Signal('/Info/MaxChargeVoltage', min, cfg.MAX_CHARGE_VOLTAGE, c.append_unit('V')), Signal('/Info/MaxChargePower', c.ssum, calc_max_charge_power), # Victron mandatory dbus paths Signal('/Mgmt/ProcessName', c.first, __file__), Signal('/Mgmt/ProcessVersion', c.first, cfg.SOFTWARE_VERSION), Signal('/Mgmt/Connection', c.first, cfg.CONNECTION), Signal('/DeviceInstance', c.first, cfg.DEVICE_INSTANCE), Signal('/ProductName', c.first, product_name), Signal('/ProductId', c.first, cfg.PRODUCT_ID, product_id_hex), Signal('/Connected', c.first, 1), Signal('/FirmwareVersion', c.return_in_list, lambda bs: bs.battery.firmware_version), Signal('/HardwareVersion', c.return_in_list, lambda bs: bs.battery.hardware_version), Signal('/BmsVersion', c.return_in_list, lambda s: s.battery.bms_version), # Warnings Signal('/WarningFlags/TaM1', c.return_in_list, c.read_bool(register=1005, bit=1)), Signal('/WarningFlags/TbM1', c.return_in_list, c.read_bool(register=1005, bit=4)), Signal('/WarningFlags/VBm1', c.return_in_list, c.read_bool(register=1005, bit=6)), Signal('/WarningFlags/VBM1', c.return_in_list, c.read_bool(register=1005, bit=8)), Signal('/WarningFlags/IDM1', c.return_in_list, c.read_bool(register=1005, bit=10)), Signal('/WarningFlags/vsm1', c.return_in_list, c.read_bool(register=1005, bit=22)), Signal('/WarningFlags/vsM1', c.return_in_list, c.read_bool(register=1005, bit=24)), Signal('/WarningFlags/iCM1', c.return_in_list, c.read_bool(register=1005, bit=26)), Signal('/WarningFlags/iDM1', c.return_in_list, c.read_bool(register=1005, bit=28)), Signal('/WarningFlags/MID1', c.return_in_list, c.read_bool(register=1005, bit=30)), Signal('/WarningFlags/BLPW', c.return_in_list, c.read_bool(register=1005, bit=32)), Signal('/WarningFlags/CCBF', c.return_in_list, c.read_bool(register=1005, bit=33)), Signal('/WarningFlags/Ah_W', c.return_in_list, c.read_bool(register=1005, bit=35)), Signal('/WarningFlags/MPMM', c.return_in_list, c.read_bool(register=1005, bit=38)), Signal('/WarningFlags/TCdi', c.return_in_list, c.read_bool(register=1005, bit=40)), Signal('/WarningFlags/LMPW', c.return_in_list, c.read_bool(register=1005, bit=44)), Signal('/WarningFlags/TOCW', c.return_in_list, c.read_bool(register=1005, bit=47)), Signal('/WarningFlags/BUSL', c.return_in_list, c.read_bool(register=1005, bit=49)), # Alarms Signal('/AlarmFlags/Tam', c.return_in_list, c.read_bool(register=1009, bit=0)), Signal('/AlarmFlags/TaM2', c.return_in_list, c.read_bool(register=1009, bit=2)), Signal('/AlarmFlags/Tbm', c.return_in_list, c.read_bool(register=1009, bit=3)), Signal('/AlarmFlags/TbM2', c.return_in_list, c.read_bool(register=1009, bit=5)), Signal('/AlarmFlags/VBm2', c.return_in_list, c.read_bool(register=1009, bit=7)), Signal('/AlarmFlags/VBM2', c.return_in_list, c.read_bool(register=1009, bit=9)), Signal('/AlarmFlags/IDM2', c.return_in_list, c.read_bool(register=1009, bit=11)), Signal('/AlarmFlags/ISOB', c.return_in_list, c.read_bool(register=1009, bit=12)), Signal('/AlarmFlags/MSWE', c.return_in_list, c.read_bool(register=1009, bit=13)), Signal('/AlarmFlags/FUSE', c.return_in_list, c.read_bool(register=1009, bit=14)), Signal('/AlarmFlags/HTRE', c.return_in_list, c.read_bool(register=1009, bit=15)), Signal('/AlarmFlags/TCPE', c.return_in_list, c.read_bool(register=1009, bit=16)), Signal('/AlarmFlags/STRE', c.return_in_list, c.read_bool(register=1009, bit=17)), Signal('/AlarmFlags/CME', c.return_in_list, c.read_bool(register=1009, bit=18)), Signal('/AlarmFlags/HWFL', c.return_in_list, c.read_bool(register=1009, bit=19)), Signal('/AlarmFlags/HWEM', c.return_in_list, c.read_bool(register=1009, bit=20)), Signal('/AlarmFlags/ThM', c.return_in_list, c.read_bool(register=1009, bit=21)), Signal('/AlarmFlags/vsm2', c.return_in_list, c.read_bool(register=1009, bit=23)), Signal('/AlarmFlags/vsM2', c.return_in_list, c.read_bool(register=1009, bit=25)), Signal('/AlarmFlags/iCM2', c.return_in_list, c.read_bool(register=1009, bit=27)), Signal('/AlarmFlags/iDM2', c.return_in_list, c.read_bool(register=1009, bit=29)), Signal('/AlarmFlags/MID2', c.return_in_list, c.read_bool(register=1009, bit=31)), Signal('/AlarmFlags/HTFS', c.return_in_list, c.read_bool(register=1009, bit=42)), Signal('/AlarmFlags/DATA', c.return_in_list, c.read_bool(register=1009, bit=43)), Signal('/AlarmFlags/LMPA', c.return_in_list, c.read_bool(register=1009, bit=45)), Signal('/AlarmFlags/HEBT', c.return_in_list, c.read_bool(register=1009, bit=46)), Signal('/AlarmFlags/CURM', c.return_in_list, c.read_bool(register=1009, bit=48)), # LedStatus Signal('/LedStatus/Red', c.first, c.read_led_state(register=1004, led=LedColor.red)), Signal('/LedStatus/Blue', c.first, c.read_led_state(register=1004, led=LedColor.blue)), Signal('/LedStatus/Green', c.first, c.read_led_state(register=1004, led=LedColor.green)), Signal('/LedStatus/Amber', c.first, c.read_led_state(register=1004, led=LedColor.amber)), # IO Status Signal('/IoStatus/MainSwitchClosed', c.return_in_list, read_switch_closed), Signal('/IoStatus/AlarmOutActive', c.return_in_list, read_alarm_out_active), Signal('/IoStatus/InternalFanActive', c.return_in_list, c.read_bool(register=1013, bit=2)), Signal('/IoStatus/VoltMeasurementAllowed', c.return_in_list, c.read_bool(register=1013, bit=3)), Signal('/IoStatus/AuxRelay', c.return_in_list, read_aux_relay), Signal('/IoStatus/RemoteState', c.return_in_list, c.read_bool(register=1013, bit=5)), Signal('/IoStatus/RiscOn', c.return_in_list, c.read_bool(register=1013, bit=6)), ] def init_modbus(tty): # type: (str) -> Modbus logging.debug('initializing Modbus') return Modbus( port='/dev/' + tty, method=cfg.MODE, baudrate=cfg.BAUD_RATE, stopbits=cfg.STOP_BITS, bytesize=cfg.BYTE_SIZE, timeout=cfg.TIMEOUT, parity=cfg.PARITY) def init_dbus(tty, signals): # type: (str, Iterable[Signal]) -> DBus logging.debug('initializing DBus service') dbus = DBus(servicename=cfg.SERVICE_NAME_PREFIX + tty) logging.debug('initializing DBus paths') for signal in signals: init_dbus_path(dbus, signal) return dbus # noinspection PyBroadException def try_get_value(sig): # type: (Signal) -> object try: return sig.get_value(None) except: return None def init_dbus_path(dbus, sig): # type: (DBus, Signal) -> () dbus.add_path( sig.dbus_path, try_get_value(sig), gettextcallback=lambda _, v: sig.get_text(v)) def init_main_loop(): # type: () -> DBusGMainLoop logging.debug('initializing DBusGMainLoop Loop') DBusGMainLoop(set_as_default=True) return GLib.MainLoop() def report_slave_id(modbus, slave_address): # type: (Modbus, int) -> str slave = str(slave_address) logging.debug('requesting slave id from node ' + slave) try: modbus.connect() request = ReportSlaveIdRequest(unit=slave_address) response = modbus.execute(request) if response is ExceptionResponse or issubclass(type(response), ModbusException): raise Exception('failed to get slave id from ' + slave + ' : ' + str(response)) return response.identifier finally: modbus.close() def identify_battery(modbus, slave_address): # type: (Modbus, int) -> Battery logging.info('identifying battery...') hardware_version, bms_version, ampere_hours = parse_slave_id(modbus, slave_address) firmware_version = read_firmware_version(modbus, slave_address) specs = Battery( slave_address=slave_address, hardware_version=hardware_version, firmware_version=firmware_version, bms_version=bms_version, ampere_hours=ampere_hours) logging.info('battery identified:\n{0}'.format(str(specs))) return specs def identify_batteries(modbus): # type: (Modbus) -> list[Battery] def _identify_batteries(): address_range = range(1, cfg.MAX_SLAVE_ADDRESS + 1) for slave_address in address_range: try: yield identify_battery(modbus, slave_address) except Exception as e: logging.info('failed to identify battery at {0} : {1}'.format(str(slave_address), str(e))) return list(_identify_batteries()) # force that lazy iterable! def parse_slave_id(modbus, slave_address): # type: (Modbus, int) -> (str, str, int) slave_id = report_slave_id(modbus, slave_address) sid = re.sub(b'[^\x20-\x7E]', b'', slave_id) # remove weird special chars match = re.match('(?P48TL(?P\d+)) *(?P.*)', sid.decode('ascii')) if match is None: raise Exception('no known battery found') return match.group('hw'), match.group('bms'), int(match.group('ah')) def read_firmware_version(modbus, slave_address): # type: (Modbus, int) -> str logging.debug('reading firmware version') try: modbus.connect() response = read_modbus_registers(modbus, slave_address, base_address=1054, count=1) register = response.registers[0] return '{0:0>4X}'.format(register) finally: modbus.close() # close in any case def read_modbus_registers(modbus, slave_address, base_address=cfg.BASE_ADDRESS, count=cfg.NO_OF_REGISTERS): # type: (Modbus, int) -> ReadInputRegistersResponse logging.debug('requesting modbus registers {0}-{1}'.format(base_address, base_address + count)) return modbus.read_input_registers( address=base_address, count=count, unit=slave_address) def read_battery_status(modbus, battery): # type: (Modbus, Battery) -> BatteryStatus """ Read the modbus registers containing the battery's status info. """ logging.debug('reading battery status') try: modbus.connect() data = read_modbus_registers(modbus, battery.slave_address) return BatteryStatus(battery, data.registers) finally: modbus.close() # close in any case def publish_values(dbus, signals, statuses): # type: (DBus, Iterable[Signal], Iterable[BatteryStatus]) -> () for s in signals: values = [s.get_value(status) for status in statuses] with dbus as srv: srv[s.dbus_path] = s.aggregate(values) previous_warnings = {} previous_alarms = {} class MessageType: ALARM_OR_WARNING = "AlarmOrWarning" HEARTBEAT = "Heartbeat" class AlarmOrWarning: def __init__(self, description, created_by): self.date = datetime.now().strftime('%Y-%m-%d') self.time = datetime.now().strftime('%H:%M:%S') self.description = description self.created_by = created_by def to_dict(self): return { "Date": self.date, "Time": self.time, "Description": self.description, "CreatedBy": self.created_by } def SubscribeToQueue(): try: connection = pika.BlockingConnection(pika.ConnectionParameters(host="10.2.0.11", port=5672, virtual_host="/", credentials=pika.PlainCredentials("producer", "b187ceaddb54d5485063ddc1d41af66f"))) channel = connection.channel() channel.queue_declare(queue="statusQueue", durable=True) print("Subscribed to queue") except Exception as ex: print("An error occurred while connecting to the RabbitMQ queue:", ex) return channel is_first_update = True prev_status=0 channel = SubscribeToQueue() # Create an S3config instance s3_config = S3config() INSTALLATION_ID=int(s3_config.bucket.split('-')[0]) PRODUCT_ID = 1 def update_state_from_dictionaries(current_warnings, current_alarms, node_numbers): global previous_warnings, previous_alarms, INSTALLATION_ID, PRODUCT_ID, is_first_update, channel,prev_status if is_first_update: changed_warnings = current_warnings changed_alarms = current_alarms is_first_update = False else: changed_alarms={} changed_warnings={} # calculate the diff in warnings and alarms prev_alarm_value_list=list(previous_alarms.values()) alarm_keys=list(previous_alarms.keys()) for i, alarm in enumerate(current_alarms.values()): if alarm!=prev_alarm_value_list[i]: changed_alarms[alarm_keys[i]]=True else: changed_alarms[alarm_keys[i]]=False prev_warning_value_list=list(previous_warnings.values()) warning_keys=list(previous_warnings.keys()) for i, warning in enumerate(current_warnings.values()): if warning!=prev_warning_value_list[i]: changed_warnings[warning_keys[i]]=True else: changed_warnings[warning_keys[i]]=False status_message = { "InstallationId": INSTALLATION_ID, "Product": PRODUCT_ID, "Status": 0, "Type": 1, "Warnings": [], "Alarms": [] } alarms_number_list = [] for node_number in node_numbers: cnt = 0 for i, alarm_value in enumerate(current_alarms.values()): if int(list(current_alarms.keys())[i].split("/")[3]) == int(node_number): if alarm_value: cnt+=1 alarms_number_list.append(cnt) warnings_number_list = [] for node_number in node_numbers: cnt = 0 for i, warning_value in enumerate(current_warnings.values()): if int(list(current_warnings.keys())[i].split("/")[3]) == int(node_number): if warning_value: cnt+=1 warnings_number_list.append(cnt) # Evaluate alarms if any(changed_alarms.values()): for i, changed_alarm in enumerate(changed_alarms.values()): if changed_alarm and list(current_alarms.values())[i]: description = list(current_alarms.keys())[i].split("/")[-1] device_created = "Battery node " + list(current_alarms.keys())[i].split("/")[3] status_message["Alarms"].append(AlarmOrWarning(description, device_created).to_dict()) if any(changed_warnings.values()): for i, changed_warning in enumerate(changed_warnings.values()): if changed_warning and list(current_warnings.values())[i]: description = list(current_warnings.keys())[i].split("/")[-1] device_created = "Battery node " + list(current_warnings.keys())[i].split("/")[3] status_message["Warnings"].append(AlarmOrWarning(description, device_created).to_dict()) if any(current_alarms.values()): status_message["Status"]=2 if not any(current_alarms.values()) and any(current_warnings.values()): status_message["Status"]=1 if not any(current_alarms.values()) and not any(current_warnings.values()): status_message["Status"]=0 if status_message["Status"]!=prev_status or len(status_message["Warnings"])>0 or len(status_message["Alarms"])>0: prev_status=status_message["Status"] status_message["Type"]=0 status_message = json.dumps(status_message) channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message) print(status_message) print("Message sent successfully") previous_warnings = current_warnings.copy() previous_alarms = current_alarms.copy() return status_message, alarms_number_list, warnings_number_list def read_warning_and_alarm_flags(): return [ # Warnings CsvSignal('/Battery/Devices/WarningFlags/TaM1', c.read_bool(register=1005, bit=1)), CsvSignal('/Battery/Devices/WarningFlags/TbM1', c.read_bool(register=1005, bit=4)), CsvSignal('/Battery/Devices/WarningFlags/VBm1', c.read_bool(register=1005, bit=6)), CsvSignal('/Battery/Devices/WarningFlags/VBM1', c.read_bool(register=1005, bit=8)), CsvSignal('/Battery/Devices/WarningFlags/IDM1', c.read_bool(register=1005, bit=10)), CsvSignal('/Battery/Devices/WarningFlags/vsm1', c.read_bool(register=1005, bit=22)), CsvSignal('/Battery/Devices/WarningFlags/vsM1', c.read_bool(register=1005, bit=24)), CsvSignal('/Battery/Devices/WarningFlags/iCM1', c.read_bool(register=1005, bit=26)), CsvSignal('/Battery/Devices/WarningFlags/iDM1', c.read_bool(register=1005, bit=28)), CsvSignal('/Battery/Devices/WarningFlags/MID1', c.read_bool(register=1005, bit=30)), CsvSignal('/Battery/Devices/WarningFlags/BLPW', c.read_bool(register=1005, bit=32)), CsvSignal('/Battery/Devices/WarningFlags/CCBF', c.read_bool(register=1005, bit=33)), CsvSignal('/Battery/Devices/WarningFlags/Ah_W', c.read_bool(register=1005, bit=35)), CsvSignal('/Battery/Devices/WarningFlags/MPMM', c.read_bool(register=1005, bit=38)), CsvSignal('/Battery/Devices/WarningFlags/TCdi', c.read_bool(register=1005, bit=40)), CsvSignal('/Battery/Devices/WarningFlags/LMPW', c.read_bool(register=1005, bit=44)), CsvSignal('/Battery/Devices/WarningFlags/TOCW', c.read_bool(register=1005, bit=47)), CsvSignal('/Battery/Devices/WarningFlags/BUSL', c.read_bool(register=1005, bit=49)), ], [ # Alarms CsvSignal('/Battery/Devices/AlarmFlags/Tam', c.read_bool(register=1009, bit=0)), CsvSignal('/Battery/Devices/AlarmFlags/TaM2', c.read_bool(register=1009, bit=2)), CsvSignal('/Battery/Devices/AlarmFlags/Tbm', c.read_bool(register=1009, bit=3)), CsvSignal('/Battery/Devices/AlarmFlags/TbM2', c.read_bool(register=1009, bit=5)), CsvSignal('/Battery/Devices/AlarmFlags/VBm2', c.read_bool(register=1009, bit=7)), CsvSignal('/Battery/Devices/AlarmFlags/VBM2', c.read_bool(register=1009, bit=9)), CsvSignal('/Battery/Devices/AlarmFlags/IDM2', c.read_bool(register=1009, bit=11)), CsvSignal('/Battery/Devices/AlarmFlags/ISOB', c.read_bool(register=1009, bit=12)), CsvSignal('/Battery/Devices/AlarmFlags/MSWE', c.read_bool(register=1009, bit=13)), CsvSignal('/Battery/Devices/AlarmFlags/FUSE', c.read_bool(register=1009, bit=14)), CsvSignal('/Battery/Devices/AlarmFlags/HTRE', c.read_bool(register=1009, bit=15)), CsvSignal('/Battery/Devices/AlarmFlags/TCPE', c.read_bool(register=1009, bit=16)), CsvSignal('/Battery/Devices/AlarmFlags/STRE', c.read_bool(register=1009, bit=17)), CsvSignal('/Battery/Devices/AlarmFlags/CME', c.read_bool(register=1009, bit=18)), CsvSignal('/Battery/Devices/AlarmFlags/HWFL', c.read_bool(register=1009, bit=19)), CsvSignal('/Battery/Devices/AlarmFlags/HWEM', c.read_bool(register=1009, bit=20)), CsvSignal('/Battery/Devices/AlarmFlags/ThM', c.read_bool(register=1009, bit=21)), CsvSignal('/Battery/Devices/AlarmFlags/vsm2', c.read_bool(register=1009, bit=23)), CsvSignal('/Battery/Devices/AlarmFlags/vsM2', c.read_bool(register=1009, bit=25)), CsvSignal('/Battery/Devices/AlarmFlags/iCM2', c.read_bool(register=1009, bit=27)), CsvSignal('/Battery/Devices/AlarmFlags/iDM2', c.read_bool(register=1009, bit=29)), CsvSignal('/Battery/Devices/AlarmFlags/MID2', c.read_bool(register=1009, bit=31)), CsvSignal('/Battery/Devices/AlarmFlags/HTFS', c.read_bool(register=1009, bit=42)), CsvSignal('/Battery/Devices/AlarmFlags/DATA', c.read_bool(register=1009, bit=43)), CsvSignal('/Battery/Devices/AlarmFlags/LMPA', c.read_bool(register=1009, bit=45)), CsvSignal('/Battery/Devices/AlarmFlags/HEBT', c.read_bool(register=1009, bit=46)), CsvSignal('/Battery/Devices/AlarmFlags/CURM', c.read_bool(register=1009, bit=48)), CsvSignal('/Battery/Devices/AlarmFlags/2 or more string are disabled',c.read_limb_string(1059)), ] def update(modbus, batteries, dbus, signals, csv_signals): # type: (Modbus, Iterable[Battery], DBus, Iterable[Signal]) -> bool """ Main update function 1. requests status record each battery via modbus, 2. parses the data using Signal.get_value 3. aggregates the data from all batteries into one datum using Signal.aggregate 4. publishes the data on the dbus """ logging.debug('starting update cycle') warnings_signals, alarm_signals = read_warning_and_alarm_flags() current_warnings = {} current_alarms= {} statuses = [read_battery_status(modbus, battery) for battery in batteries] node_numbers = [battery.slave_address for battery in batteries] # Iterate over each node and signal to create rows in the new format for i, node in enumerate(node_numbers): for s in warnings_signals: signal_name = insert_id(s.name, node) value = s.get_value(statuses[i]) current_warnings[signal_name] = value for s in alarm_signals: signal_name = insert_id(s.name, node) value = s.get_value(statuses[i]) current_alarms[signal_name] = value #print(update_state_from_dictionaries(current_warnings, current_alarms)) status_message, alarms_number_list, warnings_number_list = update_state_from_dictionaries(current_warnings, current_alarms, node_numbers) publish_values(dbus, signals, statuses) create_csv_files(csv_signals, statuses, node_numbers, alarms_number_list, warnings_number_list) logging.debug('finished update cycle\n') return True def print_usage(): print('Usage: ' + __file__ + ' ') print('Example: ' + __file__ + ' ttyUSB0') def parse_cmdline_args(argv): # type: (list[str]) -> str if len(argv) == 0: logging.info('missing command line argument for tty device') print_usage() sys.exit(1) return argv[0] def count_files_in_folder(folder_path): try: # List all files in the folder files = os.listdir(folder_path) # Filter out directories, only count files num_files = sum(1 for f in files if os.path.isfile(os.path.join(folder_path, f))) return num_files except FileNotFoundError: return "Folder not found" except Exception as e: return str(e) def create_batch_of_csv_files(): global prev_status # list all files in the directory files = os.listdir(CSV_DIR) # filter out only csv files csv_files = [file for file in files if file.endswith('.csv')] # sort csv files by creation time csv_files.sort(key=lambda x: os.path.getctime(os.path.join(CSV_DIR, x))) # keep the 30 MOST RECENT FILES recent_csv_files = csv_files[-30:] if len(csv_files) > 30 else csv_files # get the name of the first csv file if not csv_files: print("No csv files found in the directory.") exit(0) first_csv_file = os.path.join(CSV_DIR, recent_csv_files.pop(0)) first_csv_filename = os.path.basename(first_csv_file) temp_file_path = os.path.join(CSV_DIR, 'temp_batch_file.csv') # create a temporary file and write the timestamp and the original content of the first file with open(temp_file_path, 'wb') as temp_file: # Write the timestamp (filename) at the beginning numeric_part = first_csv_filename.split('.')[0] temp_file.write(f'Timestamp;{numeric_part}\n'.encode('utf-8')) # write the original content of the first csv file with open(first_csv_file, 'rb') as f: temp_file.write(f.read()) for csv_file in recent_csv_files: file_path = os.path.join(CSV_DIR, csv_file) # write an empty line temp_file.write(b'\n') # write the timestamp (filename) numeric_part = csv_file.split('.')[0] temp_file.write(f'Timestamp;{numeric_part}\n'.encode('utf-8')) # write the content of the file with open(file_path, 'rb') as f: temp_file.write(f.read()) # replace the original first csv file with the temporary file os.remove(first_csv_file) os.rename(temp_file_path, first_csv_file) # create a loggin directory that contains at max 20 batch files for logging info logging_dir = os.path.join(CSV_DIR, 'logging_batch_files') if not os.path.exists(logging_dir): os.makedirs(logging_dir) shutil.copy(first_csv_file, logging_dir) manage_csv_files(logging_dir) # keep at most 1900 files at CSV_DIR for logging and aggregation manage_csv_files(CSV_DIR, 1900) # prepare for compression csv_data = read_csv_as_string(first_csv_file) if csv_data is None: print("error while reading csv as string") return # zip-comp additions compressed_csv = compress_csv_data(csv_data) # Use the name of the last (most recent) CSV file in sorted csv_files as the name for the compressed file last_csv_file_name = os.path.basename(recent_csv_files[-1]) if recent_csv_files else first_csv_filename numeric_part = int(last_csv_file_name.split('.')[0]) compressed_filename = "{}.csv".format(numeric_part) response = s3_config.create_put_request(compressed_filename, compressed_csv) if response.status_code == 200: os.remove(first_csv_file) print("Successfully uploaded the compresseed batch of files in s3") status_message = { "InstallationId": INSTALLATION_ID, "Product": PRODUCT_ID, "Status": prev_status, "Type": 1, "Warnings": [], "Alarms": [], "Timestamp": numeric_part } status_message = json.dumps(status_message) channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message) print("Successfully sent the heartbit with timestamp") else: # we save data that were not successfully uploaded in s3 in a failed directory inside the CSV_DIR for logging failed_dir = os.path.join(CSV_DIR, "failed") if not os.path.exists(failed_dir): os.makedirs(failed_dir) failed_path = os.path.join(failed_dir, first_csv_filename) os.rename(first_csv_file, failed_path) print("Uploading failed") manage_csv_files(failed_dir, 100) alive = True # global alive flag, watchdog_task clears it, update_task sets it ALLOW = False def create_update_task(modbus, dbus, batteries, signals, csv_signals, main_loop): # type: (Modbus, DBus, Iterable[Battery], Iterable[Signal], DBusGMainLoop) -> Callable[[],bool] """ Creates an update task which runs the main update function and resets the alive flag """ start_time = time.time() def update_task(): # type: () -> bool nonlocal start_time global alive, ALLOW if ALLOW: ALLOW = False else: ALLOW = True alive = update(modbus, batteries, dbus, signals, csv_signals) elapsed_time = time.time() - start_time if elapsed_time >= 60: create_batch_of_csv_files() start_time = time.time() #alive = update_for_testing(modbus, batteries, dbus, signals, csv_signals) if not alive: logging.info('update_task: quitting main loop because of error') main_loop.quit() return alive return update_task def create_watchdog_task(main_loop): # type: (DBusGMainLoop) -> Callable[[],bool] """ Creates a Watchdog task that monitors the alive flag. The watchdog kills the main loop if the alive flag is not periodically reset by the update task. Who watches the watchdog? """ def watchdog_task(): # type: () -> bool global alive if alive: logging.debug('watchdog_task: update_task is alive') alive = False return True else: logging.info('watchdog_task: killing main loop because update_task is no longer alive') main_loop.quit() return False return watchdog_task def get_installation_name(file_path): with open(file_path, 'r') as file: return file.read().strip() def manage_csv_files(directory_path, max_files=20): csv_files = [f for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))] csv_files.sort(key=lambda x: os.path.getctime(os.path.join(directory_path, x))) # Remove oldest files if exceeds maximum while len(csv_files) > max_files: file_to_delete = os.path.join(directory_path, csv_files.pop(0)) os.remove(file_to_delete) def serialize_for_csv(value): if isinstance(value, (dict, list, tuple)): return json.dumps(value, ensure_ascii=False) return str(value) def insert_id(path, id_number): parts = path.split("/") insert_position = parts.index("Devices") + 1 parts.insert(insert_position, str(id_number)) return "/".join(parts) def create_csv_files(signals, statuses, node_numbers, alarms_number_list, warnings_number_list): global s3_config timestamp = int(time.time()) if timestamp % 2 != 0: timestamp -= 1 # Create CSV directory if it doesn't exist if not os.path.exists(CSV_DIR): os.makedirs(CSV_DIR) csv_filename = f"{timestamp}.csv" csv_path = os.path.join(CSV_DIR, csv_filename) # Append values to the CSV file with open(csv_path, 'a', newline='') as csvfile: csv_writer = csv.writer(csvfile, delimiter=';') # Add a special row for the nodes configuration nodes_config_path = "/Config/Devices/BatteryNodes" nodes_list = ",".join(str(node) for node in node_numbers) config_row = [nodes_config_path, nodes_list, ""] csv_writer.writerow(config_row) # Iterate over each node and signal to create rows in the new format for i, node in enumerate(node_numbers): csv_writer.writerow([f"/Battery/Devices/{str(i+1)}/Alarms", alarms_number_list[i], ""]) csv_writer.writerow([f"/Battery/Devices/{str(i+1)}/Warnings", warnings_number_list[i], ""]) for s in signals: signal_name = insert_id(s.name, i+1) value = s.get_value(statuses[i]) row_values = [signal_name, value, s.get_text] csv_writer.writerow(row_values) def main(argv): # type: (list[str]) -> () logging.basicConfig(level=cfg.LOG_LEVEL) logging.info('starting ' + __file__) tty = parse_cmdline_args(argv) modbus = init_modbus(tty) batteries = identify_batteries(modbus) n = len(batteries) logging.info('found ' + str(n) + (' battery' if n == 1 else ' batteries')) if n <= 0: sys.exit(2) bat = c.first(batteries) # report hw and fw version of first battery found signals = init_signals(bat.hardware_version, bat.firmware_version, n) csv_signals = create_csv_signals(bat.firmware_version) main_loop = init_main_loop() # must run before init_dbus because gobject does some global magic dbus = init_dbus(tty, signals) update_task = create_update_task(modbus, dbus, batteries, signals, csv_signals, main_loop) watchdog_task = create_watchdog_task(main_loop) GLib.timeout_add(cfg.UPDATE_INTERVAL * 2, watchdog_task) # add watchdog first GLib.timeout_add(cfg.UPDATE_INTERVAL, update_task) # call update once every update_interval logging.info('starting GLib.MainLoop') main_loop.run() logging.info('GLib.MainLoop was shut down') sys.exit(0xFF) # reaches this only on error if __name__ == "__main__": main(sys.argv[1:])