add battery count function to redetect in fewer-battery-detected case

This commit is contained in:
Yinyin Liu 2024-08-09 12:50:19 +02:00
parent 4b12ce11d5
commit 7c912fc8a0
2 changed files with 108 additions and 11 deletions

View File

@ -1100,17 +1100,63 @@ def create_csv_files(signals, statuses, node_numbers, alarms_number_list, warnin
row_values = [signal_name, value, s.get_text] row_values = [signal_name, value, s.get_text]
csv_writer.writerow(row_values) csv_writer.writerow(row_values)
BATTERY_COUNTS_FILE = '/data/battery_count.csv'
def load_battery_counts():
if os.path.exists(BATTERY_COUNTS_FILE):
with open(BATTERY_COUNTS_FILE, 'r') as f:
reader = csv.reader(f)
return [int(row[0]) for row in reader]
return []
def save_battery_counts(battery_counts):
with open(BATTERY_COUNTS_FILE, 'w', newline='') as f:
writer = csv.writer(f)
for count in battery_counts:
writer.writerow([count])
def main(argv): def main(argv):
# type: (list[str]) -> () # type: (list[str]) -> ()
logging.basicConfig(level=cfg.LOG_LEVEL) logging.basicConfig(level=cfg.LOG_LEVEL)
logging.info('starting ' + __file__) logging.info('starting ' + __file__)
tty = parse_cmdline_args(argv) tty = parse_cmdline_args(argv)
modbus = init_modbus(tty) battery_counts = load_battery_counts()
batteries = identify_batteries(modbus) max_retry_attempts = 3 # Stop retrying in case it's a real battery loss case
n = len(batteries) retry_attempts = 0
logging.info('found ' + str(n) + (' battery' if n == 1 else ' batteries'))
if n <= 0: while True:
sys.exit(2) modbus = init_modbus(tty)
batteries = identify_batteries(modbus)
n = len(batteries)
logging.info('found %d %s', n, "battery" if n == 1 else "batteries")
if n <= 0:
sys.exit(2) # Exit if no batteries are found
if not battery_counts or n > max(battery_counts):
# If it's the first detection or detect more batteries than ever before
logging.info("It's new or more batteries detected")
battery_counts.append(n)
retry_attempts = 0
save_battery_counts(battery_counts)
elif n < max(battery_counts):
retry_attempts += 1
logging.warning('Attempt %d/%d: Detected fewer batteries than previously detected.',
retry_attempts, max_retry_attempts)
# If max retry attempts are exceeded, continue with fewer batteries
if retry_attempts >= max_retry_attempts:
logging.warning('Max retry attempts reached. Continuing with fewer batteries.')
save_battery_counts(battery_counts)
break
continue
elif n == max(battery_counts):
logging.info('Detected the same number of batteries as before. No need to re-detect.')
break
signals = init_signals(n) signals = init_signals(n)
csv_signals = create_csv_signals() csv_signals = create_csv_signals()
main_loop = init_main_loop() # must run before init_dbus because gobject does some global magic main_loop = init_main_loop() # must run before init_dbus because gobject does some global magic

View File

@ -326,7 +326,7 @@ def identify_batteries(modbus):
def _identify_batteries(): def _identify_batteries():
slave_address = 0 slave_address = 0
n_missing = -255 n_missing = -7
while n_missing < 3: while n_missing < 3:
slave_address += 1 slave_address += 1
@ -728,6 +728,19 @@ def create_watchdog_task(main_loop):
return watchdog_task return watchdog_task
BATTERY_COUNTS_FILE = '/data/battery_count.csv'
def load_battery_counts():
if os.path.exists(BATTERY_COUNTS_FILE):
with open(BATTERY_COUNTS_FILE, 'r') as f:
reader = csv.reader(f)
return [int(row[0]) for row in reader]
return []
def save_battery_counts(battery_counts):
with open(BATTERY_COUNTS_FILE, 'w', newline='') as f:
writer = csv.writer(f)
for count in battery_counts:
writer.writerow([count])
def main(argv): def main(argv):
# type: (List[str]) -> () # type: (List[str]) -> ()
@ -735,13 +748,51 @@ def main(argv):
logging.basicConfig(level=cfg.LOG_LEVEL) logging.basicConfig(level=cfg.LOG_LEVEL)
logging.info('starting ' + __file__) logging.info('starting ' + __file__)
# tty = parse_cmdline_args(argv)
# modbus = init_modbus(tty)
# batteries = identify_batteries(modbus)
# if len(batteries) <= 0:
# sys.exit(2)
tty = parse_cmdline_args(argv) tty = parse_cmdline_args(argv)
modbus = init_modbus(tty) battery_counts = load_battery_counts()
max_retry_attempts = 3 # Stop retrying in case it's a real battery loss case
retry_attempts = 0
batteries = identify_batteries(modbus) while True:
modbus = init_modbus(tty)
batteries = identify_batteries(modbus)
n = len(batteries)
logging.info('found %d %s', n, "battery" if n == 1 else "batteries")
if len(batteries) <= 0: if n <= 0:
sys.exit(2) sys.exit(2) # Exit if no batteries are found
if not battery_counts or n > max(battery_counts):
# If it's the first detection or detect more batteries than ever before
logging.info("It's new or more batteries detected")
battery_counts.append(n)
retry_attempts = 0
save_battery_counts(battery_counts)
elif n < max(battery_counts):
retry_attempts += 1
logging.warning('Attempt %d/%d: Detected fewer batteries than previously detected.',
retry_attempts, max_retry_attempts)
# If max retry attempts are exceeded, continue with fewer batteries
if retry_attempts >= max_retry_attempts:
logging.warning('Max retry attempts reached. Continuing with fewer batteries.')
save_battery_counts(battery_counts)
break
continue
elif n == max(battery_counts):
logging.info('Detected the same number of batteries as before. No need to re-detect.')
break
service = DBusService(service_name=cfg.SERVICE_NAME_PREFIX + tty) service = DBusService(service_name=cfg.SERVICE_NAME_PREFIX + tty)