160 lines
3.6 KiB
Plaintext
160 lines
3.6 KiB
Plaintext
|
#!/usr/bin/python2 -u
|
||
|
# coding=utf-8
|
||
|
from os import system
|
||
|
from pymodbus.client.sync import ModbusSerialClient as Modbus
|
||
|
from pymodbus.exceptions import ModbusIOException
|
||
|
from sys import argv, exit
|
||
|
|
||
|
# trick the pycharm type-checker into thinking Callable is in scope, not used at runtime
|
||
|
# noinspection PyUnreachableCode
|
||
|
if False:
|
||
|
from typing import List, Optional
|
||
|
|
||
|
|
||
|
RESET_REGISTER = 0x2087
|
||
|
FIRMWARE_VERSION_REGISTER = 1054
|
||
|
SERIAL_STARTER_DIR = '/opt/victronenergy/serial-starter/'
|
||
|
|
||
|
|
||
|
class LockTTY(object):
|
||
|
|
||
|
def __init__(self, tty):
|
||
|
# type: (str) -> None
|
||
|
self.tty = tty
|
||
|
|
||
|
def __enter__(self):
|
||
|
system(SERIAL_STARTER_DIR + 'stop-tty.sh ' + self.tty)
|
||
|
return self
|
||
|
|
||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||
|
system(SERIAL_STARTER_DIR + 'start-tty.sh ' + self.tty)
|
||
|
|
||
|
|
||
|
def init_modbus(tty):
|
||
|
# type: (str) -> Modbus
|
||
|
|
||
|
return Modbus(
|
||
|
port='/dev/' + tty,
|
||
|
method='rtu',
|
||
|
baudrate=115200,
|
||
|
stopbits=1,
|
||
|
bytesize=8,
|
||
|
timeout=1, # seconds
|
||
|
parity='O')
|
||
|
|
||
|
|
||
|
def reset_bms(modbus, slave_id):
|
||
|
# type: (Modbus, int) -> bool
|
||
|
|
||
|
print ('resetting BMS...')
|
||
|
|
||
|
result = modbus.write_registers(RESET_REGISTER, [1], unit=slave_id)
|
||
|
|
||
|
# expecting a ModbusIOException (timeout)
|
||
|
# BMS can no longer reply because it is already reset
|
||
|
return isinstance(result, ModbusIOException)
|
||
|
|
||
|
|
||
|
def identify_battery(modbus, slave_id):
|
||
|
# type: (Modbus, int) -> bool
|
||
|
|
||
|
target = 'battery #' + str(slave_id) + ' at ' + modbus.port
|
||
|
|
||
|
try:
|
||
|
|
||
|
print(('contacting ' + target + ' ...'))
|
||
|
|
||
|
response = modbus.read_input_registers(address=FIRMWARE_VERSION_REGISTER, count=1, unit=slave_id)
|
||
|
fw = '{0:0>4X}'.format(response.registers[0])
|
||
|
|
||
|
print(('found battery with firmware ' + fw))
|
||
|
|
||
|
return True
|
||
|
|
||
|
except:
|
||
|
print(('failed to communicate with ' + target + ' !'))
|
||
|
return False
|
||
|
|
||
|
|
||
|
def is_int(value):
|
||
|
# type: (str) -> bool
|
||
|
try:
|
||
|
_ = int(value)
|
||
|
return True
|
||
|
except ValueError:
|
||
|
return False
|
||
|
|
||
|
|
||
|
def print_usage():
|
||
|
print(('Usage: ' + __file__ + ' <slave id> <serial device>'))
|
||
|
print(('Example: ' + __file__ + ' 2 ttyUSB0'))
|
||
|
print ('')
|
||
|
print ('You can omit the "ttyUSB" prefix of the serial device:')
|
||
|
print((' ' + __file__ + ' 2 0'))
|
||
|
print ('')
|
||
|
print ('You can omit the serial device entirely when the "com.victronenergy.battery.<serial device>" service is running:')
|
||
|
print((' ' + __file__ + ' 2'))
|
||
|
print ('')
|
||
|
|
||
|
|
||
|
def get_tty_from_battery_service_name():
|
||
|
# type: () -> Optional[str]
|
||
|
|
||
|
import dbus
|
||
|
bus = dbus.SystemBus()
|
||
|
|
||
|
tty = (
|
||
|
name.split('.')[-1]
|
||
|
for name in bus.list_names()
|
||
|
if name.startswith('com.victronenergy.battery.')
|
||
|
)
|
||
|
|
||
|
return next(tty, None)
|
||
|
|
||
|
|
||
|
def parse_tty(tty):
|
||
|
# type: (Optional[str]) -> str
|
||
|
|
||
|
if tty is None:
|
||
|
return get_tty_from_battery_service_name()
|
||
|
|
||
|
if is_int(tty):
|
||
|
return 'ttyUSB' + argv[1]
|
||
|
else:
|
||
|
return tty
|
||
|
|
||
|
|
||
|
def parse_cmdline_args(argv):
|
||
|
# type: (List[str]) -> (str, int)
|
||
|
|
||
|
slave_id = element_at_or_none(argv, 0)
|
||
|
tty = parse_tty(element_at_or_none(argv, 1))
|
||
|
|
||
|
if slave_id is None or tty is None:
|
||
|
print_usage()
|
||
|
exit(2)
|
||
|
|
||
|
return tty, int(slave_id)
|
||
|
|
||
|
|
||
|
def element_at_or_none(lst, index):
|
||
|
return next(iter(lst[index:]), None)
|
||
|
|
||
|
|
||
|
def main(argv):
|
||
|
# type: (List[str]) -> ()
|
||
|
|
||
|
tty, slave_id = parse_cmdline_args(argv)
|
||
|
|
||
|
with LockTTY(tty), init_modbus(tty) as modbus:
|
||
|
|
||
|
if identify_battery(modbus, slave_id) and reset_bms(modbus, slave_id):
|
||
|
print('SUCCESS')
|
||
|
exit(0)
|
||
|
else:
|
||
|
print('FAILURE')
|
||
|
exit(1)
|
||
|
|
||
|
|
||
|
main(argv[1:])
|