Aggregator file supports JSON

This commit is contained in:
Noe 2025-02-25 08:57:36 +01:00
parent 0592179e75
commit 8b0b49fa30
3 changed files with 241 additions and 86 deletions

View File

@ -13,10 +13,11 @@ import hmac
import hashlib import hashlib
from threading import Thread, Event from threading import Thread, Event
import config as cfg import config as cfg
import json
CSV_DIR = "/data/csv_files/" JSON_DIR = "/data/json_files/"
HOURLY_DIR = "/data/csv_files/HourlyData" HOURLY_DIR = "/data/json_files/HourlyData"
DAILY_DIR = "/data/csv_files/DailyData" DAILY_DIR = "/data/json_files/DailyData"
# S3 Credentials # S3 Credentials
print("Start with the correct credentials") print("Start with the correct credentials")
@ -38,23 +39,24 @@ class AggregatedData:
self.charging_battery_power = charging_battery_power self.charging_battery_power = charging_battery_power
self.heating_power = heating_power self.heating_power = heating_power
def to_csv(self): def to_json(self):
return ("/MinSoc;{};\n" return json.dumps({
"/MaxSoc;{};\n" "MinSoc": self.min_soc,
"/DischargingBatteryPower;{};\n" "MaxSoc": self.max_soc,
"/ChargingBatteryPower;{};\n" "DischargingBatteryPower": self.discharging_battery_power,
"/HeatingPower;{};").format( "ChargingBatteryPower": self.charging_battery_power,
self.min_soc, self.max_soc, self.discharging_battery_power, self.charging_battery_power, self.heating_power) "HeatingPower": self.heating_power
}, separators=(',', ':'))
def save(self, directory): def save(self, directory):
timestamp = int(time.time()) timestamp = int(time.time())
if not os.path.exists(directory): if not os.path.exists(directory):
os.makedirs(directory) os.makedirs(directory)
csv_path = os.path.join(directory, "{}.csv".format(timestamp)) json_path = os.path.join(directory, "{}.json".format(timestamp))
with open(csv_path, 'w') as file: with open(json_path, 'w') as file:
file.write(self.to_csv()) file.write(self.to_json())
print("Saved file to:", csv_path) print("Saved file to:", json_path)
print("File content:\n", self.to_csv()) print("File content:\n", self.to_json())
@staticmethod @staticmethod
def delete_data(directory): def delete_data(directory):
@ -67,16 +69,16 @@ class AggregatedData:
print("Deleted file: {}".format(file_path)) print("Deleted file: {}".format(file_path))
def push_to_s3(self, s3_config): def push_to_s3(self, s3_config):
csv_data = self.to_csv() json_data = self.to_json()
compressed_csv = self.compress_csv_data(csv_data) compressed_json = self.compress_json_data(json_data)
now = datetime.now() now = datetime.now()
if now.hour == 0 and now.minute < 30: if now.hour == 0 and now.minute < 30:
adjusted_date = now - timedelta(days=1) adjusted_date = now - timedelta(days=1)
else: else:
adjusted_date = now adjusted_date = now
s3_path = adjusted_date.strftime("%Y-%m-%d") + ".csv" s3_path = adjusted_date.strftime("%Y-%m-%d") + ".json"
response = s3_config.create_put_request(s3_path, compressed_csv) response = s3_config.create_put_request(s3_path, compressed_json)
if response.status_code != 200: if response.status_code != 200:
print("ERROR: PUT", response.text) print("ERROR: PUT", response.text)
return False return False
@ -84,10 +86,10 @@ class AggregatedData:
return True return True
@staticmethod @staticmethod
def compress_csv_data(csv_data): def compress_json_data(json_data):
memory_stream = io.BytesIO() memory_stream = io.BytesIO()
with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive: with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive:
archive.writestr("data.csv", csv_data.encode('utf-8')) archive.writestr("data.json", json_data.encode('utf-8'))
compressed_bytes = memory_stream.getvalue() compressed_bytes = memory_stream.getvalue()
return base64.b64encode(compressed_bytes).decode('utf-8') return base64.b64encode(compressed_bytes).decode('utf-8')
@ -150,7 +152,7 @@ class Aggregator:
current_time = datetime.now() current_time = datetime.now()
after_timestamp = datetime_to_timestamp(current_time - timedelta(hours=1)) after_timestamp = datetime_to_timestamp(current_time - timedelta(hours=1))
before_timestamp = datetime_to_timestamp(current_time) before_timestamp = datetime_to_timestamp(current_time)
aggregated_data = Aggregator.create_hourly_data(CSV_DIR, after_timestamp, before_timestamp) aggregated_data = Aggregator.create_hourly_data(JSON_DIR, after_timestamp, before_timestamp)
print("Saving in hourly directory") print("Saving in hourly directory")
aggregated_data.save(HOURLY_DIR) aggregated_data.save(HOURLY_DIR)
except Exception as e: except Exception as e:
@ -195,31 +197,55 @@ class Aggregator:
@staticmethod @staticmethod
def create_hourly_data(directory, after_timestamp, before_timestamp): def create_hourly_data(directory, after_timestamp, before_timestamp):
node_data = {} node_data = {}
print("INSIDE HOURLY MANAGER")
for filename in os.listdir(directory): for filename in os.listdir(directory):
file_path = os.path.join(directory, filename) file_path = os.path.join(directory, filename)
if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp, before_timestamp): if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp, before_timestamp):
with open(file_path, 'r') as file: with open(file_path, 'r') as file:
reader = csv.reader(file, delimiter=';')
for row in reader: data = json.load(file)
if len(row) >= 2: devices = data.get("Battery", {}).get("Devices", {})
variable_name, value = row[0], row[1]
try: for node_number, device_data in devices.items():
value = float(value)
node_number = Aggregator.extract_node_number(variable_name) if node_number not in node_data:
if node_number not in node_data: node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []}
node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []}
if "Soc" in variable_name: value = device_data.get("Soc", {}).get("value", "N/A")
node_data[node_number]['soc'].append(value) node_data[node_number]['soc'].append(float(value))
elif "/Dc/Power" in variable_name or "/DischargingBatteryPower" in variable_name or "/ChargingBatteryPower" in variable_name:
if value < 0: value = device_data.get("Dc", {}).get("Power", "N/A").get("value", "N/A")
node_data[node_number]['discharge'].append(value) value = float(value)
else: if value < 0:
node_data[node_number]['charge'].append(value) node_data[node_number]['discharge'].append(value)
elif "/HeatingPower" in variable_name: else:
node_data[node_number]['heating'].append(value) node_data[node_number]['charge'].append(value)
except ValueError: value = device_data.get("HeatingPower", "N/A").get("value", "N/A")
pass value = float(value)
node_data[node_number]['heating'].append(value)
# reader = csv.reader(file, delimiter=';')
# for row in reader:
# if len(row) >= 2:
# variable_name, value = row[0], row[1]
# try:
# value = float(value)
# node_number = Aggregator.extract_node_number(variable_name)
# if node_number not in node_data:
# node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []}
# if "Soc" in variable_name:
# node_data[node_number]['soc'].append(value)
# elif "/Dc/Power" in variable_name or "/DischargingBatteryPower" in variable_name or "/ChargingBatteryPower" in variable_name:
# if value < 0:
# node_data[node_number]['discharge'].append(value)
# else:
# node_data[node_number]['charge'].append(value)
# elif "/HeatingPower" in variable_name:
# node_data[node_number]['heating'].append(value)
# except ValueError:
# pass
if len(node_data) == 0: if len(node_data) == 0:
# No data collected, return default AggregatedData with zeros # No data collected, return default AggregatedData with zeros
@ -249,7 +275,45 @@ class Aggregator:
@staticmethod @staticmethod
def create_daily_data(directory, after_timestamp, before_timestamp): def create_daily_data(directory, after_timestamp, before_timestamp):
return Aggregator.create_hourly_data(directory, after_timestamp, before_timestamp)
node_data = {'MinSoc': [], 'MaxSoc': [], 'ChargingBatteryPower': [], 'DischargingBatteryPower': [],
'HeatingPower': []}
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp,
before_timestamp):
with open(file_path, 'r') as file:
data = json.load(file)
value = data.get("MinSoc", "N/A")
node_data['MinSoc'].append(float(value))
value = data.get("MaxSoc", "N/A")
node_data['MaxSoc'].append(float(value))
value = data.get("ChargingBatteryPower", "N/A")
node_data['ChargingBatteryPower'].append(float(value))
value = data.get("DischargingBatteryPower", "N/A")
node_data['DischargingBatteryPower'].append(float(value))
value = data.get("HeatingPower", "N/A")
node_data['HeatingPower'].append(float(value))
print(node_data)
min_soc = min(node_data['MinSoc']) if node_data else 0.0
max_soc = max(node_data['MaxSoc']) if node_data else 0.0
total_discharging_power = sum(node_data['DischargingBatteryPower']) if node_data else 0.0
total_charging_power = sum(node_data['ChargingBatteryPower']) if node_data else 0.0
total_heating_power = sum(node_data['HeatingPower']) if node_data else 0.0
avg_discharging_power = total_discharging_power / len(node_data['DischargingBatteryPower'])
avg_charging_power = total_charging_power / len(node_data['ChargingBatteryPower'])
avg_heating_power = total_heating_power / len(node_data['HeatingPower'])
return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power)
@staticmethod @staticmethod
def is_file_within_time_range(filename, start_time, end_time): def is_file_within_time_range(filename, start_time, end_time):

View File

@ -13,10 +13,11 @@ import hmac
import hashlib import hashlib
from threading import Thread, Event from threading import Thread, Event
import config as cfg import config as cfg
import json
CSV_DIR = "/data/csv_files/" JSON_DIR = "/data/json_files/"
HOURLY_DIR = "/data/csv_files/HourlyData" HOURLY_DIR = "/data/json_files/HourlyData"
DAILY_DIR = "/data/csv_files/DailyData" DAILY_DIR = "/data/json_files/DailyData"
print("start with the correct credentials") print("start with the correct credentials")
@ -37,23 +38,24 @@ class AggregatedData:
self.charging_battery_power = charging_battery_power self.charging_battery_power = charging_battery_power
self.heating_power = heating_power self.heating_power = heating_power
def to_csv(self): def to_json(self):
return ("/MinSoc;{};\n" return json.dumps({
"/MaxSoc;{};\n" "MinSoc": self.min_soc,
"/DischargingBatteryPower;{};\n" "MaxSoc": self.max_soc,
"/ChargingBatteryPower;{};\n" "DischargingBatteryPower": self.discharging_battery_power,
"/HeatingPower;{};").format( "ChargingBatteryPower": self.charging_battery_power,
self.min_soc, self.max_soc, self.discharging_battery_power, self.charging_battery_power, self.heating_power) "HeatingPower": self.heating_power
}, separators=(',', ':'))
def save(self, directory): def save(self, directory):
timestamp = int(time.time()) timestamp = int(time.time())
if not os.path.exists(directory): if not os.path.exists(directory):
os.makedirs(directory) os.makedirs(directory)
csv_path = os.path.join(directory, "{}.csv".format(timestamp)) json_path = os.path.join(directory, "{}.json".format(timestamp))
with open(csv_path, 'w') as file: with open(json_path, 'w') as file:
file.write(self.to_csv()) file.write(self.to_json())
print("Saved file to:", csv_path) print("Saved file to:", json_path)
print("File content:\n", self.to_csv()) print("File content:\n", self.to_json())
@staticmethod @staticmethod
def delete_data(directory): def delete_data(directory):
@ -66,16 +68,16 @@ class AggregatedData:
print("Deleted file: {}".format(file_path)) print("Deleted file: {}".format(file_path))
def push_to_s3(self, s3_config): def push_to_s3(self, s3_config):
csv_data = self.to_csv() json_data = self.to_json()
compressed_csv = self.compress_csv_data(csv_data) compressed_json = self.compress_json_data(json_data)
now = datetime.now() now = datetime.now()
if now.hour == 0 and now.minute < 30: if now.hour == 0 and now.minute < 30:
adjusted_date = now - timedelta(days=1) adjusted_date = now - timedelta(days=1)
else: else:
adjusted_date = now adjusted_date = now
s3_path = adjusted_date.strftime("%Y-%m-%d") + ".csv" s3_path = adjusted_date.strftime("%Y-%m-%d") + ".json"
response = s3_config.create_put_request(s3_path, compressed_csv) response = s3_config.create_put_request(s3_path, compressed_json)
if response.status_code != 200: if response.status_code != 200:
print("ERROR: PUT", response.text) print("ERROR: PUT", response.text)
return False return False
@ -83,10 +85,10 @@ class AggregatedData:
return True return True
@staticmethod @staticmethod
def compress_csv_data(csv_data): def compress_json_data(json_data):
memory_stream = io.BytesIO() memory_stream = io.BytesIO()
with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive: with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive:
archive.writestr("data.csv", csv_data.encode('utf-8')) archive.writestr("data.json", json_data.encode('utf-8'))
compressed_bytes = memory_stream.getvalue() compressed_bytes = memory_stream.getvalue()
return base64.b64encode(compressed_bytes).decode('utf-8') return base64.b64encode(compressed_bytes).decode('utf-8')
@ -152,7 +154,7 @@ class Aggregator:
current_time = datetime.now() current_time = datetime.now()
after_timestamp = datetime_to_timestamp(current_time - timedelta(hours=1)) after_timestamp = datetime_to_timestamp(current_time - timedelta(hours=1))
before_timestamp = datetime_to_timestamp(current_time) before_timestamp = datetime_to_timestamp(current_time)
aggregated_data = Aggregator.create_hourly_data(CSV_DIR, after_timestamp, before_timestamp) aggregated_data = Aggregator.create_hourly_data(JSON_DIR, after_timestamp, before_timestamp)
print("save in hourly dir") print("save in hourly dir")
aggregated_data.save(HOURLY_DIR) aggregated_data.save(HOURLY_DIR)
except Exception as e: except Exception as e:
@ -205,26 +207,49 @@ class Aggregator:
file_path = os.path.join(directory, filename) file_path = os.path.join(directory, filename)
if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp, before_timestamp): if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp, before_timestamp):
with open(file_path, 'r') as file: with open(file_path, 'r') as file:
reader = csv.reader(file, delimiter=';') data = json.load(file)
for row in reader: devices = data.get("Battery", {}).get("Devices", {})
if len(row) >= 2:
variable_name, value = row[0], row[1] for node_number, device_data in devices.items():
try:
value = float(value) if node_number not in node_data:
node_number = Aggregator.extract_node_number(variable_name) node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []}
if node_number not in node_data:
node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []} value = device_data.get("Soc", {}).get("value", "N/A")
if "Soc" in variable_name: node_data[node_number]['soc'].append(float(value))
node_data[node_number]['soc'].append(value)
elif "/Dc/Power" in variable_name or "/DischargingBatteryPower" in variable_name or "/ChargingBatteryPower" in variable_name: value = device_data.get("Dc", {}).get("Power", "N/A").get("value", "N/A")
if value < 0: value=float(value)
node_data[node_number]['discharge'].append(value) if value < 0:
else: node_data[node_number]['discharge'].append(value)
node_data[node_number]['charge'].append(value) else:
elif "/HeatingPower" in variable_name: node_data[node_number]['charge'].append(value)
node_data[node_number]['heating'].append(value) value = device_data.get("HeatingPower", "N/A").get("value", "N/A")
except ValueError: value = float(value)
pass node_data[node_number]['heating'].append(value)
#
# reader = csv.reader(file, delimiter=';')
# for row in reader:
# if len(row) >= 2:
# variable_name, value = row[0], row[1]
# try:
# value = float(value)
# node_number = Aggregator.extract_node_number(variable_name)
# if node_number not in node_data:
# node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []}
# if "Soc" in variable_name:
# node_data[node_number]['soc'].append(value)
# elif "/Dc/Power" in variable_name or "/DischargingBatteryPower" in variable_name or "/ChargingBatteryPower" in variable_name:
# if value < 0:
# node_data[node_number]['discharge'].append(value)
# else:
# node_data[node_number]['charge'].append(value)
# elif "/HeatingPower" in variable_name:
# node_data[node_number]['heating'].append(value)
# except ValueError:
# pass
if len(node_data) == 0: if len(node_data) == 0:
# No data collected, return default AggregatedData with zeros # No data collected, return default AggregatedData with zeros
@ -254,7 +279,42 @@ class Aggregator:
@staticmethod @staticmethod
def create_daily_data(directory, after_timestamp, before_timestamp): def create_daily_data(directory, after_timestamp, before_timestamp):
return Aggregator.create_hourly_data(directory, after_timestamp, before_timestamp) node_data = {'MinSoc': [], 'MaxSoc': [], 'ChargingBatteryPower': [], 'DischargingBatteryPower': [], 'HeatingPower': []}
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp,before_timestamp):
with open(file_path, 'r') as file:
data = json.load(file)
value = data.get("MinSoc", "N/A")
node_data['MinSoc'].append(float(value))
value = data.get("MaxSoc", "N/A")
node_data['MaxSoc'].append(float(value))
value = data.get("ChargingBatteryPower", "N/A")
node_data['ChargingBatteryPower'].append(float(value))
value = data.get("DischargingBatteryPower", "N/A")
node_data['DischargingBatteryPower'].append(float(value))
value = data.get("HeatingPower", "N/A")
node_data['HeatingPower'].append(float(value))
print(node_data)
min_soc = min (node_data['MinSoc']) if node_data else 0.0
max_soc = max(node_data['MaxSoc']) if node_data else 0.0
total_discharging_power = sum(node_data['DischargingBatteryPower']) if node_data else 0.0
total_charging_power = sum(node_data['ChargingBatteryPower']) if node_data else 0.0
total_heating_power = sum(node_data['HeatingPower']) if node_data else 0.0
avg_discharging_power = total_discharging_power / len(node_data['DischargingBatteryPower'])
avg_charging_power = total_charging_power / len(node_data['ChargingBatteryPower'])
avg_heating_power = total_heating_power / len(node_data['HeatingPower'])
return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power)
@staticmethod @staticmethod
def is_file_within_time_range(filename, start_time, end_time): def is_file_within_time_range(filename, start_time, end_time):

View File

@ -38,6 +38,16 @@ deploy() {
else else
echo "Warning: Failed to stop battery service on $ip_address" echo "Warning: Failed to stop battery service on $ip_address"
fi fi
echo "SSH connection successful: $ip_address"
# Stop aggregator service if changing aggregator-related files
if ssh -o StrictHostKeyChecking=no "$username@$ip_address" "svc -d /service/aggregator"; then
echo "Stopped aggregator service on $ip_address"
else
echo "Warning: Failed to stop aggregator service on $ip_address"
fi
# Copy files # Copy files
if scp -o ConnectTimeout=10 "$release_file_path/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py" "root@$ip_address:/opt/victronenergy/dbus-fzsonick-48tl"; then if scp -o ConnectTimeout=10 "$release_file_path/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py" "root@$ip_address:/opt/victronenergy/dbus-fzsonick-48tl"; then
@ -51,6 +61,19 @@ deploy() {
else else
echo "Warning: Failed to copy file to /data on $ip_address" echo "Warning: Failed to copy file to /data on $ip_address"
fi fi
# Copy files
if scp -o ConnectTimeout=10 "$release_file_path/dbus-fzsonick-48tl/aggregator.py" "root@$ip_address:/opt/victronenergy/dbus-fzsonick-48tl"; then
echo "Copied file to /opt on $ip_address"
else
echo "Warning: Failed to copy file to /opt on $ip_address"
fi
if scp -o ConnectTimeout=10 "$release_file_path/dbus-fzsonick-48tl/aggregator.py" "root@$ip_address:/data/dbus-fzsonick-48tl"; then
echo "Copied file to /data on $ip_address"
else
echo "Warning: Failed to copy file to /data on $ip_address"
fi
# Start battery service # Start battery service
if ssh -o StrictHostKeyChecking=no "$username@$ip_address" "svc -u /service/dbus-fzsonick-48tl.*"; then if ssh -o StrictHostKeyChecking=no "$username@$ip_address" "svc -u /service/dbus-fzsonick-48tl.*"; then
@ -58,6 +81,14 @@ deploy() {
else else
echo "Warning: Failed to start battery service on $ip_address" echo "Warning: Failed to start battery service on $ip_address"
fi fi
# Start aggregator service
if ssh -o StrictHostKeyChecking=no "$username@$ip_address" "svc -u /service/aggregator"; then
echo "Started aggregator service on $ip_address"
else
echo "Warning: Failed to start aggregator service on $ip_address"
fi
echo "Deployment completed for $ip_address ($device_type)" echo "Deployment completed for $ip_address ($device_type)"
done done