Innovenergy_trunk/NodeRed/dbus-csv-files/dbus-csv-files.py

732 lines
25 KiB
Python
Executable File

#! /usr/bin/python3 -u
import re
import sys
import logging
from gi.repository import GLib
import config as cfg
import convert as c
from pymodbus.register_read_message import ReadInputRegistersResponse
from pymodbus.client.sync import ModbusSerialClient as Modbus
from pymodbus.other_message import ReportSlaveIdRequest
from pymodbus.exceptions import ModbusException
from pymodbus.pdu import ExceptionResponse
from dbus.mainloop.glib import DBusGMainLoop
from data import BatteryStatus, Battery, LedColor, CsvSignal, LedState
from collections import Iterable
from os import path
app_dir = path.dirname(path.realpath(__file__))
sys.path.insert(1, path.join(app_dir, 'ext', 'velib_python'))
#from vedbus import VeDbusService as DBus
import time
import os
import csv
import requests
import hmac
import hashlib
import base64
from datetime import datetime
import io
class S3config:
    """
    Settings and request signing for the S3-compatible bucket that receives the CSV files.

    Requests are signed with the "AWS <key>:<signature>" authorization scheme,
    where the signature is a base64-encoded HMAC-SHA1 over method, content
    type, date and resource path.
    """

    def __init__(self):
        # NOTE(review): bucket credentials are hard-coded in the source file;
        # consider loading them from a protected configuration file instead.
        self.bucket = "1-c0436b6a-d276-4cd8-9c44-1eae86cf5d0e"
        self.region = "sos-ch-dk-2"
        self.provider = "exo.io"
        self.key = "EXOcc0e47a4c4d492888ff5a7f2"
        self.secret = "79QG4unMh7MeVacMnXr5xGxEyAlWZDIdM-dg_nXFFr4"
        self.content_type = "text/plain; charset=utf-8"

    @property
    def host(self):
        """Host name of the bucket: '<bucket>.<region>.<provider>'."""
        return f"{self.bucket}.{self.region}.{self.provider}"

    @property
    def url(self):
        """Base https URL of the bucket."""
        return f"https://{self.host}"

    def create_put_request(self, s3_path, data, timeout=10):
        """
        Upload data to s3_path inside the bucket with a signed PUT request.

        :param s3_path: object path inside the bucket
        :param data: request body handed to requests.put
        :param timeout: seconds to wait for the server before giving up; without
            a timeout a stalled connection would block the caller forever
        :return: the requests response object
        :raises requests.RequestException: on connection errors and timeouts
        """
        headers = self._create_request("PUT", s3_path)
        url = f"{self.url}/{s3_path}"
        return requests.put(url, headers=headers, data=data, timeout=timeout)

    def _create_request(self, method, s3_path):
        """Return the signed headers (Host, Date, Authorization, Content-Type) for a request."""
        # formatdate() always emits English day/month names, whereas
        # strftime('%a ... %b') depends on the process locale.
        from email.utils import formatdate

        date = formatdate(usegmt=True)
        auth = self._create_authorization(
            method, self.bucket, s3_path, date, self.key, self.secret, self.content_type
        )
        return {
            "Host": self.host,
            "Date": date,
            "Authorization": auth,
            "Content-Type": self.content_type,
        }

    @staticmethod
    def _create_authorization(method, bucket, s3_path, date, s3_key, s3_secret, content_type="", md5_hash=""):
        """
        Build the value of the Authorization header.

        The string to sign is "method\\nmd5\\ncontent-type\\ndate\\n/bucket/path",
        with leading/trailing slashes removed from bucket and path.
        """
        resource = f"/{bucket.strip('/')}/{s3_path.strip('/')}"
        payload = f"{method}\n{md5_hash}\n{content_type}\n{date}\n{resource}"
        digest = hmac.new(s3_secret.encode(), payload.encode(), hashlib.sha1).digest()
        signature = base64.b64encode(digest).decode()
        return f"AWS {s3_key}:{signature}"
def read_csv_as_string(file_path):
    """
    Return the whole content of the UTF-8 text file at file_path.

    A missing file or any other I/O error is reported on stdout and
    signalled to the caller by returning None.
    """
    content = None
    try:
        with open(file_path, 'r', encoding='utf-8') as csv_file:
            content = csv_file.read()
    except FileNotFoundError:
        print(f"Error: The file {file_path} does not exist.")
    except IOError as err:
        print(f"IO error occurred: {str(err)}")
    return content
# Directory where the CSV snapshots are written before they are uploaded.
# (For local testing a relative directory such as "csv_files/" can be used.)
CSV_DIR = "/data/csv_files_service/"

# File that contains the name of this installation.
INSTALLATION_NAME_FILE = '/data/innovenergy/openvpn/installation-name'

# The branch below never executes; it only makes `Callable` visible to the
# PyCharm type-checker for the type comments used in this file.
# noinspection PyUnreachableCode
if False:
    from typing import Callable
def interpret_limb_bitmap(bitmap_value):
    """
    Count how many of the five battery strings are flagged as disabled.

    Only the five lowest bits of bitmap_value are considered; bit 0 stands
    for string 1, bit 4 for string 5. A set bit means the string is disabled,
    so 0b00000 yields 0 (all strings active) and 0b00001 yields 1.
    """
    five_string_mask = 0b11111
    disabled_bits = bitmap_value & five_string_mask
    return bin(disabled_bits).count("1")
def create_csv_signals(firmware_version):
    """
    Build the list of CsvSignal objects exported for every battery.

    Each signal pairs a '/Battery/Devices/...' path with either a constant
    (the firmware version) or a reader taking a battery status record, plus
    an optional unit text.

    :param firmware_version: value reported under '/Battery/Devices/FwVersion'
    :return: list of CsvSignal in a fixed order
    """
    read_voltage = c.read_float(register=999, scale_factor=0.01, offset=0, places=2)
    read_current = c.read_float(register=1000, scale_factor=0.01, offset=-10000, places=2)
    read_limb_bitmap = c.read_bitmap(1059)

    def read_power(status):
        return int(read_current(status) * read_voltage(status))

    def make_string_disabled_reader(bit):
        """Return a reader yielding 1 when the given bit of the limb bitmap is set, else 0."""
        mask = 1 << bit

        def string_disabled(status):
            return int((read_limb_bitmap(status) & mask) != 0)

        return string_disabled

    def limp_strings_value(status):
        return interpret_limb_bitmap(read_limb_bitmap(status))

    def calc_power_limit_imposed_by_voltage_limit(v, i, v_limit, r_int):
        # type: (float, float, float, float) -> float
        dv = v_limit - v
        di = dv / r_int
        p_limit = v_limit * (i + di)
        return p_limit

    def calc_power_limit_imposed_by_current_limit(v, i, i_limit, r_int):
        # type: (float, float, float, float) -> float
        di = i_limit - i
        dv = di * r_int
        p_limit = i_limit * (v + dv)
        return p_limit

    def calc_max_charge_power(status):
        # type: (BatteryStatus) -> int
        n_strings = cfg.NUM_OF_STRING_PER_BATTERY - limp_strings_value(status)
        if n_strings <= 0:
            # no usable string left: nothing can be charged, and the
            # per-string resistances below could not be divided by zero strings
            return 0

        i_max = n_strings * cfg.I_MAX_PER_STRING
        v_max = cfg.V_MAX
        r_int_min = cfg.R_STRING_MIN / n_strings
        r_int_max = cfg.R_STRING_MAX / n_strings

        v = read_voltage(status)
        i = read_current(status)

        p_limits = [
            calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_min),
            calc_power_limit_imposed_by_voltage_limit(v, i, v_max, r_int_max),
            calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_min),
            calc_power_limit_imposed_by_current_limit(v, i, i_max, r_int_max),
        ]

        p_limit = min(p_limits)    # p_limit is normally positive here (signed)
        p_limit = max(p_limit, 0)  # charge power must not become negative
        return int(p_limit)

    def calc_max_discharge_power(status):
        n_strings = cfg.NUM_OF_STRING_PER_BATTERY - limp_strings_value(status)
        max_discharge_current = n_strings * cfg.I_MAX_PER_STRING
        return int(max_discharge_current * read_voltage(status))

    def make_led_state_reader(color):
        """Return a reader mapping the state of one LED to 'Blinking', 'On', 'Off' or 'Unknown'."""
        read_led = c.read_led_state(register=1004, led=color)  # built once, not on every read

        def read_led_text(status):
            led_state = read_led(status)
            if led_state == LedState.blinking_fast or led_state == LedState.blinking_slow:
                return "Blinking"
            if led_state == LedState.on:
                return "On"
            if led_state == LedState.off:
                return "Off"
            return "Unknown"

        return read_led_text

    total_current = c.read_float(register=1062, scale_factor=0.01, offset=-10000, places=1)

    def read_total_current(status):
        return total_current(status)

    def read_heating_current(status):
        # whatever flows on the bus but not through the cells
        return total_current(status) - read_current(status)

    def read_heating_power(status):
        return read_voltage(status) * read_heating_current(status)

    soc_ah = c.read_float(register=1002, scale_factor=0.1, offset=-10000, places=1)

    def read_soc_ah(status):
        return soc_ah(status)

    def hex_string_to_ascii(hex_string):
        # drop spaces, then turn every pair of hex digits into one character
        hex_string = hex_string.replace(" ", "")
        return ''.join(chr(int(hex_string[k:k + 2], 16)) for k in range(0, len(hex_string), 2))

    battery_status_reader = c.read_hex_string(1060, 2)

    def read_eoc_reached(status):
        return hex_string_to_ascii(battery_status_reader(status)) == "EOC_"

    def read_serial_number(status):
        serial_regs = [1055, 1056, 1057, 1058]
        # each register is read as one hex value; spaces inside a value are removed
        serial_parts = [
            c.read_hex_string(reg, 1)(status).replace(' ', '')
            for reg in serial_regs
        ]
        # concatenate all parts and strip trailing zeros
        return ''.join(serial_parts).rstrip('0')

    signals = [
        CsvSignal('/Battery/Devices/FwVersion', firmware_version),
        CsvSignal('/Battery/Devices/Dc/Power', read_power, 'W'),
        CsvSignal('/Battery/Devices/Dc/Voltage', read_voltage, 'V'),
        CsvSignal('/Battery/Devices/Soc', c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), '%'),
        CsvSignal('/Battery/Devices/Temperatures/Cells/Average', c.read_float(register=1003, scale_factor=0.1, offset=-400, places=1), 'C'),
        CsvSignal('/Battery/Devices/Dc/Current', read_current, 'A'),
        CsvSignal('/Battery/Devices/BusCurrent', read_total_current, 'A'),
        CsvSignal('/Battery/Devices/CellsCurrent', read_current, 'A'),
        CsvSignal('/Battery/Devices/HeatingCurrent', read_heating_current, 'A'),
        CsvSignal('/Battery/Devices/HeatingPower', read_heating_power, 'W'),
        CsvSignal('/Battery/Devices/SOCAh', read_soc_ah),
    ]

    led_colors = [
        ('Blue', LedColor.blue),
        ('Red', LedColor.red),
        ('Green', LedColor.green),
        ('Amber', LedColor.amber),
    ]
    signals += [
        CsvSignal('/Battery/Devices/Leds/' + led_name, make_led_state_reader(color))
        for led_name, color in led_colors
    ]

    # NOTE(review): the paths say "Active" but the readers return 1 when the
    # string is *disabled*; kept as is, confirm against the consumer of the CSV.
    signals += [
        CsvSignal('/Battery/Devices/BatteryStrings/String{0}Active'.format(number),
                  make_string_disabled_reader(number - 1))
        for number in range(1, 6)
    ]

    io_status_bits = [
        ('ConnectedToDcBus', 0),
        ('AlarmOutActive', 1),
        ('InternalFanActive', 2),
        ('VoltMeasurementAllowed', 3),
        ('AuxRelayBus', 4),
        ('RemoteStateActive', 5),
        ('RiscActive', 6),
    ]
    signals += [
        CsvSignal('/Battery/Devices/IoStatus/' + flag, c.read_bool(register=1013, bit=bit))
        for flag, bit in io_status_bits
    ]

    signals += [
        CsvSignal('/Battery/Devices/Eoc', read_eoc_reached),
        CsvSignal('/Battery/Devices/SerialNumber', read_serial_number),
        CsvSignal('/Battery/Devices/TimeSinceTOC', c.read_float(register=1052)),
        CsvSignal('/Battery/Devices/MaxChargePower', calc_max_charge_power),
        CsvSignal('/Battery/Devices/MaxDischargePower', calc_max_discharge_power),
    ]

    # Warnings
    warning_bits = [
        ('TaM1', 1), ('TbM1', 4), ('VBm1', 6), ('VBM1', 8), ('IDM1', 10),
        ('vsm1', 22), ('vsM1', 24), ('iCM1', 26), ('iDM1', 28), ('MID1', 30),
        ('BLPW', 32), ('CCBF', 33), ('Ah_W', 35), ('MPMM', 38), ('TCdi', 40),
        ('LMPW', 44), ('TOCW', 47), ('BUSL', 49),
    ]
    signals += [
        CsvSignal('/Battery/Devices/WarningFlags/' + flag, c.read_bool(register=1005, bit=bit))
        for flag, bit in warning_bits
    ]

    # Alarms
    alarm_bits = [
        ('Tam', 0), ('TaM2', 2), ('Tbm', 3), ('TbM2', 5), ('VBm2', 7),
        ('VBM2', 9), ('IDM2', 11), ('ISOB', 12), ('MSWE', 13), ('FUSE', 14),
        ('HTRE', 15), ('TCPE', 16), ('STRE', 17), ('CME', 18), ('HWFL', 19),
        ('HWEM', 20), ('ThM', 21), ('vsm2', 23), ('vsM2', 25), ('iCM2', 27),
        ('iDM2', 29), ('MID2', 31), ('HTFS', 42), ('DATA', 43), ('LMPA', 45),
        ('HEBT', 46), ('CURM', 48),
    ]
    signals += [
        CsvSignal('/Battery/Devices/AlarmFlags/' + flag, c.read_bool(register=1005, bit=bit))
        for flag, bit in alarm_bits
    ]

    return signals
def init_modbus(tty):
    # type: (str) -> Modbus
    """
    Create the serial Modbus client for the device '/dev/<tty>'.

    All line settings (mode, baud rate, stop bits, byte size, timeout,
    parity) are taken from the config module.
    """
    logging.debug('initializing Modbus')

    serial_settings = dict(
        port='/dev/' + tty,
        method=cfg.MODE,
        baudrate=cfg.BAUD_RATE,
        stopbits=cfg.STOP_BITS,
        bytesize=cfg.BYTE_SIZE,
        timeout=cfg.TIMEOUT,
        parity=cfg.PARITY,
    )
    return Modbus(**serial_settings)
def read_modbus_registers(modbus, slave_address, base_address=cfg.BASE_ADDRESS, count=cfg.NO_OF_REGISTERS):
    # type: (Modbus, int, int, int) -> ReadInputRegistersResponse
    """
    Request `count` input registers starting at `base_address` from one slave.

    The connection must already be open; the client's response is returned
    unchanged.
    """
    # NOTE(review): the logged upper bound is base_address + count, i.e. one
    # past the last register that is actually requested.
    upper_bound = base_address + count
    logging.debug('requesting modbus registers {0}-{1}'.format(base_address, upper_bound))

    request = {'address': base_address, 'count': count, 'unit': slave_address}
    return modbus.read_input_registers(**request)
def read_firmware_version(modbus, slave_address):
    # type: (Modbus, int) -> str
    """
    Read register 1054 of the given slave and return it as a 4-digit,
    zero-padded, upper-case hex string.

    The connection is opened for the request and closed afterwards, also
    when the request fails.
    """
    logging.debug('reading firmware version')

    try:
        modbus.connect()
        reply = read_modbus_registers(modbus, slave_address, base_address=1054, count=1)
        version_register = reply.registers[0]
        return format(version_register, '0>4X')
    finally:
        modbus.close()  # close in any case
def init_main_loop():
    # type: () -> DBusGMainLoop
    """
    Install DBusGMainLoop as the default dbus main loop and return a new
    GLib.MainLoop (note: the returned object is the GLib loop).
    """
    logging.debug('initializing DBusGMainLoop Loop')
    DBusGMainLoop(set_as_default=True)
    main_loop = GLib.MainLoop()
    return main_loop
def report_slave_id(modbus, slave_address):
    # type: (Modbus, int) -> str
    """
    Send a "report slave id" request to one node and return its identifier.

    The connection is opened for the request and closed afterwards, also
    when the request fails.

    :param modbus: the Modbus client
    :param slave_address: address of the node to query
    :return: the `identifier` attribute of the response
        (the caller post-processes it as bytes)
    :raises Exception: if the node answers with a Modbus exception response
        or the client returns a ModbusException
    """
    slave = str(slave_address)

    logging.debug('requesting slave id from node ' + slave)

    try:
        modbus.connect()

        request = ReportSlaveIdRequest(unit=slave_address)
        response = modbus.execute(request)

        # Error replies are *instances* of these classes, so they have to be
        # detected with isinstance(); an identity check against the class
        # itself never matches.
        if isinstance(response, (ExceptionResponse, ModbusException)):
            raise Exception('failed to get slave id from ' + slave + ' : ' + str(response))

        return response.identifier

    finally:
        modbus.close()
def parse_slave_id(modbus, slave_address):
    # type: (Modbus, int) -> (str, str, int)
    """
    Query the slave id of a node and split it into its parts.

    The id is expected to look like '48TL<digits> <bms version>'.

    :return: tuple (hardware version, bms version, ampere hours), where the
        hardware version is '48TL<digits>' and ampere hours is <digits> as int
    :raises Exception: if the id does not match the expected pattern
    """
    slave_id = report_slave_id(modbus, slave_address)

    # keep printable ASCII only (raw literals: the regex engine interprets the escapes)
    sid = re.sub(rb'[^\x20-\x7E]', b'', slave_id)  # remove weird special chars

    match = re.match(r'(?P<hw>48TL(?P<ah>\d+)) *(?P<bms>.*)', sid.decode('ascii'))

    if match is None:
        raise Exception('no known battery found')

    return match.group('hw'), match.group('bms'), int(match.group('ah'))
def identify_battery(modbus, slave_address):
    # type: (Modbus, int) -> Battery
    """
    Query one node for its slave id and firmware version and return the
    resulting Battery description. Errors of the underlying requests
    propagate to the caller.
    """
    logging.info('identifying battery...')

    hw_version, bms_version, capacity_ah = parse_slave_id(modbus, slave_address)
    fw_version = read_firmware_version(modbus, slave_address)

    battery = Battery(
        slave_address=slave_address,
        hardware_version=hw_version,
        firmware_version=fw_version,
        bms_version=bms_version,
        ampere_hours=capacity_ah)

    logging.info('battery identified:\n{0}'.format(str(battery)))
    return battery
def identify_batteries(modbus):
    # type: (Modbus) -> list[Battery]
    """
    Probe every slave address from 1 to cfg.MAX_SLAVE_ADDRESS and return
    the batteries that could be identified.

    An address whose identification raises is logged and skipped.
    """
    found = []

    for slave_address in range(1, cfg.MAX_SLAVE_ADDRESS + 1):
        try:
            battery = identify_battery(modbus, slave_address)
        except Exception as e:
            logging.info('failed to identify battery at {0} : {1}'.format(str(slave_address), str(e)))
        else:
            found.append(battery)

    return found
# NOTE(review): this is a second definition of read_modbus_registers in this
# file; being the later one, it is the definition in effect at runtime.
def read_modbus_registers(modbus, slave_address, base_address=cfg.BASE_ADDRESS, count=cfg.NO_OF_REGISTERS):
    # type: (Modbus, int, int, int) -> ReadInputRegistersResponse
    """
    Read `count` input registers, beginning at `base_address`, from the
    slave with the given address and hand back the client's response.
    """
    log_text = 'requesting modbus registers {0}-{1}'.format(base_address, base_address + count)
    logging.debug(log_text)

    response = modbus.read_input_registers(
        address=base_address,
        count=count,
        unit=slave_address)
    return response
def read_battery_status(modbus, battery):
    # type: (Modbus, Battery) -> BatteryStatus
    """
    Fetch the status registers of one battery.

    Opens the connection, reads the default register range of the battery's
    slave address and wraps the registers in a BatteryStatus. The connection
    is closed again whether or not the read succeeds.
    """
    logging.debug('reading battery status')

    try:
        modbus.connect()
        response = read_modbus_registers(modbus, battery.slave_address)
        status = BatteryStatus(battery, response.registers)
    finally:
        modbus.close()  # close in any case

    return status
def get_installation_name(file_path):
    """Return the content of the file at file_path with surrounding whitespace removed."""
    with open(file_path, 'r') as name_file:
        raw_name = name_file.read()
    return raw_name.strip()
def manage_csv_files(directory_path, max_files=20):
    """
    Keep at most max_files regular files in directory_path.

    When there are more, the files with the oldest ctime are deleted first.
    Sub-directories (such as a 'failed' folder living inside the directory)
    are neither counted nor removed; os.remove would fail on them.

    :param directory_path: directory to clean up
    :param max_files: number of files to keep
    """
    entries = (os.path.join(directory_path, name) for name in os.listdir(directory_path))
    files = [entry for entry in entries if os.path.isfile(entry)]
    files.sort(key=os.path.getctime)

    # Remove oldest files if exceeds maximum
    surplus = max(len(files) - max_files, 0)
    for stale_file in files[:surplus]:
        os.remove(stale_file)
def serialize_for_csv(value):
    """
    Turn a value into the text written to a CSV cell.

    dicts, lists and tuples are encoded as JSON (non-ASCII characters are
    kept as they are); everything else is converted with str().
    """
    if isinstance(value, (dict, list, tuple)):
        # json is not imported at file level, so bring it into scope here
        import json
        return json.dumps(value, ensure_ascii=False)
    return str(value)
def insert_id(path, id_number):
    """
    Insert id_number as a new path segment right after the first 'Devices'
    segment of a '/'-separated path.

    Raises ValueError when the path has no 'Devices' segment.
    """
    segments = path.split("/")
    cut = segments.index("Devices") + 1
    with_id = segments[:cut] + [str(id_number)] + segments[cut:]
    return "/".join(with_id)
def create_csv_files(signals, statuses, node_numbers):
    """
    Write one CSV snapshot of all batteries and upload it to the S3 bucket.

    The file is named '<timestamp>.csv', where the timestamp is the current
    unix time rounded down to an even second. It contains one configuration
    row listing the node numbers, followed by one 'name;value;text' row per
    signal and battery.

    After a successful upload (HTTP 200) the local file is deleted. If the
    upload fails for any reason - including network errors - the file is
    moved to the 'failed' sub-directory, of which only the 10 newest files
    are kept.

    :param signals: the CsvSignal definitions to evaluate
    :param statuses: one status record per battery, in the order of node_numbers
    :param node_numbers: slave addresses of the batteries
    """
    timestamp = int(time.time())
    if timestamp % 2 != 0:
        timestamp -= 1

    # Create CSV directory if it doesn't exist
    os.makedirs(CSV_DIR, exist_ok=True)

    csv_filename = f"{timestamp}.csv"
    csv_path = os.path.join(CSV_DIR, csv_filename)

    # Append values to the CSV file; written as UTF-8 because it is read back as UTF-8
    with open(csv_path, 'a', newline='', encoding='utf-8') as csvfile:
        csv_writer = csv.writer(csvfile, delimiter=';')

        # Add a special row for the nodes configuration
        nodes_list = ",".join(str(node) for node in node_numbers)
        csv_writer.writerow(["/Config/Devices/BatteryNodes", nodes_list, ""])

        # One row per battery and signal. The id inserted into the signal
        # path is the 1-based position of the battery, not its node number.
        for index, _node in enumerate(node_numbers):
            status = statuses[index]
            for s in signals:
                signal_name = insert_id(s.name, index + 1)
                csv_writer.writerow([signal_name, s.get_value(status), s.get_text])

    # Read the CSV back as a string and try to upload it
    csv_data = read_csv_as_string(csv_path)
    uploaded = False
    if csv_data is not None:  # never upload an empty object for an unreadable file
        try:
            response = S3config().create_put_request(csv_filename, csv_data)
            uploaded = response.status_code == 200
        except requests.RequestException as error:
            # connection problems count as a failed upload instead of
            # aborting the whole update cycle
            logging.warning('upload of %s failed: %s', csv_filename, error)

    if uploaded:
        os.remove(csv_path)
        print("Success")
    else:
        failed_dir = os.path.join(CSV_DIR, "failed")
        os.makedirs(failed_dir, exist_ok=True)
        os.rename(csv_path, os.path.join(failed_dir, csv_filename))
        print("Uploading failed")
        manage_csv_files(failed_dir, 10)

    # Manage CSV files, keep a limited number of files
    manage_csv_files(CSV_DIR)
def update(modbus, batteries, csv_signals):
    # type: (Modbus, Iterable[Battery], Iterable[CsvSignal]) -> bool
    """
    Run one update cycle:

    1. read the status record of every battery via modbus,
    2. collect the slave address of every battery,
    3. hand both, together with the signal definitions, to create_csv_files.

    Always returns True; any failure surfaces as an exception.
    """
    logging.debug('starting update cycle')

    statuses = []
    for battery in batteries:
        statuses.append(read_battery_status(modbus, battery))

    node_numbers = []
    for battery in batteries:
        node_numbers.append(battery.slave_address)

    create_csv_files(csv_signals, statuses, node_numbers)

    logging.debug('finished update cycle\n')
    return True
def print_usage():
    """Print a usage line and an example invocation of this script to stdout."""
    templates = (
        ('Usage: ', ' <serial device>'),
        ('Example: ', ' ttyUSB0'),
    )
    for prefix, suffix in templates:
        print(prefix + __file__ + suffix)
def parse_cmdline_args(argv):
    # type: (list[str]) -> str
    """
    Return the tty device name, i.e. the first command line argument.

    Without any argument the usage text is printed and the process exits
    with status 1.
    """
    if len(argv) > 0:
        return argv[0]

    logging.info('missing command line argument for tty device')
    print_usage()
    sys.exit(1)
# Module-level liveness flag shared by the two periodic tasks: update_task
# stores the result of each update cycle in it, watchdog_task resets it to
# False and quits the main loop when it finds it still False.
alive = True  # global alive flag, watchdog_task clears it, update_task sets it
def create_update_task(modbus, batteries, csv_signals, main_loop):
    # type: (Modbus, Iterable[Battery], Iterable[CsvSignal], DBusGMainLoop) -> Callable[[],bool]
    """
    Build the periodic task that performs one update cycle per invocation.

    The task stores the result of update() in the global alive flag and
    returns it. When the result is falsy it also quits the main loop.
    """

    def update_task():
        # type: () -> bool
        global alive

        alive = update(modbus, batteries, csv_signals)

        if alive:
            return alive

        logging.info('update_task: quitting main loop because of error')
        main_loop.quit()
        return alive

    return update_task
def create_watchdog_task(main_loop):
    # type: (DBusGMainLoop) -> Callable[[],bool]
    """
    Build the periodic watchdog task that supervises the global alive flag.

    On each run the task either finds the flag set, clears it and returns
    True, or finds it still cleared, quits the main loop and returns False.
    """

    def watchdog_task():
        # type: () -> bool
        global alive

        if not alive:
            logging.info('watchdog_task: killing main loop because update_task is no longer alive')
            main_loop.quit()
            return False

        logging.debug('watchdog_task: update_task is alive')
        alive = False
        return True

    return watchdog_task
def main(argv):
    # type: (list[str]) -> ()
    """
    Entry point: identify the batteries on the given tty and then run the
    periodic update and watchdog tasks inside a GLib main loop.

    Exits with status 2 when no battery is found and with 0xFF when the
    main loop terminates.
    """
    print("PAME")
    logging.basicConfig(level=cfg.LOG_LEVEL)
    logging.info('starting ' + __file__)

    tty = parse_cmdline_args(argv)
    modbus = init_modbus(tty)

    batteries = identify_batteries(modbus)
    n = len(batteries)
    noun = ' battery' if n == 1 else ' batteries'
    logging.info('found ' + str(n) + noun)

    if n <= 0:
        sys.exit(2)

    # the firmware version of the first battery found is reported for all
    first_battery = c.first(batteries)
    csv_signals = create_csv_signals(first_battery.firmware_version)

    # dbus itself is not used by this service, only modbus; the GLib main
    # loop merely drives the two periodic tasks
    main_loop = init_main_loop()

    update_task = create_update_task(modbus, batteries, csv_signals, main_loop)
    watchdog_task = create_watchdog_task(main_loop)

    watchdog_interval = cfg.UPDATE_INTERVAL * 2
    GLib.timeout_add(watchdog_interval, watchdog_task)   # add watchdog first
    GLib.timeout_add(cfg.UPDATE_INTERVAL, update_task)   # call update once every update_interval

    logging.info('starting GLib.MainLoop')
    main_loop.run()
    logging.info('GLib.MainLoop was shut down')

    sys.exit(0xFF)  # reaches this only on error


if __name__ == "__main__":
    main(sys.argv[1:])