Compare commits
4 Commits
5d3b3b4cb2
...
9fe9101c02
Author | SHA1 | Date |
---|---|---|
kostas | 9fe9101c02 | |
kostas | a3842555a8 | |
kostas | 61232e6ffb | |
kostas | fd8f380efe |
|
@ -148,13 +148,9 @@ INSTALLATION_ID=int(s3_config.bucket.split('-')[0])
|
|||
PRODUCT_ID = 1
|
||||
is_first_update = True
|
||||
prev_status = 0
|
||||
subscribed_to_queue_first_time = False
|
||||
heartbit_interval = 0
|
||||
|
||||
def update_state_from_dictionaries(current_warnings, current_alarms, node_numbers):
|
||||
global previous_warnings, previous_alarms, INSTALLATION_ID, PRODUCT_ID, is_first_update, channel, prev_status, heartbit_interval, subscribed_to_queue_first_time
|
||||
|
||||
heartbit_interval += 1
|
||||
global previous_warnings, previous_alarms, INSTALLATION_ID, PRODUCT_ID, is_first_update, channel, prev_status
|
||||
|
||||
if is_first_update:
|
||||
changed_warnings = current_warnings
|
||||
|
@ -241,12 +237,6 @@ def update_state_from_dictionaries(current_warnings, current_alarms, node_number
|
|||
channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message)
|
||||
print(status_message)
|
||||
print("Message sent successfully")
|
||||
elif heartbit_interval>=15 or not subscribed_to_queue_first_time:
|
||||
print("Send heartbit message to rabbitmq")
|
||||
heartbit_interval=0
|
||||
subscribed_to_queue_first_time=True
|
||||
status_message = json.dumps(status_message)
|
||||
channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message)
|
||||
|
||||
previous_warnings = current_warnings.copy()
|
||||
previous_alarms = current_alarms.copy()
|
||||
|
@ -562,7 +552,7 @@ def create_update_task(modbus, service, batteries):
|
|||
create_csv_files(csv_signals, statuses, node_numbers, alarms_number_list, warnings_number_list)
|
||||
|
||||
num_files_in_csv_dir = count_files_in_folder(CSV_DIR)
|
||||
if num_files_in_csv_dir >= 15 and elapsed_time >= 30:
|
||||
if num_files_in_csv_dir >= 15 and elapsed_time >= 60:
|
||||
create_batch_of_csv_files()
|
||||
start_time = time.time()
|
||||
|
||||
|
@ -660,7 +650,6 @@ def create_batch_of_csv_files():
|
|||
|
||||
# we send the csv files every 30 seconds and the timestamp is adjusted to be a multiple of 30
|
||||
numeric_part = int(last_csv_file_name.split('.')[0])
|
||||
print("REAL NUMERIC PART: ", numeric_part)
|
||||
|
||||
# compressed_filename = "{}.csv".format(new_numeric_part)
|
||||
compressed_filename = "{}.csv".format(numeric_part)
|
||||
|
@ -670,6 +659,18 @@ def create_batch_of_csv_files():
|
|||
if response.status_code == 200:
|
||||
os.remove(first_csv_file)
|
||||
print("Successfully uploaded the compresseed batch of files in s3")
|
||||
status_message = {
|
||||
"InstallationId": INSTALLATION_ID,
|
||||
"Product": PRODUCT_ID,
|
||||
"Status": 0,
|
||||
"Type": 1,
|
||||
"Warnings": [],
|
||||
"Alarms": [],
|
||||
"Timestamp": numeric_part
|
||||
}
|
||||
status_message = json.dumps(status_message)
|
||||
channel.basic_publish(exchange="", routing_key="statusQueue", body=status_message)
|
||||
print("Successfully sent the heartbit with timestamp")
|
||||
else:
|
||||
# we save data that were not successfully uploaded in s3 in a failed directory inside the CSV_DIR for logging
|
||||
failed_dir = os.path.join(CSV_DIR, "failed")
|
||||
|
@ -766,3 +767,4 @@ def main(argv):
|
|||
|
||||
|
||||
main(sys.argv[1:])
|
||||
|
||||
|
|
Loading…
Reference in New Issue