From 8b0b49fa3025562a10622e55659a188e7f22ecc4 Mon Sep 17 00:00:00 2001 From: Noe Date: Tue, 25 Feb 2025 08:57:36 +0100 Subject: [PATCH] Aggregator file supports json --- .../dbus-fzsonick-48tl/aggregator.py | 150 +++++++++++++----- .../dbus-fzsonick-48tl/aggregator.py | 146 ++++++++++++----- firmware/update_all_installations.sh | 31 ++++ 3 files changed, 241 insertions(+), 86 deletions(-) diff --git a/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/aggregator.py b/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/aggregator.py index ed1a9f3bc..1e6c2511d 100755 --- a/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/aggregator.py +++ b/firmware/Cerbo_Release/CerboReleaseFiles/dbus-fzsonick-48tl/aggregator.py @@ -13,10 +13,11 @@ import hmac import hashlib from threading import Thread, Event import config as cfg +import json -CSV_DIR = "/data/csv_files/" -HOURLY_DIR = "/data/csv_files/HourlyData" -DAILY_DIR = "/data/csv_files/DailyData" +JSON_DIR = "/data/json_files/" +HOURLY_DIR = "/data/json_files/HourlyData" +DAILY_DIR = "/data/json_files/DailyData" # S3 Credentials print("Start with the correct credentials") @@ -38,23 +39,24 @@ class AggregatedData: self.charging_battery_power = charging_battery_power self.heating_power = heating_power - def to_csv(self): - return ("/MinSoc;{};\n" - "/MaxSoc;{};\n" - "/DischargingBatteryPower;{};\n" - "/ChargingBatteryPower;{};\n" - "/HeatingPower;{};").format( - self.min_soc, self.max_soc, self.discharging_battery_power, self.charging_battery_power, self.heating_power) + def to_json(self): + return json.dumps({ + "MinSoc": self.min_soc, + "MaxSoc": self.max_soc, + "DischargingBatteryPower": self.discharging_battery_power, + "ChargingBatteryPower": self.charging_battery_power, + "HeatingPower": self.heating_power + }, separators=(',', ':')) def save(self, directory): timestamp = int(time.time()) if not os.path.exists(directory): os.makedirs(directory) - csv_path = os.path.join(directory, "{}.csv".format(timestamp)) - with open(csv_path, 'w') as file: - file.write(self.to_csv()) - print("Saved file to:", csv_path) - print("File content:\n", self.to_csv()) + json_path = os.path.join(directory, "{}.json".format(timestamp)) + with open(json_path, 'w') as file: + file.write(self.to_json()) + print("Saved file to:", json_path) + print("File content:\n", self.to_json()) @staticmethod def delete_data(directory): @@ -67,16 +69,16 @@ class AggregatedData: print("Deleted file: {}".format(file_path)) def push_to_s3(self, s3_config): - csv_data = self.to_csv() - compressed_csv = self.compress_csv_data(csv_data) + json_data = self.to_json() + compressed_json = self.compress_json_data(json_data) now = datetime.now() if now.hour == 0 and now.minute < 30: adjusted_date = now - timedelta(days=1) else: adjusted_date = now - s3_path = adjusted_date.strftime("%Y-%m-%d") + ".csv" - response = s3_config.create_put_request(s3_path, compressed_csv) + s3_path = adjusted_date.strftime("%Y-%m-%d") + ".json" + response = s3_config.create_put_request(s3_path, compressed_json) if response.status_code != 200: print("ERROR: PUT", response.text) return False @@ -84,10 +86,10 @@ class AggregatedData: return True @staticmethod - def compress_csv_data(csv_data): + def compress_json_data(json_data): memory_stream = io.BytesIO() with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive: - archive.writestr("data.csv", csv_data.encode('utf-8')) + archive.writestr("data.json", json_data.encode('utf-8')) compressed_bytes = memory_stream.getvalue() return base64.b64encode(compressed_bytes).decode('utf-8') @@ -150,7 +152,7 @@ class Aggregator: current_time = datetime.now() after_timestamp = datetime_to_timestamp(current_time - timedelta(hours=1)) before_timestamp = datetime_to_timestamp(current_time) - aggregated_data = Aggregator.create_hourly_data(CSV_DIR, after_timestamp, before_timestamp) + aggregated_data = Aggregator.create_hourly_data(JSON_DIR, after_timestamp, before_timestamp) print("Saving in hourly directory") aggregated_data.save(HOURLY_DIR) except Exception as e: @@ -195,31 +197,55 @@ class Aggregator: @staticmethod def create_hourly_data(directory, after_timestamp, before_timestamp): node_data = {} + print("INSIDE HOURLY MANAGER") for filename in os.listdir(directory): file_path = os.path.join(directory, filename) if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp, before_timestamp): with open(file_path, 'r') as file: - reader = csv.reader(file, delimiter=';') - for row in reader: - if len(row) >= 2: - variable_name, value = row[0], row[1] - try: - value = float(value) - node_number = Aggregator.extract_node_number(variable_name) - if node_number not in node_data: - node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []} - if "Soc" in variable_name: - node_data[node_number]['soc'].append(value) - elif "/Dc/Power" in variable_name or "/DischargingBatteryPower" in variable_name or "/ChargingBatteryPower" in variable_name: - if value < 0: - node_data[node_number]['discharge'].append(value) - else: - node_data[node_number]['charge'].append(value) - elif "/HeatingPower" in variable_name: - node_data[node_number]['heating'].append(value) - except ValueError: - pass + + data = json.load(file) + devices = data.get("Battery", {}).get("Devices", {}) + + for node_number, device_data in devices.items(): + + if node_number not in node_data: + node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []} + + value = device_data.get("Soc", {}).get("value", "N/A") + node_data[node_number]['soc'].append(float(value)) + + value = device_data.get("Dc", {}).get("Power", "N/A").get("value", "N/A") + value = float(value) + if value < 0: + node_data[node_number]['discharge'].append(value) + else: + node_data[node_number]['charge'].append(value) + value = device_data.get("HeatingPower", "N/A").get("value", "N/A") + value = float(value) + node_data[node_number]['heating'].append(value) + + + # reader = csv.reader(file, delimiter=';') + # for row in reader: + # if len(row) >= 2: + # variable_name, value = row[0], row[1] + # try: + # value = float(value) + # node_number = Aggregator.extract_node_number(variable_name) + # if node_number not in node_data: + # node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []} + # if "Soc" in variable_name: + # node_data[node_number]['soc'].append(value) + # elif "/Dc/Power" in variable_name or "/DischargingBatteryPower" in variable_name or "/ChargingBatteryPower" in variable_name: + # if value < 0: + # node_data[node_number]['discharge'].append(value) + # else: + # node_data[node_number]['charge'].append(value) + # elif "/HeatingPower" in variable_name: + # node_data[node_number]['heating'].append(value) + # except ValueError: + # pass if len(node_data) == 0: # No data collected, return default AggregatedData with zeros @@ -249,7 +275,45 @@ class Aggregator: @staticmethod def create_daily_data(directory, after_timestamp, before_timestamp): - return Aggregator.create_hourly_data(directory, after_timestamp, before_timestamp) + + node_data = {'MinSoc': [], 'MaxSoc': [], 'ChargingBatteryPower': [], 'DischargingBatteryPower': [], + 'HeatingPower': []} + + for filename in os.listdir(directory): + file_path = os.path.join(directory, filename) + if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp, + before_timestamp): + with open(file_path, 'r') as file: + data = json.load(file) + + value = data.get("MinSoc", "N/A") + node_data['MinSoc'].append(float(value)) + + value = data.get("MaxSoc", "N/A") + node_data['MaxSoc'].append(float(value)) + + value = data.get("ChargingBatteryPower", "N/A") + node_data['ChargingBatteryPower'].append(float(value)) + + value = data.get("DischargingBatteryPower", "N/A") + node_data['DischargingBatteryPower'].append(float(value)) + + value = data.get("HeatingPower", "N/A") + node_data['HeatingPower'].append(float(value)) + + print(node_data) + + min_soc = min(node_data['MinSoc']) if node_data else 0.0 + max_soc = max(node_data['MaxSoc']) if node_data else 0.0 + total_discharging_power = sum(node_data['DischargingBatteryPower']) if node_data else 0.0 + total_charging_power = sum(node_data['ChargingBatteryPower']) if node_data else 0.0 + total_heating_power = sum(node_data['HeatingPower']) if node_data else 0.0 + + avg_discharging_power = total_discharging_power / len(node_data['DischargingBatteryPower']) + avg_charging_power = total_charging_power / len(node_data['ChargingBatteryPower']) + avg_heating_power = total_heating_power / len(node_data['HeatingPower']) + + return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power) @staticmethod def is_file_within_time_range(filename, start_time, end_time): diff --git a/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/aggregator.py b/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/aggregator.py index 0f787b8ce..548437fb2 100755 --- a/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/aggregator.py +++ b/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/aggregator.py @@ -13,10 +13,11 @@ import hmac import hashlib from threading import Thread, Event import config as cfg +import json -CSV_DIR = "/data/csv_files/" -HOURLY_DIR = "/data/csv_files/HourlyData" -DAILY_DIR = "/data/csv_files/DailyData" +JSON_DIR = "/data/json_files/" +HOURLY_DIR = "/data/json_files/HourlyData" +DAILY_DIR = "/data/json_files/DailyData" print("start with the correct credentials") @@ -37,23 +38,24 @@ class AggregatedData: self.charging_battery_power = charging_battery_power self.heating_power = heating_power - def to_csv(self): - return ("/MinSoc;{};\n" - "/MaxSoc;{};\n" - "/DischargingBatteryPower;{};\n" - "/ChargingBatteryPower;{};\n" - "/HeatingPower;{};").format( - self.min_soc, self.max_soc, self.discharging_battery_power, self.charging_battery_power, self.heating_power) + def to_json(self): + return json.dumps({ + "MinSoc": self.min_soc, + "MaxSoc": self.max_soc, + "DischargingBatteryPower": self.discharging_battery_power, + "ChargingBatteryPower": self.charging_battery_power, + "HeatingPower": self.heating_power + }, separators=(',', ':')) def save(self, directory): timestamp = int(time.time()) if not os.path.exists(directory): os.makedirs(directory) - csv_path = os.path.join(directory, "{}.csv".format(timestamp)) - with open(csv_path, 'w') as file: - file.write(self.to_csv()) - print("Saved file to:", csv_path) - print("File content:\n", self.to_csv()) + json_path = os.path.join(directory, "{}.json".format(timestamp)) + with open(json_path, 'w') as file: + file.write(self.to_json()) + print("Saved file to:", json_path) + print("File content:\n", self.to_json()) @staticmethod def delete_data(directory): @@ -66,16 +68,16 @@ class AggregatedData: print("Deleted file: {}".format(file_path)) def push_to_s3(self, s3_config): - csv_data = self.to_csv() - compressed_csv = self.compress_csv_data(csv_data) + json_data = self.to_json() + compressed_json = self.compress_json_data(json_data) now = datetime.now() if now.hour == 0 and now.minute < 30: adjusted_date = now - timedelta(days=1) else: adjusted_date = now - s3_path = adjusted_date.strftime("%Y-%m-%d") + ".csv" - response = s3_config.create_put_request(s3_path, compressed_csv) + s3_path = adjusted_date.strftime("%Y-%m-%d") + ".json" + response = s3_config.create_put_request(s3_path, compressed_json) if response.status_code != 200: print("ERROR: PUT", response.text) return False @@ -83,10 +85,10 @@ class AggregatedData: return True @staticmethod - def compress_csv_data(csv_data): + def compress_json_data(json_data): memory_stream = io.BytesIO() with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive: - archive.writestr("data.csv", csv_data.encode('utf-8')) + archive.writestr("data.json", json_data.encode('utf-8')) compressed_bytes = memory_stream.getvalue() return base64.b64encode(compressed_bytes).decode('utf-8') @@ -152,7 +154,7 @@ class Aggregator: current_time = datetime.now() after_timestamp = datetime_to_timestamp(current_time - timedelta(hours=1)) before_timestamp = datetime_to_timestamp(current_time) - aggregated_data = Aggregator.create_hourly_data(CSV_DIR, after_timestamp, before_timestamp) + aggregated_data = Aggregator.create_hourly_data(JSON_DIR, after_timestamp, before_timestamp) print("save in hourly dir") aggregated_data.save(HOURLY_DIR) except Exception as e: @@ -205,26 +207,49 @@ class Aggregator: file_path = os.path.join(directory, filename) if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp, before_timestamp): with open(file_path, 'r') as file: - reader = csv.reader(file, delimiter=';') - for row in reader: - if len(row) >= 2: - variable_name, value = row[0], row[1] - try: - value = float(value) - node_number = Aggregator.extract_node_number(variable_name) - if node_number not in node_data: - node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []} - if "Soc" in variable_name: - node_data[node_number]['soc'].append(value) - elif "/Dc/Power" in variable_name or "/DischargingBatteryPower" in variable_name or "/ChargingBatteryPower" in variable_name: - if value < 0: - node_data[node_number]['discharge'].append(value) - else: - node_data[node_number]['charge'].append(value) - elif "/HeatingPower" in variable_name: - node_data[node_number]['heating'].append(value) - except ValueError: - pass + data = json.load(file) + devices = data.get("Battery", {}).get("Devices", {}) + + for node_number, device_data in devices.items(): + + if node_number not in node_data: + node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []} + + value = device_data.get("Soc", {}).get("value", "N/A") + node_data[node_number]['soc'].append(float(value)) + + value = device_data.get("Dc", {}).get("Power", "N/A").get("value", "N/A") + value=float(value) + if value < 0: + node_data[node_number]['discharge'].append(value) + else: + node_data[node_number]['charge'].append(value) + value = device_data.get("HeatingPower", "N/A").get("value", "N/A") + value = float(value) + node_data[node_number]['heating'].append(value) + + + # + # reader = csv.reader(file, delimiter=';') + # for row in reader: + # if len(row) >= 2: + # variable_name, value = row[0], row[1] + # try: + # value = float(value) + # node_number = Aggregator.extract_node_number(variable_name) + # if node_number not in node_data: + # node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []} + # if "Soc" in variable_name: + # node_data[node_number]['soc'].append(value) + # elif "/Dc/Power" in variable_name or "/DischargingBatteryPower" in variable_name or "/ChargingBatteryPower" in variable_name: + # if value < 0: + # node_data[node_number]['discharge'].append(value) + # else: + # node_data[node_number]['charge'].append(value) + # elif "/HeatingPower" in variable_name: + # node_data[node_number]['heating'].append(value) + # except ValueError: + # pass if len(node_data) == 0: # No data collected, return default AggregatedData with zeros @@ -254,7 +279,42 @@ class Aggregator: @staticmethod def create_daily_data(directory, after_timestamp, before_timestamp): - return Aggregator.create_hourly_data(directory, after_timestamp, before_timestamp) + node_data = {'MinSoc': [], 'MaxSoc': [], 'ChargingBatteryPower': [], 'DischargingBatteryPower': [], 'HeatingPower': []} + + for filename in os.listdir(directory): + file_path = os.path.join(directory, filename) + if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp,before_timestamp): + with open(file_path, 'r') as file: + data = json.load(file) + + value = data.get("MinSoc", "N/A") + node_data['MinSoc'].append(float(value)) + + value = data.get("MaxSoc", "N/A") + node_data['MaxSoc'].append(float(value)) + + value = data.get("ChargingBatteryPower", "N/A") + node_data['ChargingBatteryPower'].append(float(value)) + + value = data.get("DischargingBatteryPower", "N/A") + node_data['DischargingBatteryPower'].append(float(value)) + + value = data.get("HeatingPower", "N/A") + node_data['HeatingPower'].append(float(value)) + + print(node_data) + + min_soc = min (node_data['MinSoc']) if node_data else 0.0 + max_soc = max(node_data['MaxSoc']) if node_data else 0.0 + total_discharging_power = sum(node_data['DischargingBatteryPower']) if node_data else 0.0 + total_charging_power = sum(node_data['ChargingBatteryPower']) if node_data else 0.0 + total_heating_power = sum(node_data['HeatingPower']) if node_data else 0.0 + + avg_discharging_power = total_discharging_power / len(node_data['DischargingBatteryPower']) + avg_charging_power = total_charging_power / len(node_data['ChargingBatteryPower']) + avg_heating_power = total_heating_power / len(node_data['HeatingPower']) + + return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power) @staticmethod def is_file_within_time_range(filename, start_time, end_time): diff --git a/firmware/update_all_installations.sh b/firmware/update_all_installations.sh index 8de55743c..1ea7c0a92 100755 --- a/firmware/update_all_installations.sh +++ b/firmware/update_all_installations.sh @@ -38,6 +38,16 @@ deploy() { else echo "Warning: Failed to stop battery service on $ip_address" fi + + echo "SSH connection successful: $ip_address" + + # Stop aggregator service if changing aggregator-related files + if ssh -o StrictHostKeyChecking=no "$username@$ip_address" "svc -d /service/aggregator"; then + echo "Stopped aggregator service on $ip_address" + else + echo "Warning: Failed to stop aggregator service on $ip_address" + fi + # Copy files if scp -o ConnectTimeout=10 "$release_file_path/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py" "root@$ip_address:/opt/victronenergy/dbus-fzsonick-48tl"; then @@ -51,6 +61,19 @@ deploy() { else echo "Warning: Failed to copy file to /data on $ip_address" fi + + # Copy files + if scp -o ConnectTimeout=10 "$release_file_path/dbus-fzsonick-48tl/aggregator.py" "root@$ip_address:/opt/victronenergy/dbus-fzsonick-48tl"; then + echo "Copied file to /opt on $ip_address" + else + echo "Warning: Failed to copy file to /opt on $ip_address" + fi + + if scp -o ConnectTimeout=10 "$release_file_path/dbus-fzsonick-48tl/aggregator.py" "root@$ip_address:/data/dbus-fzsonick-48tl"; then + echo "Copied file to /data on $ip_address" + else + echo "Warning: Failed to copy file to /data on $ip_address" + fi # Start battery service if ssh -o StrictHostKeyChecking=no "$username@$ip_address" "svc -u /service/dbus-fzsonick-48tl.*"; then @@ -58,6 +81,14 @@ deploy() { else echo "Warning: Failed to start battery service on $ip_address" fi + + + # Start aggregator service + if ssh -o StrictHostKeyChecking=no "$username@$ip_address" "svc -u /service/aggregator"; then + echo "Started aggregator service on $ip_address" + else + echo "Warning: Failed to start aggregator service on $ip_address" + fi echo "Deployment completed for $ip_address ($device_type)" done