add: send heartbit with timestamp for cerbo

This commit is contained in:
kostas 2024-06-27 13:41:52 +02:00
parent 9fe9101c02
commit 3b90a51a54
1 changed files with 14 additions and 13 deletions

View File

@ -689,20 +689,15 @@ def SubscribeToQueue():
return channel return channel
is_first_update = True is_first_update = True
first_subscribe = False
prev_status=0 prev_status=0
subscribed_to_queue_first_time=False
channel = SubscribeToQueue() channel = SubscribeToQueue()
heartbit_interval = 0
# Create an S3config instance # Create an S3config instance
s3_config = S3config() s3_config = S3config()
INSTALLATION_ID=int(s3_config.bucket.split('-')[0]) INSTALLATION_ID=int(s3_config.bucket.split('-')[0])
PRODUCT_ID = 1 PRODUCT_ID = 1
def update_state_from_dictionaries(current_warnings, current_alarms, node_numbers): def update_state_from_dictionaries(current_warnings, current_alarms, node_numbers):
global previous_warnings, previous_alarms, INSTALLATION_ID, PRODUCT_ID, is_first_update, first_subscribe, channel,prev_status,heartbit_interval,subscribed_to_queue_first_time global previous_warnings, previous_alarms, INSTALLATION_ID, PRODUCT_ID, is_first_update, channel,prev_status
heartbit_interval+=1
if is_first_update: if is_first_update:
changed_warnings = current_warnings changed_warnings = current_warnings
@ -789,12 +784,6 @@ def update_state_from_dictionaries(current_warnings, current_alarms, node_number
channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message) channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message)
print(status_message) print(status_message)
print("Message sent successfully") print("Message sent successfully")
elif heartbit_interval>=15 or not subscribed_to_queue_first_time:
print("Send heartbit message to rabbitmq")
heartbit_interval=0
subscribed_to_queue_first_time=True
status_message = json.dumps(status_message)
channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message)
previous_warnings = current_warnings.copy() previous_warnings = current_warnings.copy()
previous_alarms = current_alarms.copy() previous_alarms = current_alarms.copy()
@ -989,6 +978,18 @@ def create_batch_of_csv_files():
if response.status_code == 200: if response.status_code == 200:
os.remove(first_csv_file) os.remove(first_csv_file)
print("Successfully uploaded the compresseed batch of files in s3") print("Successfully uploaded the compresseed batch of files in s3")
status_message = {
"InstallationId": INSTALLATION_ID,
"Product": PRODUCT_ID,
"Status": 0,
"Type": 1,
"Warnings": [],
"Alarms": [],
"Timestamp": numeric_part
}
status_message = json.dumps(status_message)
channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message)
print("Successfully sent the heartbit with timestamp")
else: else:
# we save data that were not successfully uploaded in s3 in a failed directory inside the CSV_DIR for logging # we save data that were not successfully uploaded in s3 in a failed directory inside the CSV_DIR for logging
failed_dir = os.path.join(CSV_DIR, "failed") failed_dir = os.path.join(CSV_DIR, "failed")
@ -1020,7 +1021,7 @@ def create_update_task(modbus, dbus, batteries, signals, csv_signals, main_loop)
ALLOW = True ALLOW = True
alive = update(modbus, batteries, dbus, signals, csv_signals) alive = update(modbus, batteries, dbus, signals, csv_signals)
elapsed_time = time.time() - start_time elapsed_time = time.time() - start_time
if count_files_in_folder(CSV_DIR) >= 15 and elapsed_time >= 30: if count_files_in_folder(CSV_DIR) >= 15 and elapsed_time >= 60:
create_batch_of_csv_files() create_batch_of_csv_files()
start_time = time.time() start_time = time.time()
#alive = update_for_testing(modbus, batteries, dbus, signals, csv_signals) #alive = update_for_testing(modbus, batteries, dbus, signals, csv_signals)