Innovenergy_trunk/python/dbus-fzsonick-48tl-nofork/dbus-fzsonick-48tl.py

1064 lines
46 KiB
Python
Raw Normal View History

2024-05-28 14:25:22 +00:00
#!/usr/bin/python3 -u
# coding=utf-8
import re
import sys
import logging
from gi.repository import GLib
import config as cfg
import convert as c
from pymodbus.register_read_message import ReadInputRegistersResponse
from pymodbus.client.sync import ModbusSerialClient as Modbus
from pymodbus.other_message import ReportSlaveIdRequest
from pymodbus.exceptions import ModbusException
from pymodbus.pdu import ExceptionResponse
from dbus.mainloop.glib import DBusGMainLoop
from data import BatteryStatus, Signal, Battery, LedColor, CsvSignal, LedState
from collections import Iterable
from os import path
app_dir = path.dirname(path.realpath(__file__))
sys.path.insert(1, path.join(app_dir, 'ext', 'velib_python'))
from vedbus import VeDbusService as DBus
import time
import os
import csv
import requests
import hmac
import hashlib
import base64
from datetime import datetime
import io
import json
import requests
import hmac
import hashlib
import base64
from datetime import datetime
import pika
import time
# zip-comp additions
import zipfile
import io
def compress_csv_data(csv_data, file_name="data.csv"):
    """
    Zip a CSV string in memory and return the archive as Base64 text.

    csv_data:  str
        CSV content; it is stored UTF-8 encoded inside the archive
    file_name: str [optional]
        name of the single entry inside the archive (default "data.csv")

    returns: str
        Base64 encoding of the complete ZIP archive
    """
    memory_stream = io.BytesIO()

    # The archive has to be closed before the buffer is read: closing it is
    # what finalizes the ZIP structure in the underlying stream.
    with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive:
        # Use the caller supplied entry name; it used to be hard-coded to
        # 'data.csv', which made the file_name parameter a no-op.
        with archive.open(file_name, 'w') as entry_stream:
            entry_stream.write(csv_data.encode('utf-8'))

    compressed_bytes = memory_stream.getvalue()
    return base64.b64encode(compressed_bytes).decode('utf-8')
class S3config:
    """
    Connection settings for the installation's S3 bucket plus the helpers
    needed to build and send signed PUT requests.

    Bucket name, key and secret are taken from the config module; region,
    provider and content type are fixed.
    """

    def __init__(self):
        self.bucket = cfg.S3BUCKET
        self.region = "sos-ch-dk-2"
        self.provider = "exo.io"
        self.key = cfg.S3KEY
        self.secret = cfg.S3SECRET
        self.content_type = "application/base64; charset=utf-8"

    @property
    def host(self):
        """Host name of the bucket: <bucket>.<region>.<provider>"""
        return f"{self.bucket}.{self.region}.{self.provider}"

    @property
    def url(self):
        """https base URL of the bucket"""
        return f"https://{self.host}"

    def create_put_request(self, s3_path, data):
        """
        Upload data to s3_path inside the bucket with a signed HTTP PUT.

        returns: the response object of requests.put
        """
        headers = self._create_request("PUT", s3_path)
        url = f"{self.url}/{s3_path}"
        response = requests.put(url, headers=headers, data=data)
        return response

    def _create_request(self, method, s3_path):
        """Build the header dict (Host, Date, Authorization, Content-Type) for one request."""
        # the very same date string has to be part of the signed payload
        date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
        auth = self._create_authorization(method, self.bucket, s3_path, date, self.key, self.secret, self.content_type)
        headers = {
            "Host": self.host,
            "Date": date,
            "Authorization": auth,
            "Content-Type": self.content_type
        }
        return headers

    # This method used to be defined twice with identical bodies; the second
    # definition silently replaced the first, so only one copy is kept.
    @staticmethod
    def _create_authorization(method, bucket, s3_path, date, s3_key, s3_secret, content_type="", md5_hash=""):
        """
        Compute the value of the Authorization header.

        The payload (method, md5, content type, date and '/<bucket>/<path>',
        separated by newlines) is signed with HMAC-SHA1 using s3_secret and
        the Base64 encoded digest is returned as 'AWS <s3_key>:<signature>'.
        Leading and trailing slashes of bucket and s3_path are ignored.
        """
        payload = f"{method}\n{md5_hash}\n{content_type}\n{date}\n/{bucket.strip('/')}/{s3_path.strip('/')}"
        signature = base64.b64encode(
            hmac.new(s3_secret.encode(), payload.encode(), hashlib.sha1).digest()
        ).decode()
        return f"AWS {s3_key}:{signature}"
def read_csv_as_string(file_path):
    """
    Return the whole content of the UTF-8 text file at file_path.

    A missing file or any other IO error is reported on stdout and
    results in None instead of an exception.
    """
    content = None
    try:
        with open(file_path, 'r', encoding='utf-8') as csv_file:
            content = csv_file.read()
    except FileNotFoundError:
        print(f"Error: The file {file_path} does not exist.")
    except IOError as io_error:
        print(f"IO error occurred: {str(io_error)}")
    return content
# Absolute directory for CSV files (the code that uses it is outside this section).
CSV_DIR = "/data/csv_files/"
# Relative alternative, kept commented out:
#CSV_DIR = "csv_files/"
# Define the path to the file containing the installation name
INSTALLATION_NAME_FILE = '/data/innovenergy/openvpn/installation-name'
# trick the pycharm type-checker into thinking Callable is in scope, not used at runtime
# noinspection PyUnreachableCode
if False:
from typing import Callable
def interpret_limb_bitmap(bitmap_value):
    """
    Count how many of the 5 battery strings are flagged as disabled.

    Each of the lowest five bits of bitmap_value stands for one string,
    a set bit means that string is disabled; higher bits are ignored.
    The bit for string 1 also monitors all 5 strings:
    0000 0000 means all 5 strings activated, 0000 0001 means string 1 disabled.
    """
    string_masks = (0b00001, 0b00010, 0b00100, 0b01000, 0b10000)
    return sum(1 for mask in string_masks if (bitmap_value & mask) != 0)
def calc_power_limit_imposed_by_voltage_limit(v, i, v_limit, r_int):
    # type: (float, float, float, float) -> float
    """
    Power at which the voltage would reach v_limit.

    Starting from the operating point (v, i), the voltage headroom
    v_limit - v is converted into additional current via r_int and the
    resulting current is multiplied with v_limit.
    """
    extra_current = (v_limit - v) / r_int
    return v_limit * (i + extra_current)
def calc_power_limit_imposed_by_current_limit(v, i, i_limit, r_int):
    # type: (float, float, float, float) -> float
    """
    Power at which the current would reach i_limit.

    Starting from the operating point (v, i), the current headroom
    i_limit - i is converted into a voltage change via r_int and the
    resulting voltage is multiplied with i_limit.
    """
    extra_voltage = (i_limit - i) * r_int
    return i_limit * (v + extra_voltage)
def read_switch_closed(status):
    """Return the inverse of bit 0 of register 1013: True when the bit is clear."""
    bit_is_set = c.read_bool(register=1013, bit=0)(status)
    return not bit_is_set
def read_alarm_out_active(status):
    """Return the inverse of bit 1 of register 1013: True when the bit is clear."""
    bit_is_set = c.read_bool(register=1013, bit=1)(status)
    return not bit_is_set
def read_aux_relay(status):
    """Return the inverse of bit 4 of register 1013: True when the bit is clear."""
    bit_is_set = c.read_bool(register=1013, bit=4)(status)
    return not bit_is_set
def hex_string_to_ascii(hex_string):
    """
    Decode a string of hex digits into text, one character per byte.

    Spaces between the digits are ignored; every pair of hex digits is
    turned into the character with that code.
    """
    compact = hex_string.replace(" ", "")
    characters = []
    for offset in range(0, len(compact), 2):
        byte_value = int(compact[offset:offset + 2], 16)
        characters.append(chr(byte_value))
    return ''.join(characters)
# reader for the 2 registers starting at 1060, rendered as hex string
battery_status_reader = c.read_hex_string(1060,2)


def read_eoc_reached(status):
    """Return True when the battery status registers spell the text "EOC_"."""
    status_text = hex_string_to_ascii(battery_status_reader(status))
    return status_text == "EOC_"
def return_led_state(status, color):
    """
    Render the state of one LED (register 1004) as text.

    returns: "Blinking" (fast or slow), "On", "Off", or "Unknown" for
    any other state
    """
    led_state = c.read_led_state(register=1004, led=color)(status)

    if led_state == LedState.blinking_fast or led_state == LedState.blinking_slow:
        text = "Blinking"
    elif led_state == LedState.on:
        text = "On"
    elif led_state == LedState.off:
        text = "Off"
    else:
        text = "Unknown"
    return text
def return_led_state_blue(status):
    """Text state of the blue LED, see return_led_state."""
    return return_led_state(status, color=LedColor.blue)
def return_led_state_red(status):
    """Text state of the red LED, see return_led_state."""
    return return_led_state(status, color=LedColor.red)
def return_led_state_green(status):
    """Text state of the green LED, see return_led_state."""
    return return_led_state(status, color=LedColor.green)
def return_led_state_amber(status):
    """Text state of the amber LED, see return_led_state."""
    return return_led_state(status, color=LedColor.amber)
def read_serial_number(status):
    """
    Assemble the serial number from registers 1055 to 1058.

    Each register is read as a hex string, spaces are removed, the four
    parts are concatenated and trailing '0' characters are dropped.
    """
    serial_registers = (1055, 1056, 1057, 1058)
    hex_parts = [
        c.read_hex_string(register, 1)(status).replace(' ', '')
        for register in serial_registers
    ]
    return ''.join(hex_parts).rstrip('0')
def time_since_toc_in_time_format(status):
    """
    Format the value of register 1052 (a duration in minutes) as
    'days.hours:minutes:seconds' with two-digit hours, minutes and seconds.
    """
    # minutes -> whole seconds
    remaining = int(c.read_float(register=1052)(status) * 60)

    days, remaining = divmod(remaining, 24 * 3600)
    hours, remaining = divmod(remaining, 3600)
    minutes, seconds = divmod(remaining, 60)

    return f"{days}.{hours:02}:{minutes:02}:{seconds:02}"
def create_csv_signals(firmware_version):
    """
    Build the list of CsvSignals describing one battery.

    firmware_version: constant published under /Battery/Devices/FwVersion

    returns: list[CsvSignal]
        every entry pairs a signal name with either a constant or a
        function (BatteryStatus) -> value, plus an optional unit text
    """
    read_voltage = c.read_float(register=999, scale_factor=0.01, offset=0, places=2)
    read_current = c.read_float(register=1000, scale_factor=0.01, offset=-10000, places=2)
    read_limb_bitmap = c.read_bitmap(1059)

    def read_power(status):
        return int(read_current(status) * read_voltage(status))

    def string_disabled_reader(mask):
        # one reader per string; the mask is bound through the factory
        # argument, so every reader keeps its own bit
        def read_flag(status):
            return int((read_limb_bitmap(status) & mask) != 0)
        return read_flag

    string1_disabled = string_disabled_reader(0b00001)
    string2_disabled = string_disabled_reader(0b00010)
    string3_disabled = string_disabled_reader(0b00100)
    string4_disabled = string_disabled_reader(0b01000)
    string5_disabled = string_disabled_reader(0b10000)

    def limp_strings_value(status):
        return interpret_limb_bitmap(read_limb_bitmap(status))

    def calc_max_charge_power(status):
        # type: (BatteryStatus) -> int
        n_strings = cfg.NUM_OF_STRING_PER_BATTERY-limp_strings_value(status)
        if n_strings <= 0:
            # no string left to charge; also avoids dividing by zero below
            return 0
        i_max = n_strings * cfg.I_MAX_PER_STRING
        v_max = cfg.V_MAX
        r_int_min = cfg.R_STRING_MIN / n_strings
        r_int_max = cfg.R_STRING_MAX / n_strings
        v = read_voltage(status)
        i = read_current(status)
        p_limits = [
            calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_min),
            calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_max),
            calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_min),
            calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_max),
        ]
        p_limit = min(p_limits)  # p_limit is normally positive here (signed)
        p_limit = max(p_limit, 0)  # charge power must not become negative
        return int(p_limit)

    def calc_max_discharge_power(status):
        n_strings = cfg.NUM_OF_STRING_PER_BATTERY-limp_strings_value(status)
        max_discharge_current = n_strings*cfg.I_MAX_PER_STRING
        return int(max_discharge_current*read_voltage(status))

    total_current = c.read_float(register=1062, scale_factor=0.01, offset=-10000, places=1)

    def read_total_current(status):
        return total_current(status)

    def read_heating_current(status):
        # whatever part of the total current does not go through the cells
        return total_current(status) - read_current(status)

    def read_heating_power(status):
        return read_voltage(status) * read_heating_current(status)

    soc_ah = c.read_float(register=1002, scale_factor=0.1, offset=-10000, places=1)

    def read_soc_ah(status):
        return soc_ah(status)

    return [
        CsvSignal('/Battery/Devices/FwVersion', firmware_version),
        CsvSignal('/Battery/Devices/Dc/Power', read_power, 'W'),
        CsvSignal('/Battery/Devices/Dc/Voltage', read_voltage, 'V'),
        CsvSignal('/Battery/Devices/Soc', c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), '%'),
        CsvSignal('/Battery/Devices/Temperatures/Cells/Average', c.read_float(register=1003, scale_factor=0.1, offset=-400, places=1), 'C'),
        CsvSignal('/Battery/Devices/Dc/Current', read_current, 'A'),
        CsvSignal('/Battery/Devices/BusCurrent', read_total_current, 'A'),
        CsvSignal('/Battery/Devices/CellsCurrent', read_current, 'A'),
        CsvSignal('/Battery/Devices/HeatingCurrent', read_heating_current, 'A'),
        CsvSignal('/Battery/Devices/HeatingPower', read_heating_power, 'W'),
        CsvSignal('/Battery/Devices/SOCAh', read_soc_ah),
        CsvSignal('/Battery/Devices/Leds/Blue', return_led_state_blue),
        CsvSignal('/Battery/Devices/Leds/Red', return_led_state_red),
        CsvSignal('/Battery/Devices/Leds/Green', return_led_state_green),
        CsvSignal('/Battery/Devices/Leds/Amber', return_led_state_amber),
        # NOTE(review): the paths are called "...Active" but carry the
        # *disabled* flag (1 = string disabled) - confirm with the consumer
        CsvSignal('/Battery/Devices/BatteryStrings/String1Active', string1_disabled),
        CsvSignal('/Battery/Devices/BatteryStrings/String2Active', string2_disabled),
        CsvSignal('/Battery/Devices/BatteryStrings/String3Active', string3_disabled),
        CsvSignal('/Battery/Devices/BatteryStrings/String4Active', string4_disabled),
        CsvSignal('/Battery/Devices/BatteryStrings/String5Active', string5_disabled),
        CsvSignal('/Battery/Devices/IoStatus/ConnectedToDcBus', read_switch_closed),
        CsvSignal('/Battery/Devices/IoStatus/AlarmOutActive', read_alarm_out_active),
        CsvSignal('/Battery/Devices/IoStatus/InternalFanActive', c.read_bool(register=1013, bit=2)),
        CsvSignal('/Battery/Devices/IoStatus/VoltMeasurementAllowed', c.read_bool(register=1013, bit=3)),
        CsvSignal('/Battery/Devices/IoStatus/AuxRelayBus', read_aux_relay),
        CsvSignal('/Battery/Devices/IoStatus/RemoteStateActive', c.read_bool(register=1013, bit=5)),
        CsvSignal('/Battery/Devices/IoStatus/RiscActive', c.read_bool(register=1013, bit=6)),
        CsvSignal('/Battery/Devices/Eoc', read_eoc_reached),
        CsvSignal('/Battery/Devices/SerialNumber', read_serial_number),
        CsvSignal('/Battery/Devices/TimeSinceTOC', time_since_toc_in_time_format),
        CsvSignal('/Battery/Devices/MaxChargePower', calc_max_charge_power),
        CsvSignal('/Battery/Devices/MaxDischargePower', calc_max_discharge_power),
    ]
def init_signals(hardware_version, firmware_version, n_batteries):
    # type: (str,str,int) -> Iterable[Signal]
    """
    Build the list of Signals that are published on the DBus.

    A Signal holds all information necessary for the handling of a
    certain datum (e.g. voltage) published by the battery.

    Signal(dbus_path, aggregate, get_value, get_text = str)

    dbus_path: str
        object_path on DBus where the datum needs to be published

    aggregate: Iterable[object] -> object
        function that combines the values of multiple batteries into one.
        e.g. sum for currents, or mean for voltages

    get_value: (BatteryStatus) -> object [optional]
        function to extract the datum from the modbus record,
        alternatively: a constant

    get_text: (object) -> unicode [optional]
        function to render datum to text, needed by DBus
        alternatively: a constant

    The conversion functions use the same parameters (e.g scale_factor, offset)
    as described in the document 'T48TLxxx ModBus Protocol Rev.7.1' which can
    be found in the /doc folder
    """
    product_id_hex = '0x{0:04x}'.format(cfg.PRODUCT_ID)

    read_voltage = c.read_float(register=999, scale_factor=0.01, offset=0, places=2)
    read_current = c.read_float(register=1000, scale_factor=0.01, offset=-10000, places=2)
    read_limb_bitmap = c.read_bitmap(1059)

    def read_power(status):
        return int(read_current(status) * read_voltage(status))

    def limp_strings_value(status):
        return interpret_limb_bitmap(read_limb_bitmap(status))

    def max_discharge_current(status):
        return (cfg.NUM_OF_STRING_PER_BATTERY-limp_strings_value(status))*cfg.I_MAX_PER_STRING

    def max_charge_current(status):
        return status.battery.ampere_hours/2

    def calc_max_charge_power(status):
        # type: (BatteryStatus) -> int
        n_strings = cfg.NUM_OF_STRING_PER_BATTERY-limp_strings_value(status)
        if n_strings <= 0:
            # no string left to charge; also avoids dividing by zero below
            return 0
        i_max = n_strings * cfg.I_MAX_PER_STRING
        v_max = cfg.V_MAX
        r_int_min = cfg.R_STRING_MIN / n_strings
        r_int_max = cfg.R_STRING_MAX / n_strings
        v = read_voltage(status)
        i = read_current(status)
        p_limits = [
            calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_min),
            calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_max),
            calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_min),
            calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_max),
        ]
        p_limit = min(p_limits)  # p_limit is normally positive here (signed)
        p_limit = max(p_limit, 0)  # charge power must not become negative
        return int(p_limit)

    product_name = cfg.PRODUCT_NAME
    if n_batteries > 1:
        # make the number of batteries visible in the product name
        product_name = cfg.PRODUCT_NAME + ' x' + str(n_batteries)

    return [
        # Node Red related dbus paths
        Signal('/TimeToTOCRequest', max, c.read_float(register=1052)),
        Signal('/EOCReached', c.return_in_list, read_eoc_reached),
        Signal('/NumOfLimbStrings', c.return_in_list, get_value=limp_strings_value),
        Signal('/NumOfBatteries', max, get_value=n_batteries),
        Signal('/Dc/0/Voltage', c.mean, get_value=read_voltage, get_text=c.append_unit('V')),
        Signal('/Dc/0/Current', c.ssum, get_value=read_current, get_text=c.append_unit('A')),
        Signal('/Dc/0/Power', c.ssum, get_value=read_power, get_text=c.append_unit('W')),
        Signal('/BussVoltage', c.mean, c.read_float(register=1001, scale_factor=0.01, offset=0, places=2), c.append_unit('V')),
        Signal('/Soc', min, c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), c.append_unit('%')),
        Signal('/LowestSoc', min, c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), c.append_unit('%')),
        Signal('/Dc/0/Temperature', c.mean, c.read_float(register=1003, scale_factor=0.1, offset=-400, places=1), c.append_unit(u'°C')),
        Signal('/Dc/0/LowestTemperature', min, c.read_float(register=1003, scale_factor=0.1, offset=-400, places=1), c.append_unit(u'°C')),
        # Charge/Discharge current, voltage and power
        Signal('/Info/MaxDischargeCurrent', c.ssum, max_discharge_current,c.append_unit('A')),
        Signal('/Info/MaxChargeCurrent', c.ssum, max_charge_current, c.append_unit('A')),
        Signal('/Info/MaxChargeVoltage', min, cfg.MAX_CHARGE_VOLTAGE, c.append_unit('V')),
        Signal('/Info/MaxChargePower', c.ssum, calc_max_charge_power),
        # Victron mandatory dbus paths
        Signal('/Mgmt/ProcessName', c.first, __file__),
        Signal('/Mgmt/ProcessVersion', c.first, cfg.SOFTWARE_VERSION),
        Signal('/Mgmt/Connection', c.first, cfg.CONNECTION),
        Signal('/DeviceInstance', c.first, cfg.DEVICE_INSTANCE),
        Signal('/ProductName', c.first, product_name),
        Signal('/ProductId', c.first, cfg.PRODUCT_ID, product_id_hex),
        Signal('/Connected', c.first, 1),
        Signal('/FirmwareVersion', c.return_in_list, firmware_version),
        Signal('/HardwareVersion', c.first, cfg.HARDWARE_VERSION, hardware_version),
        # Diagnostics
        Signal('/Diagnostics/BmsVersion', c.first, lambda s: s.battery.bms_version),
        # Warnings
        Signal('/WarningFlags/TaM1', c.return_in_list, c.read_bool(register=1005, bit=1)),
        Signal('/WarningFlags/TbM1', c.return_in_list, c.read_bool(register=1005, bit=4)),
        Signal('/WarningFlags/VBm1', c.return_in_list, c.read_bool(register=1005, bit=6)),
        Signal('/WarningFlags/VBM1', c.return_in_list, c.read_bool(register=1005, bit=8)),
        Signal('/WarningFlags/IDM1', c.return_in_list, c.read_bool(register=1005, bit=10)),
        Signal('/WarningFlags/vsm1', c.return_in_list, c.read_bool(register=1005, bit=22)),
        Signal('/WarningFlags/vsM1', c.return_in_list, c.read_bool(register=1005, bit=24)),
        Signal('/WarningFlags/iCM1', c.return_in_list, c.read_bool(register=1005, bit=26)),
        Signal('/WarningFlags/iDM1', c.return_in_list, c.read_bool(register=1005, bit=28)),
        Signal('/WarningFlags/MID1', c.return_in_list, c.read_bool(register=1005, bit=30)),
        Signal('/WarningFlags/BLPW', c.return_in_list, c.read_bool(register=1005, bit=32)),
        Signal('/WarningFlags/CCBF', c.return_in_list, c.read_bool(register=1005, bit=33)),
        Signal('/WarningFlags/Ah_W', c.return_in_list, c.read_bool(register=1005, bit=35)),
        Signal('/WarningFlags/MPMM', c.return_in_list, c.read_bool(register=1005, bit=38)),
        Signal('/WarningFlags/TCdi', c.return_in_list, c.read_bool(register=1005, bit=40)),
        Signal('/WarningFlags/LMPW', c.return_in_list, c.read_bool(register=1005, bit=44)),
        Signal('/WarningFlags/TOCW', c.return_in_list, c.read_bool(register=1005, bit=47)),
        Signal('/WarningFlags/BUSL', c.return_in_list, c.read_bool(register=1005, bit=49)),
        # Alarms
        Signal('/AlarmFlags/Tam', c.return_in_list, c.read_bool(register=1005, bit=0)),
        Signal('/AlarmFlags/TaM2', c.return_in_list, c.read_bool(register=1005, bit=2)),
        Signal('/AlarmFlags/Tbm', c.return_in_list, c.read_bool(register=1005, bit=3)),
        Signal('/AlarmFlags/TbM2', c.return_in_list, c.read_bool(register=1005, bit=5)),
        Signal('/AlarmFlags/VBm2', c.return_in_list, c.read_bool(register=1005, bit=7)),
        Signal('/AlarmFlags/VBM2', c.return_in_list, c.read_bool(register=1005, bit=9)),
        Signal('/AlarmFlags/IDM2', c.return_in_list, c.read_bool(register=1005, bit=11)),
        Signal('/AlarmFlags/ISOB', c.return_in_list, c.read_bool(register=1005, bit=12)),
        Signal('/AlarmFlags/MSWE', c.return_in_list, c.read_bool(register=1005, bit=13)),
        Signal('/AlarmFlags/FUSE', c.return_in_list, c.read_bool(register=1005, bit=14)),
        Signal('/AlarmFlags/HTRE', c.return_in_list, c.read_bool(register=1005, bit=15)),
        Signal('/AlarmFlags/TCPE', c.return_in_list, c.read_bool(register=1005, bit=16)),
        Signal('/AlarmFlags/STRE', c.return_in_list, c.read_bool(register=1005, bit=17)),
        Signal('/AlarmFlags/CME', c.return_in_list, c.read_bool(register=1005, bit=18)),
        Signal('/AlarmFlags/HWFL', c.return_in_list, c.read_bool(register=1005, bit=19)),
        Signal('/AlarmFlags/HWEM', c.return_in_list, c.read_bool(register=1005, bit=20)),
        Signal('/AlarmFlags/ThM', c.return_in_list, c.read_bool(register=1005, bit=21)),
        Signal('/AlarmFlags/vsm2', c.return_in_list, c.read_bool(register=1005, bit=23)),
        Signal('/AlarmFlags/vsM2', c.return_in_list, c.read_bool(register=1005, bit=25)),
        Signal('/AlarmFlags/iCM2', c.return_in_list, c.read_bool(register=1005, bit=27)),
        Signal('/AlarmFlags/iDM2', c.return_in_list, c.read_bool(register=1005, bit=29)),
        Signal('/AlarmFlags/MID2', c.return_in_list, c.read_bool(register=1005, bit=31)),
        Signal('/AlarmFlags/HTFS', c.return_in_list, c.read_bool(register=1005, bit=42)),
        Signal('/AlarmFlags/DATA', c.return_in_list, c.read_bool(register=1005, bit=43)),
        Signal('/AlarmFlags/LMPA', c.return_in_list, c.read_bool(register=1005, bit=45)),
        Signal('/AlarmFlags/HEBT', c.return_in_list, c.read_bool(register=1005, bit=46)),
        Signal('/AlarmFlags/CURM', c.return_in_list, c.read_bool(register=1005, bit=48)),
        # LedStatus
        Signal('/Diagnostics/LedStatus/Red', c.first, c.read_led_state(register=1004, led=LedColor.red)),
        Signal('/Diagnostics/LedStatus/Blue', c.first, c.read_led_state(register=1004, led=LedColor.blue)),
        Signal('/Diagnostics/LedStatus/Green', c.first, c.read_led_state(register=1004, led=LedColor.green)),
        Signal('/Diagnostics/LedStatus/Amber', c.first, c.read_led_state(register=1004, led=LedColor.amber)),
        # IO Status
        Signal('/Diagnostics/IoStatus/MainSwitchClosed', c.return_in_list, read_switch_closed),
        Signal('/Diagnostics/IoStatus/AlarmOutActive', c.return_in_list, read_alarm_out_active),
        Signal('/Diagnostics/IoStatus/InternalFanActive', c.return_in_list, c.read_bool(register=1013, bit=2)),
        Signal('/Diagnostics/IoStatus/VoltMeasurementAllowed', c.return_in_list, c.read_bool(register=1013, bit=3)),
        Signal('/Diagnostics/IoStatus/AuxRelay', c.return_in_list, read_aux_relay),
        Signal('/Diagnostics/IoStatus/RemoteState', c.return_in_list, c.read_bool(register=1013, bit=5)),
        Signal('/Diagnostics/IoStatus/RiscOn', c.return_in_list, c.read_bool(register=1013, bit=6)),
    ]
def init_modbus(tty):
    # type: (str) -> Modbus
    """
    Create the serial Modbus client for the device /dev/<tty>.

    All line settings (mode, baud rate, stop bits, byte size, timeout,
    parity) are taken from the config module.
    """
    logging.debug('initializing Modbus')

    serial_settings = dict(
        port='/dev/' + tty,
        method=cfg.MODE,
        baudrate=cfg.BAUD_RATE,
        stopbits=cfg.STOP_BITS,
        bytesize=cfg.BYTE_SIZE,
        timeout=cfg.TIMEOUT,
        parity=cfg.PARITY,
    )
    return Modbus(**serial_settings)
def init_dbus(tty, signals):
    # type: (str, Iterable[Signal]) -> DBus
    """
    Create the DBus service named cfg.SERVICE_NAME_PREFIX + tty and
    register one DBus path for every signal.
    """
    logging.debug('initializing DBus service')
    service = DBus(servicename=cfg.SERVICE_NAME_PREFIX + tty)

    logging.debug('initializing DBus paths')
    for sig in signals:
        init_dbus_path(service, sig)

    return service
# noinspection PyBroadException
def try_get_value(sig):
    # type: (Signal) -> object
    """
    Best-effort evaluation of a signal without a battery status.

    Calls sig.get_value(None) and returns its result; if that raises,
    None is returned instead.
    """
    try:
        return sig.get_value(None)
    except Exception:
        # deliberately broad, but no longer a bare except: KeyboardInterrupt
        # and SystemExit must not be swallowed here
        return None
def init_dbus_path(dbus, sig):
    # type: (DBus, Signal) -> ()
    """
    Register the DBus path of one signal.

    The initial value is whatever try_get_value yields for the signal;
    the text shown for a value is rendered by sig.get_text.
    """
    def render_text(_, value):
        # first callback argument is not needed for rendering
        return sig.get_text(value)

    initial_value = try_get_value(sig)
    dbus.add_path(sig.dbus_path, initial_value, gettextcallback=render_text)
def init_main_loop():
    # type: () -> DBusGMainLoop
    """Install DBusGMainLoop as the default DBus main loop and return a new GLib.MainLoop."""
    logging.debug('initializing DBusGMainLoop Loop')
    DBusGMainLoop(set_as_default=True)
    main_loop = GLib.MainLoop()
    return main_loop
def report_slave_id(modbus, slave_address):
    # type: (Modbus, int) -> str
    """
    Send a ReportSlaveId request to the node at slave_address.

    returns: the identifier field of the response

    raises: Exception if the node answers with a modbus exception
            response or the request fails with a ModbusException

    The modbus connection is opened for the request and closed again in
    any case.
    """
    slave = str(slave_address)

    logging.debug('requesting slave id from node ' + slave)

    try:
        modbus.connect()

        request = ReportSlaveIdRequest(unit=slave_address)
        response = modbus.execute(request)

        # The response is an *instance*; the former test
        # 'response is ExceptionResponse' compared it with the class itself
        # and therefore never matched.
        if isinstance(response, (ExceptionResponse, ModbusException)):
            raise Exception('failed to get slave id from ' + slave + ' : ' + str(response))

        return response.identifier

    finally:
        modbus.close()
def identify_battery(modbus, slave_address):
    # type: (Modbus, int) -> Battery
    """
    Query the node at slave_address and describe it as a Battery.

    Hardware version, BMS version and capacity come from the slave id,
    the firmware version is read separately.
    """
    logging.info('identifying battery...')

    hw_version, bms, capacity_ah = parse_slave_id(modbus, slave_address)
    fw_version = read_firmware_version(modbus, slave_address)

    battery = Battery(
        slave_address=slave_address,
        hardware_version=hw_version,
        firmware_version=fw_version,
        bms_version=bms,
        ampere_hours=capacity_ah)

    logging.info('battery identified:\n{0}'.format(str(battery)))
    return battery
def identify_batteries(modbus):
    # type: (Modbus) -> list[Battery]
    """
    Probe every slave address from 1 to cfg.MAX_SLAVE_ADDRESS and return
    the batteries that could be identified.

    Addresses that fail to identify are logged and skipped.
    """
    found = []
    for slave_address in range(1, cfg.MAX_SLAVE_ADDRESS + 1):
        try:
            battery = identify_battery(modbus, slave_address)
        except Exception as e:
            logging.info('failed to identify battery at {0} : {1}'.format(str(slave_address), str(e)))
        else:
            found.append(battery)
    return found
def parse_slave_id(modbus, slave_address):
    # type: (Modbus, int) -> (str, str, int)
    """
    Request the slave id of a node and split it into its parts.

    returns: (hardware version, bms version, ampere hours)
             the hardware version is '48TL' followed by digits, those
             digits are also returned as the int ampere hours, the bms
             version is the text after the hardware version

    raises: Exception('no known battery found') if the slave id does not
            start with '48TL<digits>'
    """
    slave_id = report_slave_id(modbus, slave_address)

    sid = re.sub(b'[^\x20-\x7E]', b'', slave_id)  # remove weird special chars

    # raw string: '\d' inside a normal string literal is an invalid escape
    # sequence and triggers a warning on current Python versions
    match = re.match(r'(?P<hw>48TL(?P<ah>\d+)) *(?P<bms>.*)', sid.decode('ascii'))

    if match is None:
        raise Exception('no known battery found')

    return match.group('hw'), match.group('bms'), int(match.group('ah'))
def read_firmware_version(modbus, slave_address):
    # type: (Modbus, int) -> str
    """
    Read register 1054 of the given node and return it as 4 uppercase
    hex digits, padded with zeros.
    """
    logging.debug('reading firmware version')

    try:
        modbus.connect()
        registers = read_modbus_registers(modbus, slave_address, base_address=1054, count=1).registers
        return '{0:0>4X}'.format(registers[0])
    finally:
        # the connection is closed whether or not the read succeeded
        modbus.close()
def read_modbus_registers(modbus, slave_address, base_address=cfg.BASE_ADDRESS, count=cfg.NO_OF_REGISTERS):
    # type: (Modbus, int, int, int) -> ReadInputRegistersResponse
    """
    Read count input registers starting at base_address from the node
    at slave_address. The defaults come from the config module.
    """
    end_address = base_address + count
    logging.debug('requesting modbus registers {0}-{1}'.format(base_address, end_address))

    response = modbus.read_input_registers(
        address=base_address,
        count=count,
        unit=slave_address)
    return response
def read_battery_status(modbus, battery):
    # type: (Modbus, Battery) -> BatteryStatus
    """
    Fetch the status registers of one battery via modbus and wrap them,
    together with the battery, in a BatteryStatus.
    """
    logging.debug('reading battery status')

    try:
        modbus.connect()
        response = read_modbus_registers(modbus, battery.slave_address)
        status = BatteryStatus(battery, response.registers)
        return status
    finally:
        # the connection is closed whether or not the read succeeded
        modbus.close()
def publish_values(dbus, signals, statuses):
    # type: (DBus, Iterable[Signal], Iterable[BatteryStatus]) -> ()
    """
    Publish one aggregated value per signal on the DBus.

    For every signal the value is extracted from each battery status,
    the values are combined with the signal's aggregate function and the
    result is written to the signal's dbus_path.
    """
    for signal in signals:
        per_battery = []
        for status in statuses:
            per_battery.append(signal.get_value(status))

        with dbus as service:
            service[signal.dbus_path] = signal.aggregate(per_battery)
# Warning / alarm flags seen in the previous call of
# update_state_from_dictionaries, which overwrites both with copies of
# its arguments; empty until that function has run once.
previous_warnings = {}
previous_alarms = {}
class MessageType:
    """Names of the kinds of status messages."""

    # a message carrying alarms and/or warnings
    ALARM_OR_WARNING = "AlarmOrWarning"
    # a message sent to signal liveness
    HEARTBEAT = "Heartbeat"
class AlarmOrWarning:
    """
    One alarm or warning entry of a status message, stamped with the
    local date and time of its creation.
    """

    def __init__(self, description, created_by):
        # take the timestamp once, so that date and time always belong to
        # the same instant (two separate now() calls could straddle midnight)
        now = datetime.now()
        self.date = now.strftime('%Y-%m-%d')
        self.time = now.strftime('%H:%M:%S')
        self.description = description
        self.created_by = created_by

    def to_dict(self):
        """Return the entry as a dict with the keys Date, Time, Description and CreatedBy."""
        return {
            "Date": self.date,
            "Time": self.time,
            "Description": self.description,
            "CreatedBy": self.created_by
        }
def SubscribeToQueue():
    """
    Connect to the RabbitMQ broker and declare the durable queue
    "statusQueue".

    returns: the opened pika channel

    raises: whatever pika raised while connecting; the error is printed
            first. (The function used to fall through to 'return channel'
            with channel unassigned, which replaced the real error with
            an UnboundLocalError.)
    """
    try:
        # NOTE(review): broker address and credentials are hard-coded in the
        # source; consider moving them into the config module.
        connection = pika.BlockingConnection(pika.ConnectionParameters(host="10.2.0.11",
                                                                       port=5672,
                                                                       virtual_host="/",
                                                                       credentials=pika.PlainCredentials("producer", "b187ceaddb54d5485063ddc1d41af66f")))
        channel = connection.channel()
        channel.queue_declare(queue="statusQueue", durable=True)
        print("Subscribed to queue")
    except Exception as ex:
        print("An error occurred while connecting to the RabbitMQ queue:", ex)
        # there is no channel to hand back, let the caller see the real cause
        raise
    return channel
# --- module level state used by update_state_from_dictionaries ---
# True until update_state_from_dictionaries has run once
is_first_update = True
# NOTE(review): only referenced in a global statement in the visible code - possibly unused
first_subscribe = False
# "Status" value of the last message published because of a change
prev_status=0
# set to True once the first heartbeat message has been sent
subscribed_to_queue_first_time=False
# the connection to the broker is opened at import time
channel = SubscribeToQueue()
# number of update_state_from_dictionaries calls since the last heartbeat
heartbit_interval = 0
# Create an S3config instance
s3_config = S3config()
# the bucket name has to start with the numeric installation id, followed by '-'
INSTALLATION_ID=int(s3_config.bucket.split('-')[0])
PRODUCT_ID = 1
def _newly_raised_flags(current_flags, previous_flags, report_all_active):
    # Names of the flags that are active now and either were not seen
    # before or changed their value since the previous call.
    # Flags are matched by name; the former index based comparison relied on
    # both dicts having the same keys in the same order and raised an
    # IndexError as soon as a new flag showed up.
    return [
        name
        for name, value in current_flags.items()
        if value and (
            report_all_active
            or name not in previous_flags
            or value != previous_flags[name]
        )
    ]


def update_state_from_dictionaries(current_warnings, current_alarms):
    """
    Compare the current warning/alarm flags with those of the previous
    call and publish a status message to "statusQueue" when needed.

    current_warnings: dict, flag name -> truthy if the warning is active
    current_alarms:   dict, flag name -> truthy if the alarm is active

    The message "Status" is 2 if any alarm is active, 1 if only warnings
    are active and 0 otherwise. A message with "Type" 0 is published when
    the status differs from the last published one or when a flag was
    newly raised (on the first call every active flag counts as new).
    Otherwise a message with "Type" 1 is published on the first such call
    and then whenever 15 or more calls have passed since the last one.

    returns: the JSON text of the message if one was published,
             otherwise the message as dict
    """
    global previous_warnings, previous_alarms, is_first_update
    global prev_status, heartbit_interval, subscribed_to_queue_first_time

    heartbit_interval += 1

    new_alarms = _newly_raised_flags(current_alarms, previous_alarms, is_first_update)
    new_warnings = _newly_raised_flags(current_warnings, previous_warnings, is_first_update)
    is_first_update = False

    if any(current_alarms.values()):
        status = 2
    elif any(current_warnings.values()):
        status = 1
    else:
        status = 0

    status_message = {
        "InstallationId": INSTALLATION_ID,
        "Product": PRODUCT_ID,
        "Status": status,
        "Type": 1,
        "Warnings": [AlarmOrWarning(name, "System").to_dict() for name in new_warnings],
        "Alarms": [AlarmOrWarning(name, "System").to_dict() for name in new_alarms]
    }

    if status != prev_status or new_warnings or new_alarms:
        prev_status = status
        status_message["Type"] = 0
        status_message = json.dumps(status_message)
        channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message)
        print(status_message)
        print("Message sent successfully")
    elif heartbit_interval >= 15 or not subscribed_to_queue_first_time:
        print("Send heartbit message to rabbitmq")
        heartbit_interval = 0
        subscribed_to_queue_first_time = True
        status_message = json.dumps(status_message)
        channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message)

    # remember the flags for the comparison in the next call
    previous_warnings = current_warnings.copy()
    previous_alarms = current_alarms.copy()

    return status_message
def read_warning_and_alarm_flags():
    """
    Build the CsvSignals for all warning and alarm flags.

    Every flag is one bit of register 1005.

    returns: (warning signals, alarm signals), two lists of CsvSignal
    """
    # (flag name, bit in register 1005)
    warning_bits = (
        ('TaM1', 1), ('TbM1', 4), ('VBm1', 6), ('VBM1', 8), ('IDM1', 10),
        ('vsm1', 22), ('vsM1', 24), ('iCM1', 26), ('iDM1', 28), ('MID1', 30),
        ('BLPW', 32), ('CCBF', 33), ('Ah_W', 35), ('MPMM', 38), ('TCdi', 40),
        ('LMPW', 44), ('TOCW', 47), ('BUSL', 49),
    )
    alarm_bits = (
        ('Tam', 0), ('TaM2', 2), ('Tbm', 3), ('TbM2', 5), ('VBm2', 7),
        ('VBM2', 9), ('IDM2', 11), ('ISOB', 12), ('MSWE', 13), ('FUSE', 14),
        ('HTRE', 15), ('TCPE', 16), ('STRE', 17), ('CME', 18), ('HWFL', 19),
        ('HWEM', 20), ('ThM', 21), ('vsm2', 23), ('vsM2', 25), ('iCM2', 27),
        ('iDM2', 29), ('MID2', 31), ('HTFS', 42), ('DATA', 43), ('LMPA', 45),
        ('HEBT', 46), ('CURM', 48),
    )

    warning_signals = [
        CsvSignal('/Battery/Devices/WarningFlags/' + flag, c.read_bool(register=1005, bit=bit))
        for flag, bit in warning_bits
    ]
    alarm_signals = [
        CsvSignal('/Battery/Devices/AlarmFlags/' + flag, c.read_bool(register=1005, bit=bit))
        for flag, bit in alarm_bits
    ]
    return warning_signals, alarm_signals
import random
# Test variant of the update cycle: reads every battery status via modbus,
# collects the warning flags from the real status records but replaces every
# alarm flag with a random boolean, prints the derived state, publishes the
# values on the dbus and writes the CSV files. Always returns True.
# NOTE(review): the original indentation of this function is not visible here,
# so it is unclear how many of the statements after `if ALLOW:` belong to that
# branch -- confirm against the repository before relying on it.
def update_for_testing(modbus, batteries, dbus, signals, csv_signals):
global ALLOW
logging.debug('starting testing update cycle')
warning_signals, alarm_signals = read_warning_and_alarm_flags()
# signal name (with battery id inserted) -> flag value
current_warnings = {}
current_alarms = {}
statuses = [read_battery_status(modbus, battery) for battery in batteries]
node_numbers = [battery.slave_address for battery in batteries]
if ALLOW:
# these two flags are assigned below but never read inside this function
any_warning_active = False
any_alarm_active = False
for i, node in enumerate(node_numbers):
for s in warning_signals:
# ids inserted into the signal path are 1-based positions, not slave addresses
signal_name = insert_id(s.name, i+1)
value = s.get_value(statuses[i])
current_warnings[signal_name] = value
if ALLOW and value:
any_warning_active = True
for s in alarm_signals:
signal_name = insert_id(s.name, i+1)
# alarms are simulated: the battery status is ignored here
value = random.choice([True, False])
current_alarms[signal_name] = value
if ALLOW and value:
any_alarm_active = True
print(update_state_from_dictionaries(current_warnings, current_alarms))
publish_values(dbus, signals, statuses)
create_csv_files(csv_signals, statuses, node_numbers)
logging.debug('finished update cycle\n')
return True
# Wall-clock time (seconds since the epoch) of the last CSV export.
# update() compares against it and resets it after each export.
start_time = time.time()
def update(modbus, batteries, dbus, signals, csv_signals):
    # type: (Modbus, Iterable[Battery], DBus, Iterable[Signal], Iterable[CsvSignal]) -> bool
    """
    Main update function

    1. requests status record each battery via modbus,
    2. parses the data using Signal.get_value
    3. aggregates the data from all batteries into one datum using Signal.aggregate
    4. publishes the data on the dbus

    In addition, the warning and alarm flags of every battery are collected
    and the state derived from them is printed, and once at least 30 seconds
    have passed since the last export the CSV files are (re)created.

    :param modbus: modbus client used to read the battery status records
    :param batteries: the batteries to poll
    :param dbus: dbus service the values are published on
    :param signals: signals published on the dbus
    :param csv_signals: signals written to the CSV export
    :return: always True
    """
    global start_time

    logging.debug('starting update cycle')

    warnings_signals, alarm_signals = read_warning_and_alarm_flags()
    current_warnings = {}
    current_alarms = {}

    statuses = [read_battery_status(modbus, battery) for battery in batteries]
    node_numbers = [battery.slave_address for battery in batteries]

    # Collect the flags of every battery; the id inserted into the signal path
    # is the 1-based position of the battery, not its slave address.
    for i, _node in enumerate(node_numbers):
        status = statuses[i]
        battery_id = i + 1
        for s in warnings_signals:
            current_warnings[insert_id(s.name, battery_id)] = s.get_value(status)
        for s in alarm_signals:
            current_alarms[insert_id(s.name, battery_id)] = s.get_value(status)

    print(update_state_from_dictionaries(current_warnings, current_alarms))

    publish_values(dbus, signals, statuses)

    # Export CSV files at most once every 30 seconds.
    elapsed_time = time.time() - start_time
    if elapsed_time >= 30:
        create_csv_files(csv_signals, statuses, node_numbers)
        start_time = time.time()
        print("Elapsed time {:.2f} seconds".format(elapsed_time))

    logging.debug('finished update cycle\n')
    return True
def print_usage():
    """Print a short usage hint and an example invocation to stdout."""
    script = __file__
    for line in (
        f'Usage: {script} <serial device>',
        f'Example: {script} ttyUSB0',
    ):
        print(line)
def parse_cmdline_args(argv):
    # type: (list[str]) -> str
    """
    Return the first command line argument (the tty device).

    If no argument was given, logs the problem, prints the usage text
    and terminates the process with exit code 1.
    """
    if len(argv) != 0:
        return argv[0]

    logging.info('missing command line argument for tty device')
    print_usage()
    sys.exit(1)
alive = True # global alive flag, watchdog_task clears it, update_task sets it
# Flipped between True and False on every run of update_task;
# update_for_testing reads it to gate its warning/alarm bookkeeping.
ALLOW = False
def create_update_task(modbus, dbus, batteries, signals, csv_signals, main_loop):
    # type: (Modbus, DBus, Iterable[Battery], Iterable[Signal], DBusGMainLoop) -> Callable[[],bool]
    """
    Build the periodic update callback.

    The returned callable flips the global ALLOW flag, runs the main update
    function and stores its result in the global alive flag. When the result
    is falsy it quits the main loop. The result is also returned.
    """

    def update_task():
        # type: () -> bool
        global alive, ALLOW

        ALLOW = not ALLOW

        # update_for_testing(...) takes the same arguments and can be swapped in here
        alive = update(modbus, batteries, dbus, signals, csv_signals)

        if alive:
            return alive

        logging.info('update_task: quitting main loop because of error')
        main_loop.quit()
        return alive

    return update_task
def create_watchdog_task(main_loop):
    # type: (DBusGMainLoop) -> Callable[[],bool]
    """
    Build the watchdog callback that supervises the global alive flag.

    Each run expects the flag to have been set again since the previous run:
    if it is set, the flag is cleared and True is returned; if it is not,
    the main loop is quit and False is returned.
    """

    def watchdog_task():
        # type: () -> bool
        global alive

        if not alive:
            logging.info('watchdog_task: killing main loop because update_task is no longer alive')
            main_loop.quit()
            return False

        logging.debug('watchdog_task: update_task is alive')
        alive = False
        return True

    return watchdog_task
def get_installation_name(file_path):
    """Return the content of the text file at file_path without surrounding whitespace."""
    with open(file_path, 'r') as handle:
        contents = handle.read()
    return contents.strip()
def manage_csv_files(directory_path, max_files=20):
    """
    Keep at most max_files regular files in directory_path.

    Files are ordered by os.path.getctime and the oldest ones are removed
    until the limit is met. Sub-directories are never touched. A directory
    that does not exist is treated as empty, so the call is a no-op.

    :param directory_path: directory to prune
    :param max_files: number of files allowed to remain
    """
    if not os.path.isdir(directory_path):
        return

    file_paths = [
        os.path.join(directory_path, name)
        for name in os.listdir(directory_path)
    ]
    # oldest first
    files = sorted(
        (p for p in file_paths if os.path.isfile(p)),
        key=os.path.getctime,
    )

    excess = max(len(files) - max_files, 0)
    for stale in files[:excess]:
        os.remove(stale)
def serialize_for_csv(value):
    """
    Render a value as text for a CSV cell.

    Dicts, lists and tuples are encoded as JSON (non-ASCII characters are
    kept as they are); every other value is converted with str().
    """
    is_container = isinstance(value, (dict, list, tuple))
    if not is_container:
        return str(value)
    return json.dumps(value, ensure_ascii=False)
def insert_id(path, id_number):
    """
    Insert id_number as a new path segment right after the first
    "Devices" segment of a "/"-separated path.

    Raises ValueError when the path has no "Devices" segment.
    """
    segments = path.split("/")
    cut = segments.index("Devices") + 1
    return "/".join(segments[:cut] + [str(id_number)] + segments[cut:])
def create_csv_files(signals, statuses, node_numbers):
    """
    Write one CSV snapshot of all batteries and upload it.

    The file is named after the current Unix time rounded down to an even
    second and is opened in append mode inside CSV_DIR. It contains one
    configuration row listing the node numbers, followed by one row
    (name; value; text) per signal and battery. The file content is then
    compressed and sent through s3_config. After a response with status 200
    the local file is deleted; otherwise it is moved to CSV_DIR/failed,
    where at most 10 files are kept. Finally CSV_DIR itself is pruned.
    """
    global s3_config

    # round the current time down to an even number of seconds
    now = int(time.time())
    timestamp = now - 1 if now % 2 != 0 else now

    # Create CSV directory if it doesn't exist
    if not os.path.exists(CSV_DIR):
        os.makedirs(CSV_DIR)

    csv_filename = f"{timestamp}.csv"
    csv_path = os.path.join(CSV_DIR, csv_filename)

    with open(csv_path, 'a', newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=';')

        # special first row: the nodes configuration
        writer.writerow([
            "/Config/Devices/BatteryNodes",
            ",".join(map(str, node_numbers)),
            "",
        ])

        # one row per battery and signal; ids in the path are 1-based positions
        for position, _node in enumerate(node_numbers):
            for s in signals:
                writer.writerow([
                    insert_id(s.name, position + 1),
                    s.get_value(statuses[position]),
                    s.get_text,
                ])

    csv_data = read_csv_as_string(csv_path)
    if csv_data is None:
        print(" error while reading csv as string")
        return

    # zip-comp additions
    compressed_csv = compress_csv_data(csv_data)
    response = s3_config.create_put_request(csv_filename, compressed_csv)

    if response.status_code != 200:
        # keep the file that could not be uploaded in a separate directory
        failed_dir = os.path.join(CSV_DIR, "failed")
        if not os.path.exists(failed_dir):
            os.makedirs(failed_dir)
        os.rename(csv_path, os.path.join(failed_dir, csv_filename))
        print("Uploading failed")
        manage_csv_files(failed_dir, 10)
    else:
        os.remove(csv_path)
        print("Success")

    manage_csv_files(CSV_DIR)
def main(argv):
    # type: (list[str]) -> ()
    """
    Program entry: connect to the batteries on the given tty, register the
    dbus service and run the GLib main loop with the update and watchdog
    tasks. Exits with code 2 when no battery is found and with 0xFF once
    the main loop has stopped.
    """
    logging.basicConfig(level=cfg.LOG_LEVEL)
    logging.info('starting %s', __file__)

    tty = parse_cmdline_args(argv)
    modbus = init_modbus(tty)

    batteries = identify_batteries(modbus)
    n = len(batteries)
    noun = ' battery' if n == 1 else ' batteries'
    logging.info('found %s%s', n, noun)

    if n < 1:
        sys.exit(2)

    # report hw and fw version of first battery found
    bat = c.first(batteries)
    signals = init_signals(bat.hardware_version, bat.firmware_version, n)
    csv_signals = create_csv_signals(bat.firmware_version)

    # must run before init_dbus because gobject does some global magic
    main_loop = init_main_loop()
    dbus = init_dbus(tty, signals)

    update_task = create_update_task(modbus, dbus, batteries, signals, csv_signals, main_loop)
    watchdog_task = create_watchdog_task(main_loop)

    # add watchdog first; it runs at half the rate of the update task
    GLib.timeout_add(cfg.UPDATE_INTERVAL * 2, watchdog_task)
    # call update once every update_interval
    GLib.timeout_add(cfg.UPDATE_INTERVAL, update_task)

    logging.info('starting GLib.MainLoop')
    main_loop.run()
    logging.info('GLib.MainLoop was shut down')

    # reaches this only on error
    sys.exit(0xFF)
# Script entry point: pass the command line arguments (without the program name) to main().
if __name__ == "__main__":
main(sys.argv[1:])