Innovenergy_trunk/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py

1189 lines
51 KiB
Python
Executable File

#!/usr/bin/python3 -u
# coding=utf-8
import re
import sys
import logging
from gi.repository import GLib
import config as cfg
import convert as c
from pymodbus.register_read_message import ReadInputRegistersResponse
from pymodbus.client.sync import ModbusSerialClient as Modbus
from pymodbus.other_message import ReportSlaveIdRequest
from pymodbus.exceptions import ModbusException
from pymodbus.pdu import ExceptionResponse
from dbus.mainloop.glib import DBusGMainLoop
from data import BatteryStatus, Signal, Battery, LedColor, CsvSignal, LedState
from collections import Iterable
from os import path
app_dir = path.dirname(path.realpath(__file__))
sys.path.insert(1, path.join(app_dir, 'ext', 'velib_python'))
from vedbus import VeDbusService as DBus
import time
import os
import csv
import requests
import hmac
import hashlib
import base64
from datetime import datetime
import io
import json
import requests
import hmac
import hashlib
import base64
from datetime import datetime
import pika
import time
# zip-comp additions
import zipfile
import io
import shutil
def compress_csv_data(csv_data, file_name="data.csv"):
    """
    Zip a CSV string in memory and return the archive as Base64 text.

    csv_data:  str
        CSV content; it is stored UTF-8 encoded.
    file_name: str [optional]
        name of the single entry inside the ZIP archive
    returns:   str
        the complete ZIP archive, Base64 encoded
    """
    memory_stream = io.BytesIO()
    with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive:
        # honour the caller-supplied entry name (it used to be ignored in
        # favour of a hard-coded 'data.csv')
        archive.writestr(file_name, csv_data.encode('utf-8'))
    # read the buffer only after the archive is closed, otherwise the
    # central directory has not been written yet
    compressed_bytes = memory_stream.getvalue()
    return base64.b64encode(compressed_bytes).decode('utf-8')
class S3config:
    """
    Settings and request signing for uploads to the installation's S3 bucket.

    Bucket name, key and secret are taken from the config module; region,
    provider and content type are fixed here.
    """

    def __init__(self):
        self.bucket = cfg.S3BUCKET
        self.region = "sos-ch-dk-2"
        self.provider = "exo.io"
        self.key = cfg.S3KEY
        self.secret = cfg.S3SECRET
        self.content_type = "application/base64; charset=utf-8"

    @property
    def host(self):
        """Host name of the bucket: <bucket>.<region>.<provider>"""
        return "{0}.{1}.{2}".format(self.bucket, self.region, self.provider)

    @property
    def url(self):
        """Base https URL of the bucket."""
        return "https://{0}".format(self.host)

    def create_put_request(self, s3_path, data):
        """
        PUT data to s3_path inside the bucket.

        returns: the response object of requests.put
        """
        signed_headers = self._create_request("PUT", s3_path)
        target = "{0}/{1}".format(self.url, s3_path)
        return requests.put(target, headers=signed_headers, data=data)

    def _create_request(self, method, s3_path):
        """Build the signed header set for the given HTTP method and path."""
        date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
        auth = self._create_authorization(
            method, self.bucket, s3_path, date, self.key, self.secret, self.content_type)
        return {
            "Host": self.host,
            "Date": date,
            "Authorization": auth,
            "Content-Type": self.content_type
        }

    @staticmethod
    def _create_authorization(method, bucket, s3_path, date, s3_key, s3_secret, content_type="", md5_hash=""):
        """
        Create the value of the Authorization header.

        The string that gets signed consists of method, md5 hash,
        content type, date and '/<bucket>/<path>', separated by newlines.
        It is signed with HMAC-SHA1 using the secret, the signature is
        Base64 encoded.

        returns: 'AWS <key>:<signature>'
        """
        resource = "/{0}/{1}".format(bucket.strip('/'), s3_path.strip('/'))
        payload = "{0}\n{1}\n{2}\n{3}\n{4}".format(method, md5_hash, content_type, date, resource)
        digest = hmac.new(s3_secret.encode(), payload.encode(), hashlib.sha1).digest()
        signature = base64.b64encode(digest).decode()
        return "AWS {0}:{1}".format(s3_key, signature)
def read_csv_as_string(file_path):
    """
    Return the whole content of the (UTF-8) file at file_path as one string.

    A missing file or any other IO error is reported on stdout,
    None is returned in that case.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as csv_file:
            content = csv_file.read()
    except IOError as error:
        # a missing file gets its own message, everything else is reported generically
        if isinstance(error, FileNotFoundError):
            print(f"Error: The file {file_path} does not exist.")
        else:
            print(f"IO error occurred: {str(error)}")
        return None
    return content
# Directory for CSV files (absolute path on the target device).
# NOTE(review): the code using it is outside this section.
CSV_DIR = "/data/csv_files/"
# Relative alternative, kept for reference:
#CSV_DIR = "csv_files/"
# Define the path to the file containing the installation name
INSTALLATION_NAME_FILE = '/data/innovenergy/openvpn/installation-name'
# trick the pycharm type-checker into thinking Callable is in scope, not used at runtime
# noinspection PyUnreachableCode
if False:
from typing import Callable
def interpret_limb_bitmap(bitmap_value):
    """
    Count the battery strings that are flagged as disabled.

    Bits 0..4 of bitmap_value stand for strings 1..5, a set bit means
    'disabled': 0b00000 -> all 5 strings active, 0b00001 -> string 1 disabled.
    Higher bits are ignored.

    returns: int, number of set bits among the lowest five
    """
    string_masks = (0b00001, 0b00010, 0b00100, 0b01000, 0b10000)
    return sum(1 for mask in string_masks if bitmap_value & mask)
def calc_power_limit_imposed_by_voltage_limit(v, i, v_limit, r_int):
    # type: (float, float, float, float) -> float
    """
    Power at which the voltage limit would be reached.

    The voltage headroom (v_limit - v) divided by the internal resistance
    gives the additional current; the limit is v_limit times the
    resulting current.
    """
    additional_current = (v_limit - v) / r_int
    return v_limit * (i + additional_current)
def calc_power_limit_imposed_by_current_limit(v, i, i_limit, r_int):
    # type: (float, float, float, float) -> float
    """
    Power at which the current limit would be reached.

    The current headroom (i_limit - i) times the internal resistance
    gives the additional voltage; the limit is i_limit times the
    resulting voltage.
    """
    additional_voltage = (i_limit - i) * r_int
    return i_limit * (v + additional_voltage)
def read_switch_closed(status):
    """Return the negation of bit 0 of register 1013: True when the bit is not set."""
    return not c.read_bool(register=1013, bit=0)(status)
def read_alarm_out_active(status):
    """Return the negation of bit 1 of register 1013: True when the bit is not set."""
    return not c.read_bool(register=1013, bit=1)(status)
def read_aux_relay(status):
    """Return the negation of bit 4 of register 1013: True when the bit is not set."""
    return not c.read_bool(register=1013, bit=4)(status)
def hex_string_to_ascii(hex_string):
    """
    Decode a string of hex digits into characters, two digits per character.

    Spaces are ignored, e.g. '45 4F 43 5F' -> 'EOC_'.
    A trailing single digit is decoded on its own.
    """
    digits = hex_string.replace(" ", "")
    characters = []
    for start in range(0, len(digits), 2):
        code_point = int(digits[start:start + 2], 16)
        characters.append(chr(code_point))
    return ''.join(characters)
# Reads the battery status as a hex string, starting at register 1060.
# NOTE(review): the second argument looks like a register count - confirm in convert.py
battery_status_reader = c.read_hex_string(1060, 2)


def read_eoc_reached(status):
    """Return True when the battery status, decoded to text, equals 'EOC_'."""
    status_text = hex_string_to_ascii(battery_status_reader(status))
    return status_text == "EOC_"
def return_led_state(status, color):
    """
    Render the state of one LED (register 1004) as text.

    returns: 'Blinking' (fast or slow), 'On', 'Off',
             or 'Unknown' for any other state
    """
    state = c.read_led_state(register=1004, led=color)(status)
    if state == LedState.blinking_fast:
        return "Blinking"
    if state == LedState.blinking_slow:
        return "Blinking"
    if state == LedState.on:
        return "On"
    if state == LedState.off:
        return "Off"
    return "Unknown"
def return_led_state_blue(status):
    """State of the blue LED as text, see return_led_state."""
    return return_led_state(status=status, color=LedColor.blue)
def return_led_state_red(status):
    """State of the red LED as text, see return_led_state."""
    return return_led_state(status=status, color=LedColor.red)
def return_led_state_green(status):
    """State of the green LED as text, see return_led_state."""
    return return_led_state(status=status, color=LedColor.green)
def return_led_state_amber(status):
    """State of the amber LED as text, see return_led_state."""
    return return_led_state(status=status, color=LedColor.amber)
def read_serial_number(status):
    """
    Assemble the serial number from registers 1055..1058.

    Each register is read as a hex string, spaces are removed, the parts
    are concatenated and trailing '0' characters are stripped.
    """
    serial_regs = (1055, 1056, 1057, 1058)
    hex_parts = [
        c.read_hex_string(reg, 1)(status).replace(' ', '')
        for reg in serial_regs
    ]
    return ''.join(hex_parts).rstrip('0')
def time_since_toc_in_time_format(status):
    """
    Format the value of register 1052 (taken as minutes) as 'd.hh:mm:ss'.

    The minutes are converted to whole seconds first and then split
    into days, hours, minutes and seconds.
    """
    minutes_since_toc = c.read_float(register=1052)(status)
    remaining = int(minutes_since_toc * 60)
    days, remaining = divmod(remaining, 24 * 3600)
    hours, remaining = divmod(remaining, 3600)
    minutes, seconds = divmod(remaining, 60)
    return f"{days}.{hours:02}:{minutes:02}:{seconds:02}"
def create_csv_signals():
    """
    Build the list of CsvSignals, one per value of a battery.

    Each CsvSignal pairs a path with a function (BatteryStatus) -> value
    and, optionally, a unit. The readers use the register numbers,
    scale factors and offsets given below.

    returns: list of CsvSignal
    """
    read_voltage = c.read_float(register=999, scale_factor=0.01, offset=0, places=2)
    read_current = c.read_float(register=1000, scale_factor=0.01, offset=-10000, places=2)
    read_limb_bitmap = c.read_bitmap(1059)
    total_current = c.read_float(register=1062, scale_factor=0.01, offset=-10000, places=1)
    soc_ah = c.read_float(register=1002, scale_factor=0.1, offset=-10000, places=1)

    def read_power(status):
        return int(read_current(status) * read_voltage(status))

    def string_disabled(mask):
        # creates the reader for one bit of the limb bitmap: 1 if set, else 0
        def read_flag(status):
            return int((read_limb_bitmap(status) & mask) != 0)
        return read_flag

    def limp_strings_value(status):
        return interpret_limb_bitmap(read_limb_bitmap(status))

    def calc_max_charge_power(status):
        # type: (BatteryStatus) -> int
        n_strings = cfg.NUM_OF_STRING_PER_BATTERY - limp_strings_value(status)
        if n_strings <= 0:
            # no string left to charge; the per-string resistances below
            # would otherwise be divided by zero
            return 0

        i_max = n_strings * cfg.I_MAX_PER_STRING
        v_max = cfg.V_MAX
        r_int_min = cfg.R_STRING_MIN / n_strings
        r_int_max = cfg.R_STRING_MAX / n_strings

        v = read_voltage(status)
        i = read_current(status)

        p_limits = [
            calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_min),
            calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_max),
            calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_min),
            calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_max),
        ]

        p_limit = min(p_limits)  # p_limit is normally positive here (signed)
        p_limit = max(p_limit, 0)  # charge power must not become negative
        return int(p_limit)

    def calc_max_discharge_power(status):
        n_strings = cfg.NUM_OF_STRING_PER_BATTERY - limp_strings_value(status)
        max_discharge_current = n_strings * cfg.I_MAX_PER_STRING
        return int(max_discharge_current * read_voltage(status))

    def read_total_current(status):
        return total_current(status)

    def read_heating_current(status):
        # whatever part of the total current does not flow through the cells
        return total_current(status) - read_current(status)

    def read_heating_power(status):
        return read_voltage(status) * read_heating_current(status)

    def read_soc_ah(status):
        return soc_ah(status)

    return [
        CsvSignal('/Battery/Devices/FwVersion', lambda bs: bs.battery.firmware_version),
        CsvSignal('/Battery/Devices/Dc/Power', read_power, 'W'),
        CsvSignal('/Battery/Devices/Dc/Voltage', read_voltage, 'V'),
        CsvSignal('/Battery/Devices/Soc', c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), '%'),
        CsvSignal('/Battery/Devices/Temperatures/Cells/Average', c.read_float(register=1003, scale_factor=0.1, offset=-400, places=1), 'C'),
        CsvSignal('/Battery/Devices/Dc/Current', read_current, 'A'),
        CsvSignal('/Battery/Devices/BusCurrent', read_total_current, 'A'),
        CsvSignal('/Battery/Devices/CellsCurrent', read_current, 'A'),
        CsvSignal('/Battery/Devices/HeatingCurrent', read_heating_current, 'A'),
        CsvSignal('/Battery/Devices/HeatingPower', read_heating_power, 'W'),
        CsvSignal('/Battery/Devices/SOCAh', read_soc_ah),
        CsvSignal('/Battery/Devices/Leds/Blue', return_led_state_blue),
        CsvSignal('/Battery/Devices/Leds/Red', return_led_state_red),
        CsvSignal('/Battery/Devices/Leds/Green', return_led_state_green),
        CsvSignal('/Battery/Devices/Leds/Amber', return_led_state_amber),
        # NOTE(review): the paths say 'Active' but the value is 1 when the
        # string is disabled - confirm that the consumer expects this
        CsvSignal('/Battery/Devices/BatteryStrings/String1Active', string_disabled(0b00001)),
        CsvSignal('/Battery/Devices/BatteryStrings/String2Active', string_disabled(0b00010)),
        CsvSignal('/Battery/Devices/BatteryStrings/String3Active', string_disabled(0b00100)),
        CsvSignal('/Battery/Devices/BatteryStrings/String4Active', string_disabled(0b01000)),
        CsvSignal('/Battery/Devices/BatteryStrings/String5Active', string_disabled(0b10000)),
        CsvSignal('/Battery/Devices/IoStatus/ConnectedToDcBus', read_switch_closed),
        CsvSignal('/Battery/Devices/IoStatus/AlarmOutActive', read_alarm_out_active),
        CsvSignal('/Battery/Devices/IoStatus/InternalFanActive', c.read_bool(register=1013, bit=2)),
        CsvSignal('/Battery/Devices/IoStatus/VoltMeasurementAllowed', c.read_bool(register=1013, bit=3)),
        CsvSignal('/Battery/Devices/IoStatus/AuxRelayBus', read_aux_relay),
        CsvSignal('/Battery/Devices/IoStatus/RemoteStateActive', c.read_bool(register=1013, bit=5)),
        CsvSignal('/Battery/Devices/IoStatus/RiscActive', c.read_bool(register=1013, bit=6)),
        CsvSignal('/Battery/Devices/Eoc', read_eoc_reached),
        CsvSignal('/Battery/Devices/SerialNumber', read_serial_number),
        CsvSignal('/Battery/Devices/TimeSinceTOC', time_since_toc_in_time_format),
        CsvSignal('/Battery/Devices/MaxChargePower', calc_max_charge_power),
        CsvSignal('/Battery/Devices/MaxDischargePower', calc_max_discharge_power),
    ]
def init_signals(n_batteries):
    # type: (int) -> Iterable[Signal]
    """
    A Signal holds all information necessary for the handling of a
    certain datum (e.g. voltage) published by the battery.

    Signal(dbus_path, aggregate, get_value, get_text = str)

    dbus_path: str
        object_path on DBus where the datum needs to be published

    aggregate: Iterable[object] -> object
        function that combines the values of multiple batteries into one.
        e.g. sum for currents, or mean for voltages

    get_value: (BatteryStatus) -> object [optional]
        function to extract the datum from the modbus record,
        alternatively: a constant

    get_text: (object) -> unicode [optional]
        function to render datum to text, needed by DBus
        alternatively: a constant

    The conversion functions use the same parameters (e.g scale_factor, offset)
    as described in the document 'T48TLxxx ModBus Protocol Rev.7.1' which can
    be found in the /doc folder

    n_batteries: int
        number of batteries, published as /NumOfBatteries and appended
        to the product name if there is more than one
    """
    product_id = cfg.PRODUCT_ID
    # render the id as 4 hex digits; this used to be the unformatted
    # literal '0x{0:04product_id}'
    # NOTE(review): requires cfg.PRODUCT_ID to be an int - confirm in config.py
    product_id_hex = '0x{0:04x}'.format(product_id)

    read_voltage = c.read_float(register=999, scale_factor=0.01, offset=0, places=2)
    read_current = c.read_float(register=1000, scale_factor=0.01, offset=-10000, places=2)
    read_limb_bitmap = c.read_bitmap(1059)

    def read_battery_cold(status):
        # green and blue LED both at least 'blinking slow'
        return \
            c.read_led_state(register=1004, led=LedColor.green)(status) >= LedState.blinking_slow and \
            c.read_led_state(register=1004, led=LedColor.blue)(status) >= LedState.blinking_slow

    def read_power(status):
        return int(read_current(status) * read_voltage(status))

    def limp_strings_value(status):
        return interpret_limb_bitmap(read_limb_bitmap(status))

    def max_discharge_current(status):
        return (cfg.NUM_OF_STRING_PER_BATTERY - limp_strings_value(status)) * cfg.I_MAX_PER_STRING

    def max_charge_current(status):
        return status.battery.ampere_hours / 2

    def calc_max_charge_power(status):
        # type: (BatteryStatus) -> int
        n_strings = cfg.NUM_OF_STRING_PER_BATTERY - limp_strings_value(status)
        if n_strings <= 0:
            # no string left to charge; the per-string resistances below
            # would otherwise be divided by zero
            return 0

        i_max = n_strings * cfg.I_MAX_PER_STRING
        v_max = cfg.V_MAX
        r_int_min = cfg.R_STRING_MIN / n_strings
        r_int_max = cfg.R_STRING_MAX / n_strings

        v = read_voltage(status)
        i = read_current(status)

        p_limits = [
            calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_min),
            calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_max),
            calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_min),
            calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_max),
        ]

        p_limit = min(p_limits)  # p_limit is normally positive here (signed)
        p_limit = max(p_limit, 0)  # charge power must not become negative
        return int(p_limit)

    product_name = cfg.PRODUCT_NAME
    if n_batteries > 1:
        product_name = cfg.PRODUCT_NAME + ' x' + str(n_batteries)

    return [
        # Node Red related dbus paths
        Signal('/TimeToTOCRequest', max, c.read_float(register=1052)),
        Signal('/EOCReached', c.return_in_list, read_eoc_reached),
        Signal('/BatteryCold', c.return_in_list, read_battery_cold),
        Signal('/NumOfLimbStrings', c.return_in_list, get_value=limp_strings_value),
        Signal('/NumOfBatteries', max, get_value=n_batteries),
        Signal('/Dc/0/Voltage', c.mean, get_value=read_voltage, get_text=c.append_unit('V')),
        Signal('/Dc/0/Current', c.ssum, get_value=read_current, get_text=c.append_unit('A')),
        Signal('/Dc/0/Power', c.ssum, get_value=read_power, get_text=c.append_unit('W')),
        Signal('/BussVoltage', c.mean, c.read_float(register=1001, scale_factor=0.01, offset=0, places=2), c.append_unit('V')),
        Signal('/Soc', min, c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), c.append_unit('%')),
        Signal('/LowestSoc', min, c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), c.append_unit('%')),
        Signal('/Dc/0/Temperature', c.mean, c.read_float(register=1003, scale_factor=0.1, offset=-400, places=1), c.append_unit(u'°C')),
        # Charge/Discharge current, voltage and power
        Signal('/Info/MaxDischargeCurrent', c.ssum, max_discharge_current, c.append_unit('A')),
        Signal('/Info/MaxChargeCurrent', c.ssum, max_charge_current, c.append_unit('A')),
        Signal('/Info/MaxChargeVoltage', min, cfg.MAX_CHARGE_VOLTAGE, c.append_unit('V')),
        Signal('/Info/MaxChargePower', c.ssum, calc_max_charge_power),
        # Victron mandatory dbus paths
        Signal('/Mgmt/ProcessName', c.first, __file__),
        Signal('/Mgmt/ProcessVersion', c.first, cfg.SOFTWARE_VERSION),
        Signal('/Mgmt/Connection', c.first, cfg.CONNECTION),
        Signal('/DeviceInstance', c.first, cfg.DEVICE_INSTANCE),
        Signal('/ProductName', c.first, product_name),
        Signal('/ProductId', c.first, cfg.PRODUCT_ID, product_id_hex),
        Signal('/Connected', c.first, 1),
        Signal('/FirmwareVersion', c.return_in_list, lambda bs: bs.battery.firmware_version),
        Signal('/HardwareVersion', c.return_in_list, lambda bs: bs.battery.hardware_version),
        Signal('/BmsVersion', c.return_in_list, lambda s: s.battery.bms_version),
        # Warnings
        Signal('/WarningFlags/TaM1', c.return_in_list, c.read_bool(register=1005, bit=1)),
        Signal('/WarningFlags/TbM1', c.return_in_list, c.read_bool(register=1005, bit=4)),
        Signal('/WarningFlags/VBm1', c.return_in_list, c.read_bool(register=1005, bit=6)),
        Signal('/WarningFlags/VBM1', c.return_in_list, c.read_bool(register=1005, bit=8)),
        Signal('/WarningFlags/IDM1', c.return_in_list, c.read_bool(register=1005, bit=10)),
        Signal('/WarningFlags/vsm1', c.return_in_list, c.read_bool(register=1005, bit=22)),
        Signal('/WarningFlags/vsM1', c.return_in_list, c.read_bool(register=1005, bit=24)),
        Signal('/WarningFlags/iCM1', c.return_in_list, c.read_bool(register=1005, bit=26)),
        Signal('/WarningFlags/iDM1', c.return_in_list, c.read_bool(register=1005, bit=28)),
        Signal('/WarningFlags/MID1', c.return_in_list, c.read_bool(register=1005, bit=30)),
        Signal('/WarningFlags/BLPW', c.return_in_list, c.read_bool(register=1005, bit=32)),
        Signal('/WarningFlags/CCBF', c.return_in_list, c.read_bool(register=1005, bit=33)),
        Signal('/WarningFlags/Ah_W', c.return_in_list, c.read_bool(register=1005, bit=35)),
        Signal('/WarningFlags/MPMM', c.return_in_list, c.read_bool(register=1005, bit=38)),
        Signal('/WarningFlags/TCdi', c.return_in_list, c.read_bool(register=1005, bit=40)),
        Signal('/WarningFlags/LMPW', c.return_in_list, c.read_bool(register=1005, bit=44)),
        Signal('/WarningFlags/TOCW', c.return_in_list, c.read_bool(register=1005, bit=47)),
        Signal('/WarningFlags/BUSL', c.return_in_list, c.read_bool(register=1005, bit=49)),
        # Alarms
        Signal('/AlarmFlags/Tam', c.return_in_list, c.read_bool(register=1009, bit=0)),
        Signal('/AlarmFlags/TaM2', c.return_in_list, c.read_bool(register=1009, bit=2)),
        Signal('/AlarmFlags/Tbm', c.return_in_list, c.read_bool(register=1009, bit=3)),
        Signal('/AlarmFlags/TbM2', c.return_in_list, c.read_bool(register=1009, bit=5)),
        Signal('/AlarmFlags/VBm2', c.return_in_list, c.read_bool(register=1009, bit=7)),
        Signal('/AlarmFlags/VBM2', c.return_in_list, c.read_bool(register=1009, bit=9)),
        Signal('/AlarmFlags/IDM2', c.return_in_list, c.read_bool(register=1009, bit=11)),
        Signal('/AlarmFlags/ISOB', c.return_in_list, c.read_bool(register=1009, bit=12)),
        Signal('/AlarmFlags/MSWE', c.return_in_list, c.read_bool(register=1009, bit=13)),
        Signal('/AlarmFlags/FUSE', c.return_in_list, c.read_bool(register=1009, bit=14)),
        Signal('/AlarmFlags/HTRE', c.return_in_list, c.read_bool(register=1009, bit=15)),
        Signal('/AlarmFlags/TCPE', c.return_in_list, c.read_bool(register=1009, bit=16)),
        Signal('/AlarmFlags/STRE', c.return_in_list, c.read_bool(register=1009, bit=17)),
        Signal('/AlarmFlags/CME', c.return_in_list, c.read_bool(register=1009, bit=18)),
        Signal('/AlarmFlags/HWFL', c.return_in_list, c.read_bool(register=1009, bit=19)),
        Signal('/AlarmFlags/HWEM', c.return_in_list, c.read_bool(register=1009, bit=20)),
        Signal('/AlarmFlags/ThM', c.return_in_list, c.read_bool(register=1009, bit=21)),
        Signal('/AlarmFlags/vsm2', c.return_in_list, c.read_bool(register=1009, bit=23)),
        Signal('/AlarmFlags/vsM2', c.return_in_list, c.read_bool(register=1009, bit=25)),
        Signal('/AlarmFlags/iCM2', c.return_in_list, c.read_bool(register=1009, bit=27)),
        Signal('/AlarmFlags/iDM2', c.return_in_list, c.read_bool(register=1009, bit=29)),
        Signal('/AlarmFlags/MID2', c.return_in_list, c.read_bool(register=1009, bit=31)),
        Signal('/AlarmFlags/HTFS', c.return_in_list, c.read_bool(register=1009, bit=42)),
        Signal('/AlarmFlags/DATA', c.return_in_list, c.read_bool(register=1009, bit=43)),
        Signal('/AlarmFlags/LMPA', c.return_in_list, c.read_bool(register=1009, bit=45)),
        Signal('/AlarmFlags/HEBT', c.return_in_list, c.read_bool(register=1009, bit=46)),
        Signal('/AlarmFlags/CURM', c.return_in_list, c.read_bool(register=1009, bit=48)),
        # LedStatus
        Signal('/LedStatus/Red', c.first, c.read_led_state(register=1004, led=LedColor.red)),
        Signal('/LedStatus/Blue', c.first, c.read_led_state(register=1004, led=LedColor.blue)),
        Signal('/LedStatus/Green', c.first, c.read_led_state(register=1004, led=LedColor.green)),
        Signal('/LedStatus/Amber', c.first, c.read_led_state(register=1004, led=LedColor.amber)),
        # IO Status
        Signal('/IoStatus/MainSwitchClosed', c.return_in_list, read_switch_closed),
        Signal('/IoStatus/AlarmOutActive', c.return_in_list, read_alarm_out_active),
        Signal('/IoStatus/InternalFanActive', c.return_in_list, c.read_bool(register=1013, bit=2)),
        Signal('/IoStatus/VoltMeasurementAllowed', c.return_in_list, c.read_bool(register=1013, bit=3)),
        Signal('/IoStatus/AuxRelay', c.return_in_list, read_aux_relay),
        Signal('/IoStatus/RemoteState', c.return_in_list, c.read_bool(register=1013, bit=5)),
        Signal('/IoStatus/RiscOn', c.return_in_list, c.read_bool(register=1013, bit=6)),
    ]
def init_modbus(tty):
    # type: (str) -> Modbus
    """
    Create the serial Modbus client for the device /dev/<tty>.

    All serial parameters are taken from the config module.
    The client is only created here, it is not connected.
    """
    logging.debug('initializing Modbus')

    serial_settings = dict(
        port='/dev/' + tty,
        method=cfg.MODE,
        baudrate=cfg.BAUD_RATE,
        stopbits=cfg.STOP_BITS,
        bytesize=cfg.BYTE_SIZE,
        timeout=cfg.TIMEOUT,
        parity=cfg.PARITY)

    return Modbus(**serial_settings)
def init_dbus(tty, signals):
    # type: (str, Iterable[Signal]) -> DBus
    """
    Create the DBus service and add one path per signal.

    The service name is the configured prefix followed by the tty name.
    """
    logging.debug('initializing DBus service')
    service = DBus(servicename=cfg.SERVICE_NAME_PREFIX + tty)

    logging.debug('initializing DBus paths')
    for sig in signals:
        init_dbus_path(service, sig)

    return service
# noinspection PyBroadException
def try_get_value(sig):
    # type: (Signal) -> object
    """
    Try to get the value of a signal without a battery status.

    get_value is called with None in place of the status.
    Any error raised by that call makes this function return None.
    SystemExit and KeyboardInterrupt are deliberately not swallowed.
    """
    try:
        return sig.get_value(None)
    except Exception:
        # best effort: the value is simply not available without a status
        return None
def init_dbus_path(dbus, sig):
    # type: (DBus, Signal) -> ()
    """
    Add the path of the signal to the DBus service.

    The initial value is whatever try_get_value yields for the signal;
    the text callback renders a value using the signal's get_text.
    """
    def render_text(_path, value):
        return sig.get_text(value)

    initial_value = try_get_value(sig)
    dbus.add_path(sig.dbus_path, initial_value, gettextcallback=render_text)
def init_main_loop():
    # type: () -> DBusGMainLoop
    """
    Install DBusGMainLoop as the default and create the GLib main loop.

    returns: the new GLib.MainLoop, not yet running
    """
    logging.debug('initializing DBusGMainLoop Loop')
    DBusGMainLoop(set_as_default=True)
    main_loop = GLib.MainLoop()
    return main_loop
def report_slave_id(modbus, slave_address):
    # type: (Modbus, int) -> str
    """
    Request the slave id of the node at slave_address.

    The connection is opened for the request and closed again in any case.

    returns: the identifier of the response
             NOTE(review): parse_slave_id treats it as bytes, the type
             comment above may be outdated - confirm
    raises:  Exception if the node answers with an error
    """
    slave = str(slave_address)
    logging.debug('requesting slave id from node ' + slave)

    try:
        modbus.connect()

        request = ReportSlaveIdRequest(unit=slave_address)
        response = modbus.execute(request)

        # errors come back as instances, so isinstance is needed here:
        # `response is ExceptionResponse` compared the instance with the
        # class itself and was never true
        if isinstance(response, (ExceptionResponse, ModbusException)):
            raise Exception('failed to get slave id from ' + slave + ' : ' + str(response))

        return response.identifier

    finally:
        modbus.close()
def identify_battery(modbus, slave_address):
    # type: (Modbus, int) -> Battery
    """
    Gather the properties of the battery at slave_address.

    Hardware version, BMS version and ampere hours come from the slave id,
    the firmware version is read separately.

    returns: Battery
    """
    logging.info('identifying battery...')

    slave_id_fields = parse_slave_id(modbus, slave_address)
    hardware_version, bms_version, ampere_hours = slave_id_fields
    firmware_version = read_firmware_version(modbus, slave_address)

    battery = Battery(
        slave_address=slave_address,
        hardware_version=hardware_version,
        firmware_version=firmware_version,
        bms_version=bms_version,
        ampere_hours=ampere_hours)

    logging.info('battery identified:\n' + str(battery))
    return battery
def identify_batteries(modbus):
    # type: (Modbus) -> list[Battery]
    """
    Probe the slave addresses 1..cfg.MAX_SLAVE_ADDRESS for batteries.

    An address where identification fails is logged and skipped.

    returns: list of the batteries that could be identified
    """
    found = []
    for slave_address in range(1, cfg.MAX_SLAVE_ADDRESS + 1):
        try:
            battery = identify_battery(modbus, slave_address)
        except Exception as e:
            logging.info('failed to identify battery at {0} : {1}'.format(str(slave_address), str(e)))
        else:
            found.append(battery)
    return found
def parse_slave_id(modbus, slave_address):
    # type: (Modbus, int) -> (str, str, int)
    """
    Request the slave id of a node and split it into its parts.

    The id is expected to look like '48TL<ah> <bms>'; everything
    outside the printable ASCII range is removed before matching.

    returns: (hardware version, bms version, ampere hours)
    raises:  Exception if the id does not start with a 48TL designation
    """
    slave_id = report_slave_id(modbus, slave_address)

    sid = re.sub(b'[^\x20-\x7E]', b'', slave_id)  # remove weird special chars

    # raw string: '\d' in a plain literal is an invalid escape sequence
    match = re.match(r'(?P<hw>48TL(?P<ah>\d+)) *(?P<bms>.*)', sid.decode('ascii'))

    if match is None:
        raise Exception('no known battery found')

    return match.group('hw'), match.group('bms'), int(match.group('ah'))
def read_firmware_version(modbus, slave_address):
    # type: (Modbus, int) -> str
    """
    Read register 1054 of the node and render it as the firmware version.

    returns: the register value as upper case hex, left-padded with
             zeros to at least 4 characters
    """
    logging.debug('reading firmware version')

    try:
        modbus.connect()
        registers = read_modbus_registers(modbus, slave_address, base_address=1054, count=1).registers
        return format(registers[0], '0>4X')
    finally:
        modbus.close()  # close in any case
def read_modbus_registers(modbus, slave_address, base_address=cfg.BASE_ADDRESS, count=cfg.NO_OF_REGISTERS):
    # type: (Modbus, int) -> ReadInputRegistersResponse
    """
    Read `count` input registers of a node, starting at base_address.

    The defaults for base_address and count come from the config module.
    The modbus client must have been connected by the caller.
    """
    logging.debug(f'requesting modbus registers {base_address}-{base_address + count}')

    response = modbus.read_input_registers(
        address=base_address,
        count=count,
        unit=slave_address)
    return response
def read_battery_status(modbus, battery):
    # type: (Modbus, Battery) -> BatteryStatus
    """
    Read the modbus registers containing the battery's status info.

    On any error the error is logged, create_batch_of_csv_files is
    called and the error is raised again.
    The connection is closed in any case.
    """
    logging.debug('reading battery status')

    try:
        modbus.connect()
        response = read_modbus_registers(modbus, battery.slave_address)
        status = BatteryStatus(battery, response.registers)
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        create_batch_of_csv_files()  # Call this only if there's an error
        raise
    else:
        return status
    finally:
        modbus.close()  # close in any case
def publish_values(dbus, signals, statuses):
    # type: (DBus, Iterable[Signal], Iterable[BatteryStatus]) -> ()
    """
    Publish the aggregated value of every signal on the DBus.

    For each signal the value is extracted from every status, the values
    are combined by the signal's aggregate function and the result is
    written to the signal's dbus path.
    """
    for sig in signals:
        per_battery = list(map(sig.get_value, statuses))
        with dbus as service:
            service[sig.dbus_path] = sig.aggregate(per_battery)
# Warning and alarm flags of the previous update cycle, see
# update_state_from_dictionaries which compares against and replaces them.
previous_warnings = {}
previous_alarms = {}
# NOTE(review): not used in this part of the file - presumably a counter of written CSV files
num_of_csv_files_saved=0
class MessageType:
    """Names of the message types."""

    ALARM_OR_WARNING = "AlarmOrWarning"
    HEARTBEAT = "Heartbeat"
class AlarmOrWarning:
    """
    One alarm or warning entry of a status message.

    description: str
        name of the alarm or warning
    created_by: str
        the device that raised it

    Date ('YYYY-MM-DD') and time ('HH:MM:SS') are set to the local time
    at which the object is created.
    """

    def __init__(self, description, created_by):
        # take a single timestamp so that date and time always belong
        # together, even when created right at midnight
        now = datetime.now()
        self.date = now.strftime('%Y-%m-%d')
        self.time = now.strftime('%H:%M:%S')
        self.description = description
        self.created_by = created_by

    def to_dict(self):
        """Return the entry as a dict, ready to be serialized to JSON."""
        return {
            "Date": self.date,
            "Time": self.time,
            "Description": self.description,
            "CreatedBy": self.created_by
        }
def SubscribeToQueue():
    """
    Connect to the RabbitMQ server and declare the durable queue 'statusQueue'.

    A failure is reported on stdout and does not raise.

    returns: the channel, or None if no channel could be opened
    """
    # without this default a failed connection left `channel` unbound and
    # the return below raised UnboundLocalError, hiding the real error
    channel = None
    try:
        # NOTE(review): host and credentials are hard-coded here; consider
        # moving them to the config module
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host="10.2.0.11",
            port=5672,
            virtual_host="/",
            heartbeat=30,
            credentials=pika.PlainCredentials("producer", "b187ceaddb54d5485063ddc1d41af66f")))
        channel = connection.channel()
        channel.queue_declare(queue="statusQueue", durable=True)
        print("Subscribed to queue")
    except Exception as ex:
        print("An error occurred while connecting to the RabbitMQ queue:", ex)
    return channel
# State of update_state_from_dictionaries: True until its first call,
# and the status value it last acted on.
is_first_update = True
prev_status=0
# Channel used to publish the status messages; connecting happens at import time.
channel = SubscribeToQueue()
# Create an S3config instance
s3_config = S3config()
# The installation id is the part of the bucket name before the first '-', as int.
INSTALLATION_ID=int(s3_config.bucket.split('-')[0])
# Sent as "Product" in the status messages.
PRODUCT_ID = 1
def _find_changed_flags(current, previous):
    """Map every key of current to True if its value differs from previous (missing counts as False)."""
    return {key: value != previous.get(key, False) for key, value in current.items()}


def _count_active_flags_per_node(flags, node_numbers):
    """Count the active flags of each node, in the order of node_numbers."""
    # assumes keys look like '/Battery/Devices/<node>/...', i.e. the node
    # number is the 4th part when split at '/' - TODO confirm against insert_id
    counts = []
    for node_number in node_numbers:
        node = int(node_number)
        counts.append(sum(
            1 for key, is_active in flags.items()
            if int(key.split("/")[3]) == node and is_active))
    return counts


def _collect_new_entries(changed, current):
    """Create the message entries for all flags that changed and are active now."""
    entries = []
    for key, is_active in current.items():
        if changed[key] and is_active:
            parts = key.split("/")
            entries.append(AlarmOrWarning(parts[-1], "Battery node " + parts[3]).to_dict())
    return entries


def update_state_from_dictionaries(current_warnings, current_alarms, node_numbers):
    """
    Compare the current flags with those of the previous cycle and publish
    a status message if something relevant changed.

    current_warnings, current_alarms: dict
        flag path -> value, for all nodes
    node_numbers: Iterable
        the node numbers to count active flags for

    A message is published on 'statusQueue' when the status
    (2 = any alarm, 1 = warnings only, 0 = neither) differs from the last
    one acted on, or when a warning or alarm became active. On the first
    call every active flag counts as newly active.
    If there is no channel, the message is not sent and a warning is logged.

    returns: (status_message, alarms_number_list, warnings_number_list)
        status_message is the JSON string if a message was due,
        otherwise the message as a dict;
        the lists hold the number of active alarms / warnings per node
    """
    global previous_warnings, previous_alarms, INSTALLATION_ID, PRODUCT_ID, is_first_update, channel, prev_status

    if is_first_update:
        changed_warnings = current_warnings
        changed_alarms = current_alarms
        is_first_update = False
    else:
        changed_alarms = _find_changed_flags(current_alarms, previous_alarms)
        changed_warnings = _find_changed_flags(current_warnings, previous_warnings)

    status_message = {
        "InstallationId": INSTALLATION_ID,
        "Product": PRODUCT_ID,
        "Status": 0,
        "Type": 1,
        "Warnings": [],
        "Alarms": []
    }

    alarms_number_list = _count_active_flags_per_node(current_alarms, node_numbers)
    warnings_number_list = _count_active_flags_per_node(current_warnings, node_numbers)

    status_message["Alarms"] = _collect_new_entries(changed_alarms, current_alarms)
    status_message["Warnings"] = _collect_new_entries(changed_warnings, current_warnings)

    if any(current_alarms.values()):
        status_message["Status"] = 2
    elif any(current_warnings.values()):
        status_message["Status"] = 1
    else:
        status_message["Status"] = 0

    message_is_due = \
        status_message["Status"] != prev_status or \
        len(status_message["Warnings"]) > 0 or \
        len(status_message["Alarms"]) > 0

    if message_is_due:
        prev_status = status_message["Status"]
        status_message["Type"] = 0
        status_message = json.dumps(status_message)
        if channel is None:
            logging.warning('no RabbitMQ channel available, status message not sent')
        else:
            channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message)
            print(status_message)
            print("Message sent successfully")

    previous_warnings = current_warnings.copy()
    previous_alarms = current_alarms.copy()

    return status_message, alarms_number_list, warnings_number_list
def read_warning_and_alarm_flags():
    """
    Create the CsvSignals for all warning and alarm flags.

    Warnings are single bits of register 1005, alarms single bits of
    register 1009. The alarm list additionally ends with the limb string
    alarm, which is read from register 1059.

    returns: (list of warning signals, list of alarm signals)
    """
    # (flag name, bit) of the warnings, register 1005
    warning_bits = (
        ('TaM1', 1), ('TbM1', 4), ('VBm1', 6), ('VBM1', 8), ('IDM1', 10),
        ('vsm1', 22), ('vsM1', 24), ('iCM1', 26), ('iDM1', 28), ('MID1', 30),
        ('BLPW', 32), ('CCBF', 33), ('Ah_W', 35), ('MPMM', 38), ('TCdi', 40),
        ('LMPW', 44), ('TOCW', 47), ('BUSL', 49),
    )
    # (flag name, bit) of the alarms, register 1009
    alarm_bits = (
        ('Tam', 0), ('TaM2', 2), ('Tbm', 3), ('TbM2', 5), ('VBm2', 7),
        ('VBM2', 9), ('IDM2', 11), ('ISOB', 12), ('MSWE', 13), ('FUSE', 14),
        ('HTRE', 15), ('TCPE', 16), ('STRE', 17), ('CME', 18), ('HWFL', 19),
        ('HWEM', 20), ('ThM', 21), ('vsm2', 23), ('vsM2', 25), ('iCM2', 27),
        ('iDM2', 29), ('MID2', 31), ('HTFS', 42), ('DATA', 43), ('LMPA', 45),
        ('HEBT', 46), ('CURM', 48),
    )

    warning_signals = [
        CsvSignal('/Battery/Devices/WarningFlags/' + name, c.read_bool(register=1005, bit=bit))
        for name, bit in warning_bits
    ]

    alarm_signals = [
        CsvSignal('/Battery/Devices/AlarmFlags/' + name, c.read_bool(register=1009, bit=bit))
        for name, bit in alarm_bits
    ]
    alarm_signals.append(
        CsvSignal('/Battery/Devices/AlarmFlags/2 or more string are disabled', c.read_limb_string(1059)))

    return warning_signals, alarm_signals
def update(modbus, batteries, dbus, signals, csv_signals):
    # type: (Modbus, Iterable[Battery], DBus, Iterable[Signal], Iterable[CsvSignal]) -> bool
    """
    Run one complete update cycle.

    1. reads the status record of every battery via modbus
    2. evaluates every warning and alarm flag signal for every battery and
       collects the values in two dictionaries keyed by the signal name with
       the battery's slave address inserted
    3. hands both dictionaries to update_state_from_dictionaries
    4. publishes the signal values on the dbus
    5. writes the CSV signals to a CSV file

    Always returns True.
    """
    logging.debug('starting update cycle')

    warnings_signals, alarm_signals = read_warning_and_alarm_flags()
    statuses = [read_battery_status(modbus, battery) for battery in batteries]
    node_numbers = [battery.slave_address for battery in batteries]

    current_warnings = {}
    current_alarms = {}
    # statuses and node_numbers are both built from batteries, so they pair up
    for node, status in zip(node_numbers, statuses):
        current_warnings.update(
            (insert_id(flag.name, node), flag.get_value(status)) for flag in warnings_signals
        )
        current_alarms.update(
            (insert_id(flag.name, node), flag.get_value(status)) for flag in alarm_signals
        )

    _status_message, alarms_number_list, warnings_number_list = update_state_from_dictionaries(
        current_warnings, current_alarms, node_numbers)

    publish_values(dbus, signals, statuses)
    create_csv_files(csv_signals, statuses, node_numbers, alarms_number_list, warnings_number_list)

    logging.debug('finished update cycle\n')
    return True
def print_usage():
    """Print the command line synopsis and an example invocation to stdout."""
    script = __file__
    print(f'Usage: {script} <serial device>')
    print(f'Example: {script} ttyUSB0')
def parse_cmdline_args(argv):
    # type: (list[str]) -> str
    """
    Return the tty device name, i.e. the first command line argument.

    When argv is empty the problem is logged, the usage text is printed and
    the process exits with status 1.
    """
    if len(argv) > 0:
        return argv[0]

    logging.info('missing command line argument for tty device')
    print_usage()
    sys.exit(1)
def count_files_in_folder(folder_path):
    """
    Count the direct children of folder_path for which os.path.isfile is true.

    Returns the count as an int. Errors are reported through the return
    value instead of an exception: the string "Folder not found" when the
    folder does not exist, str(exception) for any other failure.
    """
    try:
        entries = os.listdir(folder_path)
        # sub-directories are not counted
        plain_files = [
            entry for entry in entries
            if os.path.isfile(os.path.join(folder_path, entry))
        ]
    except FileNotFoundError:
        return "Folder not found"
    except Exception as error:
        return str(error)
    return len(plain_files)
def create_batch_of_csv_files():
    """
    Merge the most recent CSV files into one batch, upload it to S3 and
    publish a heartbeat message.

    1. The last num_of_csv_files_saved '*.csv' files of CSV_DIR (ordered by
       os.path.getctime) are concatenated. Every section starts with a
       'Timestamp;<file name without extension>' line and sections are
       separated by an empty line. The batch replaces the oldest of the
       selected files; the other files are left in place.
    2. num_of_csv_files_saved is reset to 0.
    3. The batch is read with read_csv_as_string, compressed with
       compress_csv_data and uploaded via s3_config.create_put_request. The
       object name is the newest file's name stem without its last two
       characters, plus '.csv'.
    4. On status code 200 the batch file is deleted and a status message is
       published on 'statusQueue'. Otherwise the batch file is moved to
       CSV_DIR/failed, which is then trimmed to 100 files.

    Terminates the process with exit status 0 when CSV_DIR contains no CSV
    file. Returns None.
    """
    # 'channel' is re-assigned below when publishing fails. It has to be
    # declared global: otherwise the assignment makes it a local name, the
    # first publish attempt always fails with UnboundLocalError and a new
    # queue connection is opened for every single heartbeat.
    global prev_status, INSTALLATION_ID, PRODUCT_ID, num_of_csv_files_saved, channel

    csv_files = [name for name in os.listdir(CSV_DIR) if name.endswith('.csv')]
    csv_files.sort(key=lambda name: os.path.getctime(os.path.join(CSV_DIR, name)))

    print("num_of_csv_files_saved is " + str(num_of_csv_files_saved))

    if not csv_files:
        print("No csv files found in the directory.")
        # sys.exit instead of the interactive helper exit(), which only
        # exists when the site module has been loaded
        sys.exit(0)

    # keep only the files written since the previous batch
    # NOTE: a counter of 0 selects *all* files, because [-0:] equals [0:]
    recent_csv_files = csv_files[-num_of_csv_files_saved:]

    first_csv_filename = recent_csv_files[0]
    first_csv_file = os.path.join(CSV_DIR, first_csv_filename)
    temp_file_path = os.path.join(CSV_DIR, 'temp_batch_file.csv')

    # build the batch in a temporary file
    with open(temp_file_path, 'wb') as temp_file:
        for position, csv_file in enumerate(recent_csv_files):
            if position > 0:
                # an empty line separates two sections
                temp_file.write(b'\n')
            # the file name (without extension) is the timestamp of the section
            numeric_part = csv_file.split('.')[0]
            temp_file.write(f'Timestamp;{numeric_part}\n'.encode('utf-8'))
            with open(os.path.join(CSV_DIR, csv_file), 'rb') as f:
                temp_file.write(f.read())

    # the batch takes the place of the first csv file; os.replace overwrites
    # the target in one step, so the file is never missing in between
    os.replace(temp_file_path, first_csv_file)
    num_of_csv_files_saved = 0

    # prepare for compression
    csv_data = read_csv_as_string(first_csv_file)
    if csv_data is None:
        print("error while reading csv as string")
        return

    compressed_csv = compress_csv_data(csv_data)

    # the most recent file of the batch names the uploaded object
    last_csv_file_name = recent_csv_files[-1]
    numeric_part = int(last_csv_file_name.split('.')[0][:-2])
    compressed_filename = "{}.csv".format(numeric_part)

    response = s3_config.create_put_request(compressed_filename, compressed_csv)

    if response.status_code == 200:
        os.remove(first_csv_file)
        print("Successfully uploaded the compresseed batch of files in s3")
        status_message = json.dumps({
            "InstallationId": INSTALLATION_ID,
            "Product": PRODUCT_ID,
            "Status": prev_status,
            "Type": 1,
            "Warnings": [],
            "Alarms": [],
        })
        try:
            channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message)
        except Exception:
            # no usable channel (not created yet or connection lost):
            # subscribe again and retry once
            channel = SubscribeToQueue()
            channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message)
        print("Successfully sent the heartbit with timestamp")
    else:
        # data that could not be uploaded is kept in CSV_DIR/failed for logging
        failed_dir = os.path.join(CSV_DIR, "failed")
        os.makedirs(failed_dir, exist_ok=True)
        failed_path = os.path.join(failed_dir, first_csv_filename)
        os.rename(first_csv_file, failed_path)
        print("Uploading failed")
        manage_csv_files(failed_dir, 100)
# Liveness flag shared by the two periodic tasks: update_task stores the
# result of update() in it; watchdog_task clears it and stops the main loop
# when it finds the flag already cleared.
alive = True # global alive flag, watchdog_task clears it, update_task sets it
# Flipped by update_task on every invocation.
# NOTE(review): no other use is visible in this part of the file - confirm
# whether the flag is still needed.
ALLOW = False
def create_update_task(modbus, dbus, batteries, signals, csv_signals, main_loop):
    # type: (Modbus, DBus, Iterable[Battery], Iterable[Signal], Iterable[CsvSignal], DBusGMainLoop) -> Callable[[],bool]
    """
    Build the periodic update callback.

    Each invocation of the returned callback
    - flips the global ALLOW flag,
    - runs update() and stores its result in the global alive flag,
    - trims CSV_DIR to at most 1900 files,
    - creates and uploads a batch of CSV files once 1200 seconds have passed
      since the task was created or since the previous batch,
    - stops the main loop when alive is false.

    The callback returns the alive flag.
    """
    start_time = time.time()

    def update_task():
        # type: () -> bool
        nonlocal start_time
        global alive, ALLOW

        ALLOW = not ALLOW

        alive = update(modbus, batteries, dbus, signals, csv_signals)

        elapsed_time = time.time() - start_time
        print("11111111111111111111111111111111111111111111 elapsed time is ", elapsed_time)

        # keep at most 1900 files at CSV_DIR for logging and aggregation
        manage_csv_files(CSV_DIR, 1900)

        batch_is_due = elapsed_time >= 1200
        if batch_is_due:
            print("CREATE BATCH ======================================>")
            create_batch_of_csv_files()
            # the next batch is due 1200 seconds from now
            start_time = time.time()

        if alive:
            return alive

        logging.info('update_task: quitting main loop because of error')
        main_loop.quit()
        return alive

    return update_task
def create_watchdog_task(main_loop):
    # type: (DBusGMainLoop) -> Callable[[],bool]
    """
    Build the watchdog callback that supervises the update task.

    Each invocation of the returned callback inspects the global alive flag:
    - flag set: the flag is cleared and True is returned
    - flag already cleared: the update task did not set it since the
      previous check, so the main loop is stopped and False is returned
    """
    def watchdog_task():
        # type: () -> bool
        global alive

        if not alive:
            logging.info('watchdog_task: killing main loop because update_task is no longer alive')
            main_loop.quit()
            return False

        logging.debug('watchdog_task: update_task is alive')
        alive = False
        return True

    return watchdog_task
def get_installation_name(file_path):
    """Return the content of the text file at file_path, stripped of surrounding whitespace."""
    with open(file_path, 'r') as handle:
        content = handle.read()
    return content.strip()
def manage_csv_files(directory_path, max_files=20):
    """
    Limit the number of files in directory_path to max_files.

    All direct children for which os.path.isfile is true are considered
    (despite the name, not only CSV files). They are ordered by
    os.path.getctime and deleted, oldest first, until at most max_files
    of them remain.
    """
    def full_path(name):
        return os.path.join(directory_path, name)

    oldest_first = sorted(
        (name for name in os.listdir(directory_path) if os.path.isfile(full_path(name))),
        key=lambda name: os.path.getctime(full_path(name)),
    )

    # walk through the surplus files from the oldest one on
    removed = 0
    while len(oldest_first) - removed > max_files:
        os.remove(full_path(oldest_first[removed]))
        removed += 1
def insert_id(path, id_number):
    """
    Insert id_number as an additional segment right after the first
    "Devices" segment of a '/'-separated path.

    Example: insert_id('/Battery/Devices/Soc', 2) -> '/Battery/Devices/2/Soc'

    Raises ValueError when path has no "Devices" segment.
    """
    segments = path.split("/")
    cut = segments.index("Devices") + 1
    return "/".join(segments[:cut] + [str(id_number)] + segments[cut:])
def create_csv_files(signals, statuses, node_numbers, alarms_number_list, warnings_number_list):
    """
    Write the current values of all CSV signals to CSV_DIR/<timestamp>.csv.

    The timestamp is the current Unix time rounded down to an even number of
    seconds, so at most one file is written per two-second slot. When the
    file of the current slot already exists nothing is written and the
    global counter num_of_csv_files_saved is left untouched; otherwise the
    counter is incremented by one. (The counter is used to select the files
    of the next batch, so it must only count files that were really created.)

    File layout (';' separated, three columns per row):
    - one '/Config/Devices/BatteryNodes' row with the comma separated
      node_numbers
    - per battery, numbered 1..n in the order of node_numbers:
      an '.../Alarms' row, a '.../Warnings' row and one row per signal with
      the signal name (device number inserted), its value and s.get_text

    signals: CSV signal definitions (name, get_value, get_text)
    statuses, alarms_number_list, warnings_number_list: one entry per
        battery, indexed like node_numbers
    """
    global num_of_csv_files_saved

    # round down to an even number of seconds
    timestamp = int(time.time())
    timestamp -= timestamp % 2

    # Create CSV directory if it doesn't exist
    os.makedirs(CSV_DIR, exist_ok=True)
    csv_path = os.path.join(CSV_DIR, f"{timestamp}.csv")

    if os.path.exists(csv_path):
        # this slot has been written (and counted) already
        return

    with open(csv_path, 'a', newline='') as csvfile:
        num_of_csv_files_saved += 1
        csv_writer = csv.writer(csvfile, delimiter=';')

        # special row for the nodes configuration
        nodes_list = ",".join(str(node) for node in node_numbers)
        csv_writer.writerow(["/Config/Devices/BatteryNodes", nodes_list, ""])

        # one group of rows per battery
        for index, _node in enumerate(node_numbers):
            device_id = index + 1
            csv_writer.writerow([f"/Battery/Devices/{device_id}/Alarms", alarms_number_list[index], ""])
            csv_writer.writerow([f"/Battery/Devices/{device_id}/Warnings", warnings_number_list[index], ""])
            for s in signals:
                signal_name = insert_id(s.name, device_id)
                value = s.get_value(statuses[index])
                csv_writer.writerow([signal_name, value, s.get_text])
# CSV file with one battery count per row; read by load_battery_counts and
# rewritten by save_battery_counts.
BATTERY_COUNTS_FILE = '/data/battery_count.csv'
def load_battery_counts():
    """
    Load the battery counts persisted in BATTERY_COUNTS_FILE.

    Returns the first column of every row as a list of int, in file order,
    or an empty list when the file does not exist.

    Blank rows are skipped, and rows whose first column is not an integer
    are skipped with a warning, so that a damaged file cannot keep the
    service from starting.
    """
    if not os.path.exists(BATTERY_COUNTS_FILE):
        return []

    battery_counts = []
    # newline='' leaves the line-ending handling to the csv module
    with open(BATTERY_COUNTS_FILE, 'r', newline='') as f:
        for row in csv.reader(f):
            if not row:
                # csv.reader yields an empty list for a blank line
                continue
            try:
                battery_counts.append(int(row[0]))
            except ValueError:
                logging.warning('ignoring invalid row %r in %s', row, BATTERY_COUNTS_FILE)
    return battery_counts
def save_battery_counts(battery_counts):
    """Overwrite BATTERY_COUNTS_FILE with battery_counts, one count per row."""
    with open(BATTERY_COUNTS_FILE, 'w', newline='') as f:
        csv.writer(f).writerows([count] for count in battery_counts)
def main(argv):
    # type: (list[str]) -> None
    """
    Program entry point: detect the batteries, then run the GLib main loop.

    Battery detection is repeated until its result fits the counts persisted
    by earlier runs:
    - no battery found: exit with status 2
    - first detection ever, or more batteries than ever before: the count is
      persisted and the detection runs once more
    - as many batteries as the highest persisted count: detection is done
    - fewer batteries: detection is retried; after max_retry_attempts such
      results the smaller number is accepted

    Afterwards the signals, the main loop and the dbus service are set up and
    the watchdog and update tasks are scheduled. main never returns normally:
    once the main loop has stopped the process exits with status 0xFF.
    """
    logging.basicConfig(level=cfg.LOG_LEVEL)
    logging.info('starting ' + __file__)

    tty = parse_cmdline_args(argv)
    battery_counts = load_battery_counts()

    max_retry_attempts = 3  # Stop retrying in case it's a real battery loss case
    retry_attempts = 0

    while True:
        modbus = init_modbus(tty)
        batteries = identify_batteries(modbus)
        n = len(batteries)
        logging.info('found %d %s', n, "battery" if n == 1 else "batteries")

        if n <= 0:
            sys.exit(2)  # Exit if no batteries are found

        highest_seen = max(battery_counts) if battery_counts else None

        if highest_seen is None or n > highest_seen:
            # first detection, or more batteries than ever before:
            # remember the count and detect once more
            logging.info("It's new or more batteries detected")
            battery_counts.append(n)
            retry_attempts = 0
            save_battery_counts(battery_counts)
            continue

        if n == highest_seen:
            logging.info('Detected the same number of batteries as before. No need to re-detect.')
            break

        # fewer batteries than seen before
        retry_attempts += 1
        logging.warning('Attempt %d/%d: Detected fewer batteries than previously detected.',
                        retry_attempts, max_retry_attempts)
        if retry_attempts >= max_retry_attempts:
            # give up retrying and continue with fewer batteries
            logging.warning('Max retry attempts reached. Continuing with fewer batteries.')
            save_battery_counts(battery_counts)
            break

    signals = init_signals(n)
    csv_signals = create_csv_signals()
    main_loop = init_main_loop()  # must run before init_dbus because gobject does some global magic
    dbus = init_dbus(tty, signals)

    update_task = create_update_task(modbus, dbus, batteries, signals, csv_signals, main_loop)
    watchdog_task = create_watchdog_task(main_loop)

    GLib.timeout_add(cfg.UPDATE_INTERVAL * 2, watchdog_task)  # add watchdog first
    GLib.timeout_add(cfg.UPDATE_INTERVAL, update_task)  # call update once every update_interval

    logging.info('starting GLib.MainLoop')
    main_loop.run()
    logging.info('GLib.MainLoop was shut down')

    sys.exit(0xFF)  # reaches this only on error
# Script entry point: hand the command line arguments (without the program
# name) to main when the file is executed directly.
if __name__ == "__main__":
    main(sys.argv[1:])