add: batch files

kostas 2024-06-26 16:20:51 +02:00
parent 0aea1b5c56
commit e3e9817f2b
1 changed file with 117 additions and 42 deletions


@@ -31,6 +31,7 @@ from datetime import datetime
import io
import json
from convert import first
import shutil
CSV_DIR = "/data/csv_files/"
INSTALLATION_NAME_FILE = '/data/innovenergy/openvpn/installation-name'
@@ -249,10 +250,6 @@ def update_state_from_dictionaries(current_warnings, current_alarms, node_number
    previous_warnings = current_warnings.copy()
    previous_alarms = current_alarms.copy()
    #print("ALARMS LIST")
    #print(alarms_number_list)
    #print("WARNINGS LIST")
    #print(warnings_number_list)

    return status_message, alarms_number_list, warnings_number_list
@@ -504,29 +501,38 @@ def reset_batteries(modbus, batteries):
alive = True  # global alive flag, watchdog_task clears it, update_task sets it
start_time = time.time()
def count_files_in_folder(folder_path):
    try:
        # list all entries in the folder
        files = os.listdir(folder_path)
        # filter out directories, only count regular files
        return sum(1 for f in files if os.path.isfile(os.path.join(folder_path, f)))
    except OSError as e:
        # return 0 rather than an error string, so the numeric comparison
        # at the call site (num_files_in_csv_dir >= 15) cannot misfire
        logging.error("count_files_in_folder failed: %s", e)
        return 0
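With errors mapped to 0, the count feeds straight into the numeric batching guard used below. A quick illustrative check (the missing path is hypothetical):

    # a missing folder yields 0 instead of an error string, so a
    # comparison like count >= 15 stays False rather than raising
    assert count_files_in_folder('/no/such/folder') == 0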
def create_update_task(modbus, service, batteries):
    # type: (Modbus, DBusService, Iterable[Battery]) -> Callable[[],bool]
    """
    Creates an update task which runs the main update function
    and resets the alive flag
    """
    global start_time

    _socket = init_udp_socket()
    _signals = signals.init_battery_signals()
    csv_signals = signals.create_csv_signals(first(batteries).firmware_version)
    node_numbers = [battery.slave_address for battery in batteries]
    print("Node numbers: {}".format(node_numbers))

    warnings_signals, alarm_signals = signals.read_warning_and_alarm_flags()
    current_warnings = {}
    current_alarms = {}
    def update_task():
        # type: () -> bool
        global alive, start_time

        logging.debug('starting update cycle')
@@ -546,15 +552,19 @@ def create_update_task(modbus, service, batteries):
                value = s.get_value(statuses[i])
                current_alarms[signal_name] = value

        status_message, alarms_number_list, warnings_number_list = update_state_from_dictionaries(current_warnings,
                                                                                                   current_alarms,
                                                                                                   node_numbers)

        publish_values_on_dbus(service, _signals, statuses)

        elapsed_time = time.time() - start_time

        create_csv_files(csv_signals, statuses, node_numbers, alarms_number_list, warnings_number_list)

        num_files_in_csv_dir = count_files_in_folder(CSV_DIR)
        if num_files_in_csv_dir >= 15 and elapsed_time >= 30:
            create_batch_of_csv_files()
            start_time = time.time()

        upload_status_to_innovenergy(_socket, statuses)
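A note on how the two thresholds interact, hedged because the body of the odd-second check in create_csv_files falls outside this diff's context: if that check skips writes on odd epoch seconds, a CSV lands at most every 2 seconds, so a 30-second window accumulates at most 15 files — matching the num_files_in_csv_dir >= 15 guard. As a back-of-the-envelope sketch (assumed cadence, not measured):

    csv_write_period_s = 2                          # assumed: writes only on even seconds
    batch_window_s = 30                             # elapsed_time threshold in update_task
    print(batch_window_s // csv_write_period_s)     # 15, the file-count threshold above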
@@ -579,6 +589,97 @@ def insert_id(path, id_number):
    parts.insert(insert_position, str(id_number))

    return "/".join(parts)
def create_batch_of_csv_files():
    # list all files in the directory
    files = os.listdir(CSV_DIR)

    # filter out only csv files
    csv_files = [file for file in files if file.endswith('.csv')]
    if not csv_files:
        print("No csv files found in the directory.")
        return

    # sort csv files by creation time
    csv_files.sort(key=lambda x: os.path.getctime(os.path.join(CSV_DIR, x)))

    # keep the 15 most recent files
    recent_csv_files = csv_files[-15:] if len(csv_files) > 15 else csv_files

    # take the oldest of those as the carrier file for the batch
    first_csv_file = os.path.join(CSV_DIR, recent_csv_files.pop(0))
    first_csv_filename = os.path.basename(first_csv_file)

    temp_file_path = os.path.join(CSV_DIR, 'temp_batch_file.csv')
    # create a temporary file and write the timestamp and the original content of the first file
    with open(temp_file_path, 'wb') as temp_file:
        # write the timestamp (filename) at the beginning; encode so the
        # text header can be written to the binary-mode file
        temp_file.write('Timestamp;{}\n'.format(first_csv_filename.split('.')[0]).encode())
        # write the original content of the first csv file
        with open(first_csv_file, 'rb') as f:
            temp_file.write(f.read())

        for csv_file in recent_csv_files:
            file_path = os.path.join(CSV_DIR, csv_file)
            # separate entries with an empty line, then the timestamp (filename)
            temp_file.write('\n'.encode())
            temp_file.write('Timestamp;{}\n'.format(csv_file.split('.')[0]).encode())
            # write the content of the file
            with open(file_path, 'rb') as f:
                temp_file.write(f.read())

    # replace the original first csv file with the temporary file
    os.remove(first_csv_file)
    os.rename(temp_file_path, first_csv_file)
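    # the merged carrier file now looks like this (illustrative epoch-second
    # filenames; each block is one source csv prefixed with its timestamp):
    #
    #   Timestamp;1719409800
    #   <rows of 1719409800.csv>
    #
    #   Timestamp;1719409830
    #   <rows of 1719409830.csv>
    #   ...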
    # create a logging directory that contains at most 20 batch files for logging info
    logging_dir = os.path.join(CSV_DIR, 'logging_batch_files')
    if not os.path.exists(logging_dir):
        os.makedirs(logging_dir)
    shutil.copy(first_csv_file, logging_dir)
    manage_csv_files(logging_dir)

    # keep at most 100 files in CSV_DIR for logging
    manage_csv_files(CSV_DIR, 100)
# print("The batch csv file is: {}".format(recent_csv_files[-1]))
# prepare for compression
csv_data = read_csv_as_string(first_csv_file)
if csv_data is None:
print("error while reading csv as string")
return
# zip-comp additions
compressed_csv = compress_csv_data(csv_data)
# Use the name of the last (most recent) CSV file in sorted csv_files as the name for the compressed file
last_csv_file_name = os.path.basename(recent_csv_files[-1]) if recent_csv_files else first_csv_filename
# we send the csv files every 30 seconds and the timestamp is adjusted to be a multiple of 30
numeric_part = int(last_csv_file_name.split('.')[0])
print("REAL NUMERIC PART: ", numeric_part)
# compressed_filename = "{}.csv".format(new_numeric_part)
compressed_filename = "{}.csv".format(numeric_part)
    response = s3_config.create_put_request(compressed_filename, compressed_csv)
    if response.status_code == 200:
        os.remove(first_csv_file)
        print("Successfully uploaded the compressed batch of files to S3")
    else:
        # keep data that could not be uploaded to S3 in a "failed" directory
        # inside CSV_DIR for logging
        failed_dir = os.path.join(CSV_DIR, "failed")
        if not os.path.exists(failed_dir):
            os.makedirs(failed_dir)
        failed_path = os.path.join(failed_dir, first_csv_filename)
        os.rename(first_csv_file, failed_path)
        print("Uploading failed")
        manage_csv_files(failed_dir, 100)
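manage_csv_files is called throughout this commit but defined elsewhere in the file. Judging from the call sites (a directory plus an optional file cap, with comments mentioning caps of 20 and 100), it presumably prunes the oldest files beyond the cap. A minimal sketch of such a helper under those assumptions only — the real implementation may differ:

    import os

    def manage_csv_files_sketch(directory, max_files=20):
        # hypothetical stand-in for manage_csv_files: keep only the newest
        # max_files files in the directory, deleting the oldest first
        paths = [os.path.join(directory, f) for f in os.listdir(directory)]
        files = sorted((p for p in paths if os.path.isfile(p)), key=os.path.getctime)
        for stale in files[:max(0, len(files) - max_files)]:
            os.remove(stale)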
def create_csv_files(signals, statuses, node_numbers, alarms_number_list, warnings_number_list):
    timestamp = int(time.time())
    if timestamp % 2 != 0:
@@ -603,32 +704,6 @@ def create_csv_files(signals, statuses, node_numbers, alarms_number_list, warnin
            row_values = [signal_name, value, s.get_text]
            csv_writer.writerow(row_values)

        csv_data = read_csv_as_string(csv_path)
        if csv_data is None:
            print("error while reading csv as string")
            return

        # zip-comp additions
        compressed_csv = compress_csv_data(csv_data)
        compressed_filename = "{}.csv".format(timestamp)

        response = s3_config.create_put_request(compressed_filename, compressed_csv)
        if response.status_code == 200:
            #os.remove(csv_path)
            print("Success")
        else:
            failed_dir = os.path.join(CSV_DIR, "failed")
            if not os.path.exists(failed_dir):
                os.makedirs(failed_dir)
            failed_path = os.path.join(failed_dir, csv_filename)
            os.rename(csv_path, failed_path)
            print("Uploading failed")
            manage_csv_files(failed_dir, 10)

        manage_csv_files(CSV_DIR)
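Both the new batch path and the per-file upload path removed here rely on read_csv_as_string and compress_csv_data, which sit outside this diff. For orientation, a minimal sketch of what such helpers could look like, assuming plain byte reads and zlib compression — the real helpers in this repository may differ:

    import zlib

    def read_csv_as_string_sketch(path):
        # return the raw file contents, or None on failure (callers check for None)
        try:
            with open(path, 'rb') as f:
                return f.read()
        except (IOError, OSError):
            return None

    def compress_csv_data_sketch(csv_data):
        # zlib-compress the csv bytes before handing them to the S3 PUT request
        return zlib.compress(csv_data)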
def create_watchdog_task(main_loop):
    # type: (DBusGMainLoop) -> Callable[[],bool]
    """