From e3e9817f2bc9c400f82575100dbb0a810aee51e4 Mon Sep 17 00:00:00 2001
From: kostas
Date: Wed, 26 Jun 2024 16:20:51 +0200
Subject: [PATCH] add: batch files

---
 .../dbus-fzsonick-48tl/dbus-fzsonick-48tl.py | 159 +++++++++++++-----
 1 file changed, 117 insertions(+), 42 deletions(-)

diff --git a/firmware/Venus_Release/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py b/firmware/Venus_Release/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py
index 5786021c3..a47ecbb39 100755
--- a/firmware/Venus_Release/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py
+++ b/firmware/Venus_Release/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py
@@ -31,6 +31,7 @@ from datetime import datetime
 import io
 import json
 from convert import first
+import shutil
 
 CSV_DIR = "/data/csv_files/"
 INSTALLATION_NAME_FILE = '/data/innovenergy/openvpn/installation-name'
@@ -249,10 +250,6 @@ def update_state_from_dictionaries(current_warnings, current_alarms, node_number
     previous_warnings = current_warnings.copy()
     previous_alarms = current_alarms.copy()
 
-    #print("ALARMS LIST")
-    #print(alarms_number_list)
-    #print("WARNINGS LIST")
-    #print(warnings_number_list)
 
     return status_message, alarms_number_list, warnings_number_list
 
@@ -504,29 +501,38 @@ def reset_batteries(modbus, batteries):
 
 alive = True   # global alive flag, watchdog_task clears it, update_task sets it
 start_time = time.time()
 
+def count_files_in_folder(folder_path):
+    try:
+        # list all entries in the folder
+        files = os.listdir(folder_path)
+        # count only regular files, not directories
+        num_files = sum(1 for f in files if os.path.isfile(os.path.join(folder_path, f)))
+        return num_files
+    except OSError:
+        # a missing or unreadable folder counts as empty, so the
+        # numeric comparison in update_task stays well-typed
+        return 0
+
 def create_update_task(modbus, service, batteries):
-    global start_time
     # type: (Modbus, DBusService, Iterable[Battery]) -> Callable[[],bool]
     """
     Creates an update task which runs the main update function
     and resets the alive flag
     """
+    global start_time
     _socket = init_udp_socket()
     _signals = signals.init_battery_signals()
-
+    csv_signals = signals.create_csv_signals(first(batteries).firmware_version)
     node_numbers = [battery.slave_address for battery in batteries]
-    print("NODE NUMBERSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS")
     warnings_signals, alarm_signals = signals.read_warning_and_alarm_flags()
     current_warnings = {}
     current_alarms = {}
 
     def update_task():
-        global start_time
         # type: () -> bool
-
-        global alive
+        global alive, start_time
 
         logging.debug('starting update cycle')
@@ -545,16 +551,20 @@ def create_update_task(modbus, service, batteries):
                 signal_name = insert_id(s.name, node)
                 value = s.get_value(statuses[i])
                 current_alarms[signal_name] = value
-
-        status_message, alarms_number_list, warnings_number_list = update_state_from_dictionaries(current_warnings, current_alarms, node_numbers)
+
+        status_message, alarms_number_list, warnings_number_list = update_state_from_dictionaries(current_warnings,
+                                                                                                  current_alarms,
+                                                                                                  node_numbers)
 
         publish_values_on_dbus(service, _signals, statuses)
-
+
         elapsed_time = time.time() - start_time
-        if elapsed_time >= 30:
-            create_csv_files(csv_signals, statuses, node_numbers, alarms_number_list, warnings_number_list)
+        create_csv_files(csv_signals, statuses, node_numbers, alarms_number_list, warnings_number_list)
+
+        num_files_in_csv_dir = count_files_in_folder(CSV_DIR)
+        if num_files_in_csv_dir >= 15 and elapsed_time >= 30:
+            create_batch_of_csv_files()
             start_time = time.time()
-        print("Elapsed time {:.2f} seconds".format(elapsed_time))
 
         upload_status_to_innovenergy(_socket, statuses)
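The reworked update_task decouples CSV creation from uploading: a snapshot CSV is written on every update cycle, but a batch is only assembled and pushed once at least 15 files have accumulated and at least 30 seconds have elapsed since the previous batch. A minimal sketch of that trigger in isolation; should_create_batch, BATCH_MIN_FILES and BATCH_MIN_SECONDS are illustrative names for the patch's hard-coded literals, not part of the firmware:

    import os
    import time

    CSV_DIR = "/data/csv_files/"   # same constant the service defines
    BATCH_MIN_FILES = 15           # the patch hard-codes 15 ...
    BATCH_MIN_SECONDS = 30         # ... and 30 inline

    def should_create_batch(start_time):
        # type: (float) -> bool
        # count only regular files, mirroring count_files_in_folder above
        num_files = sum(1 for f in os.listdir(CSV_DIR)
                        if os.path.isfile(os.path.join(CSV_DIR, f)))
        return (num_files >= BATCH_MIN_FILES
                and time.time() - start_time >= BATCH_MIN_SECONDS)

Because start_time is reset only when a batch is actually created, a slow cycle simply lets more snapshots accumulate into the next batch.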
@@ -579,6 +589,97 @@ def insert_id(path, id_number):
     parts.insert(insert_position, str(id_number))
     return "/".join(parts)
 
+def create_batch_of_csv_files():
+    # list all files in the directory
+    files = os.listdir(CSV_DIR)
+
+    # keep only csv files
+    csv_files = [file for file in files if file.endswith('.csv')]
+
+    # sort csv files by creation time
+    csv_files.sort(key=lambda x: os.path.getctime(os.path.join(CSV_DIR, x)))
+
+    # keep the 15 most recent files
+    recent_csv_files = csv_files[-15:] if len(csv_files) > 15 else csv_files
+
+    # nothing to batch: return instead of exit(0), which would kill the service
+    if not csv_files:
+        print("No csv files found in the directory.")
+        return
+
+    # the oldest file of the batch becomes the carrier file
+    first_csv_file = os.path.join(CSV_DIR, recent_csv_files.pop(0))
+    first_csv_filename = os.path.basename(first_csv_file)
+
+    temp_file_path = os.path.join(CSV_DIR, 'temp_batch_file.csv')
+
+    # create a temporary file holding the timestamped contents of all batched files
+    with open(temp_file_path, 'wb') as temp_file:
+        # write the timestamp (filename) at the beginning
+        temp_file.write('Timestamp;{}\n'.format(first_csv_filename.split('.')[0]))
+        # write the original content of the first csv file
+        with open(first_csv_file, 'rb') as f:
+            temp_file.write(f.read())
+        for csv_file in recent_csv_files:
+            file_path = os.path.join(CSV_DIR, csv_file)
+            # separate entries with an empty line
+            temp_file.write('\n')
+            # write the timestamp (filename)
+            temp_file.write('Timestamp;{}\n'.format(csv_file.split('.')[0]))
+            # write the content of the file
+            with open(file_path, 'rb') as f:
+                temp_file.write(f.read())
+
+    # replace the original first csv file with the temporary file
+    os.remove(first_csv_file)
+    os.rename(temp_file_path, first_csv_file)
+
+    # keep a logging directory that contains at most 20 batch files
+    logging_dir = os.path.join(CSV_DIR, 'logging_batch_files')
+    if not os.path.exists(logging_dir):
+        os.makedirs(logging_dir)
+
+    shutil.copy(first_csv_file, logging_dir)
+    manage_csv_files(logging_dir)
+
+    # keep at most 100 files in CSV_DIR for logging
+    manage_csv_files(CSV_DIR, 100)
+
+    # prepare for compression
+    csv_data = read_csv_as_string(first_csv_file)
+
+    if csv_data is None:
+        print("error while reading csv as string")
+        return
+
+    # zip-comp additions
+    compressed_csv = compress_csv_data(csv_data)
+
+    # name the compressed file after the most recent csv file in the batch;
+    # batches are uploaded roughly every 30 seconds
+    last_csv_file_name = os.path.basename(recent_csv_files[-1]) if recent_csv_files else first_csv_filename
+    numeric_part = int(last_csv_file_name.split('.')[0])
+    compressed_filename = "{}.csv".format(numeric_part)
+
+    response = s3_config.create_put_request(compressed_filename, compressed_csv)
+    if response.status_code == 200:
+        os.remove(first_csv_file)
+        print("Successfully uploaded the compressed batch of files to S3")
+    else:
+        # keep data that failed to upload in a "failed" directory inside CSV_DIR for logging
+        failed_dir = os.path.join(CSV_DIR, "failed")
+        if not os.path.exists(failed_dir):
+            os.makedirs(failed_dir)
+        failed_path = os.path.join(failed_dir, first_csv_filename)
+        os.rename(first_csv_file, failed_path)
+        print("Uploading failed")
+        manage_csv_files(failed_dir, 100)
+
 
 def create_csv_files(signals, statuses, node_numbers, alarms_number_list, warnings_number_list):
     timestamp = int(time.time())
     if timestamp % 2 != 0:
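The resulting batch is one plain-text file: each member CSV is prefixed by a 'Timestamp;<epoch>' line (the member's original filename without extension), with members separated by a blank line. A consumer can split a downloaded batch back into individual snapshots; split_batch below is a hypothetical reader sketched against that layout, not part of the firmware:

    def split_batch(batch_text):
        # type: (str) -> dict
        # map each 'Timestamp;<epoch>' header to the CSV lines that follow it
        entries = {}
        key = None
        lines = []
        for line in batch_text.splitlines():
            if line.startswith('Timestamp;'):
                if key is not None:
                    entries[key] = '\n'.join(lines).strip()
                key = line.split(';', 1)[1]
                lines = []
            else:
                lines.append(line)
        if key is not None:
            entries[key] = '\n'.join(lines).strip()
        return entries

Keying entries by epoch timestamp preserves the per-snapshot ordering that the separate files previously carried in their names.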
@@ -603,32 +704,6 @@ def create_csv_files(signals, statuses, node_numbers, alarms_number_list, warnin
         row_values = [signal_name, value, s.get_text]
         csv_writer.writerow(row_values)
 
-    csv_data = read_csv_as_string(csv_path)
-
-    if csv_data is None:
-        print("error while reading csv as string")
-        return
-
-    # zip-comp additions
-    compressed_csv = compress_csv_data(csv_data)
-    compressed_filename = "{}.csv".format(timestamp)
-
-    response = s3_config.create_put_request(compressed_filename, compressed_csv)
-    if response.status_code == 200:
-        #os.remove(csv_path)
-        print("Success")
-    else:
-        failed_dir = os.path.join(CSV_DIR, "failed")
-        if not os.path.exists(failed_dir):
-            os.makedirs(failed_dir)
-        failed_path = os.path.join(failed_dir, csv_filename)
-        os.rename(csv_path, failed_path)
-        print("Uploading failed")
-        manage_csv_files(failed_dir, 10)
-
-    manage_csv_files(CSV_DIR)
-
-
 def create_watchdog_task(main_loop):
     # type: (DBusGMainLoop) -> Callable[[],bool]
     """
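read_csv_as_string, compress_csv_data and manage_csv_files are pre-existing helpers in this file; the patch reuses them but the diff does not show their bodies. For orientation only, a zlib-based stand-in matching the call sites above could look like this; it is an assumption, not the firmware's actual implementation:

    import zlib

    def compress_csv_data(csv_data):
        # type: (str) -> bytes
        # assumed stand-in: deflate the CSV text before the S3 PUT (level 9 favours size)
        return zlib.compress(csv_data.encode('utf-8'), 9)

With the per-file upload removed from create_csv_files, the S3 PUT in create_batch_of_csv_files becomes the only upload path, so one compressed object replaces up to 15 individual uploads per 30-second interval.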