add aggregator.py to Cerbo and Venus
This commit is contained in:
parent
fac256bca9
commit
15638f639c
|
@ -0,0 +1,284 @@
|
|||
#!/usr/bin/python3 -u
|
||||
# coding=utf-8
|
||||
|
||||
import os
|
||||
import csv
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
import requests
|
||||
import zipfile
|
||||
import base64
|
||||
import io
|
||||
import hmac
|
||||
import hashlib
|
||||
from threading import Thread, Event
|
||||
import config as cfg
|
||||
|
||||
CSV_DIR = "/data/csv_files/"
|
||||
HOURLY_DIR = "/data/csv_files/HourlyData"
|
||||
DAILY_DIR = "/data/csv_files/DailyData"
|
||||
|
||||
# S3 Credentials
|
||||
print("Start with the correct credentials")
|
||||
|
||||
S3BUCKET = cfg.S3BUCKET
|
||||
S3KEY = cfg.S3KEY
|
||||
S3SECRET = cfg.S3SECRET
|
||||
|
||||
stop_event = Event()
|
||||
|
||||
def datetime_to_timestamp(dt):
|
||||
return time.mktime(dt.timetuple())
|
||||
|
||||
class AggregatedData:
|
||||
def __init__(self, min_soc, max_soc, discharging_battery_power, charging_battery_power, heating_power):
|
||||
self.min_soc = min_soc
|
||||
self.max_soc = max_soc
|
||||
self.discharging_battery_power = discharging_battery_power
|
||||
self.charging_battery_power = charging_battery_power
|
||||
self.heating_power = heating_power
|
||||
|
||||
def to_csv(self):
|
||||
return ("/MinSoc;{};\n"
|
||||
"/MaxSoc;{};\n"
|
||||
"/DischargingBatteryPower;{};\n"
|
||||
"/ChargingBatteryPower;{};\n"
|
||||
"/HeatingPower;{};").format(
|
||||
self.min_soc, self.max_soc, self.discharging_battery_power, self.charging_battery_power, self.heating_power)
|
||||
|
||||
def save(self, directory):
|
||||
timestamp = int(time.time())
|
||||
if not os.path.exists(directory):
|
||||
os.makedirs(directory)
|
||||
csv_path = os.path.join(directory, "{}.csv".format(timestamp))
|
||||
with open(csv_path, 'w') as file:
|
||||
file.write(self.to_csv())
|
||||
print("Saved file to:", csv_path)
|
||||
print("File content:\n", self.to_csv())
|
||||
|
||||
@staticmethod
|
||||
def delete_data(directory):
|
||||
if not os.path.exists(directory):
|
||||
return
|
||||
for file in os.listdir(directory):
|
||||
file_path = os.path.join(directory, file)
|
||||
if os.path.isfile(file_path):
|
||||
os.remove(file_path)
|
||||
print("Deleted file: {}".format(file_path))
|
||||
|
||||
def push_to_s3(self, s3_config):
|
||||
csv_data = self.to_csv()
|
||||
compressed_csv = self.compress_csv_data(csv_data)
|
||||
s3_path = datetime.now().strftime("%Y-%m-%d") + ".csv"
|
||||
response = s3_config.create_put_request(s3_path, compressed_csv)
|
||||
if response.status_code != 200:
|
||||
print("ERROR: PUT", response.text)
|
||||
return False
|
||||
print("Successfully uploaded to S3:", s3_path)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def compress_csv_data(csv_data):
|
||||
memory_stream = io.BytesIO()
|
||||
with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive:
|
||||
archive.writestr("data.csv", csv_data.encode('utf-8'))
|
||||
compressed_bytes = memory_stream.getvalue()
|
||||
return base64.b64encode(compressed_bytes).decode('utf-8')
|
||||
|
||||
|
||||
class S3config:
|
||||
def __init__(self, bucket, region, provider, key, secret):
|
||||
self.bucket = bucket
|
||||
self.region = region
|
||||
self.provider = provider
|
||||
self.key = key
|
||||
self.secret = secret
|
||||
self.content_type = "application/base64; charset=utf-8"
|
||||
|
||||
@property
|
||||
def host(self):
|
||||
return "{}.{}.{}".format(self.bucket, self.region, self.provider)
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
return "https://{}".format(self.host)
|
||||
|
||||
def create_put_request(self, s3_path, data):
|
||||
headers = self._create_request_headers("PUT", s3_path)
|
||||
url = "{}/{}".format(self.url, s3_path)
|
||||
response = requests.put(url, headers=headers, data=data)
|
||||
return response
|
||||
|
||||
def _create_request_headers(self, method, s3_path):
|
||||
date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
|
||||
auth = self._create_authorization(method, s3_path, date)
|
||||
return {
|
||||
"Host": self.host,
|
||||
"Date": date,
|
||||
"Authorization": auth,
|
||||
"Content-Type": self.content_type
|
||||
}
|
||||
|
||||
def _create_authorization(self, method, s3_path, date):
|
||||
payload = "{}\n\n{}\n{}\n/{}/{}".format(method, self.content_type, date, self.bucket, s3_path)
|
||||
signature = base64.b64encode(
|
||||
hmac.new(self.secret.encode(), payload.encode(), hashlib.sha1).digest()
|
||||
).decode()
|
||||
return "AWS {}:{}".format(self.key, signature)
|
||||
|
||||
|
||||
class Aggregator:
|
||||
@staticmethod
|
||||
def hourly_data_aggregation_manager():
|
||||
try:
|
||||
current_time = datetime.now()
|
||||
next_rounded_hour = (current_time + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
|
||||
time_until_next_hour = (next_rounded_hour - current_time).total_seconds()
|
||||
|
||||
print("Waiting for {} seconds until the next rounded hour.".format(time_until_next_hour))
|
||||
if stop_event.wait(time_until_next_hour):
|
||||
return
|
||||
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
current_time = datetime.now()
|
||||
after_timestamp = datetime_to_timestamp(current_time - timedelta(hours=1))
|
||||
before_timestamp = datetime_to_timestamp(current_time)
|
||||
aggregated_data = Aggregator.create_hourly_data(CSV_DIR, after_timestamp, before_timestamp)
|
||||
print("Saving in hourly directory")
|
||||
aggregated_data.save(HOURLY_DIR)
|
||||
except Exception as e:
|
||||
print("An error occurred during hourly data aggregation:", str(e))
|
||||
if stop_event.wait(3600): # Sleep for 1 hour
|
||||
return
|
||||
except KeyboardInterrupt:
|
||||
print("Hourly data aggregation manager stopped.")
|
||||
|
||||
@staticmethod
|
||||
def daily_data_aggregation_manager():
|
||||
try:
|
||||
current_time = datetime.now()
|
||||
next_rounded_day = (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
time_until_next_day = (next_rounded_day - current_time).total_seconds()
|
||||
|
||||
print("Waiting for {} seconds until the next rounded day.".format(time_until_next_day))
|
||||
if stop_event.wait(time_until_next_day):
|
||||
return
|
||||
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
current_time = datetime.now()
|
||||
after_timestamp = datetime_to_timestamp(current_time - timedelta(days=1))
|
||||
before_timestamp = datetime_to_timestamp(current_time)
|
||||
aggregated_data = Aggregator.create_daily_data(HOURLY_DIR, after_timestamp, before_timestamp)
|
||||
print("Saving in daily directory")
|
||||
aggregated_data.save(DAILY_DIR)
|
||||
s3_config = S3config(S3BUCKET, "sos-ch-dk-2", "exo.io", S3KEY, S3SECRET)
|
||||
if aggregated_data.push_to_s3(s3_config):
|
||||
print("Deleting from hourly directory")
|
||||
AggregatedData.delete_data(HOURLY_DIR)
|
||||
print("Deleting from daily directory")
|
||||
AggregatedData.delete_data(DAILY_DIR)
|
||||
except Exception as e:
|
||||
print("An error occurred during daily data aggregation:", str(e))
|
||||
if stop_event.wait(86400): # Sleep for 1 day
|
||||
return
|
||||
except KeyboardInterrupt:
|
||||
print("Daily data aggregation manager stopped.")
|
||||
|
||||
@staticmethod
|
||||
def create_hourly_data(directory, after_timestamp, before_timestamp):
|
||||
node_data = {}
|
||||
|
||||
for filename in os.listdir(directory):
|
||||
file_path = os.path.join(directory, filename)
|
||||
if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp, before_timestamp):
|
||||
with open(file_path, 'r') as file:
|
||||
reader = csv.reader(file, delimiter=';')
|
||||
for row in reader:
|
||||
if len(row) >= 2:
|
||||
variable_name, value = row[0], row[1]
|
||||
try:
|
||||
value = float(value)
|
||||
node_number = Aggregator.extract_node_number(variable_name)
|
||||
if node_number not in node_data:
|
||||
node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []}
|
||||
if "Soc" in variable_name:
|
||||
node_data[node_number]['soc'].append(value)
|
||||
elif "/Dc/Power" in variable_name or "/DischargingBatteryPower" in variable_name or "/ChargingBatteryPower" in variable_name:
|
||||
if value < 0:
|
||||
node_data[node_number]['discharge'].append(value)
|
||||
else:
|
||||
node_data[node_number]['charge'].append(value)
|
||||
elif "/HeatingPower" in variable_name:
|
||||
node_data[node_number]['heating'].append(value)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if len(node_data) == 1:
|
||||
# Directly use the values for a single node
|
||||
for node_number, data in node_data.items():
|
||||
min_soc = data['soc'][0] if data['soc'] else 0.0
|
||||
max_soc = data['soc'][0] if data['soc'] else 0.0
|
||||
avg_discharging_power = data['discharge'][0] if data['discharge'] else 0.0
|
||||
avg_charging_power = data['charge'][0] if data['charge'] else 0.0
|
||||
avg_heating_power = data['heating'][0] if data['heating'] else 0.0
|
||||
return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power)
|
||||
else:
|
||||
min_soc = min([min(data['soc']) for data in node_data.values() if data['soc']]) if node_data else 0.0
|
||||
max_soc = max([max(data['soc']) for data in node_data.values() if data['soc']]) if node_data else 0.0
|
||||
total_discharging_power = sum([sum(data['discharge']) for data in node_data.values() if data['discharge']])
|
||||
total_charging_power = sum([sum(data['charge']) for data in node_data.values() if data['charge']])
|
||||
total_heating_power = sum([sum(data['heating']) for data in node_data.values() if data['heating']])
|
||||
count_discharging_power = sum([len(data['discharge']) for data in node_data.values() if data['discharge']])
|
||||
count_charging_power = sum([len(data['charge']) for data in node_data.values() if data['charge']])
|
||||
count_heating_power = sum([len(data['heating']) for data in node_data.values() if data['heating']])
|
||||
avg_discharging_power = total_discharging_power / count_discharging_power if count_discharging_power else 0.0
|
||||
avg_charging_power = total_charging_power / count_charging_power if count_charging_power else 0.0
|
||||
avg_heating_power = total_heating_power / count_heating_power if count_heating_power else 0.0
|
||||
return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power)
|
||||
|
||||
@staticmethod
|
||||
def create_daily_data(directory, after_timestamp, before_timestamp):
|
||||
return Aggregator.create_hourly_data(directory, after_timestamp, before_timestamp)
|
||||
|
||||
@staticmethod
|
||||
def is_file_within_time_range(filename, start_time, end_time):
|
||||
try:
|
||||
file_timestamp = float(os.path.splitext(filename)[0])
|
||||
return start_time <= file_timestamp < end_time
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def extract_node_number(variable_name):
|
||||
parts = variable_name.split('/')
|
||||
try:
|
||||
return int(parts[3])
|
||||
except (IndexError, ValueError):
|
||||
return 0
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Aggregator has started AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
|
||||
|
||||
def run_hourly_manager():
|
||||
Aggregator.hourly_data_aggregation_manager()
|
||||
|
||||
def run_daily_manager():
|
||||
Aggregator.daily_data_aggregation_manager()
|
||||
|
||||
hourly_thread = Thread(target=run_hourly_manager)
|
||||
daily_thread = Thread(target=run_daily_manager)
|
||||
|
||||
hourly_thread.start()
|
||||
daily_thread.start()
|
||||
|
||||
try:
|
||||
hourly_thread.join()
|
||||
daily_thread.join()
|
||||
except KeyboardInterrupt:
|
||||
print("Program interrupted. Stopping threads...")
|
||||
stop_event.set()
|
||||
hourly_thread.join()
|
||||
daily_thread.join()
|
||||
print("Program stopped.")
|
|
@ -2,6 +2,14 @@
|
|||
|
||||
. /opt/victronenergy/serial-starter/run-service.sh
|
||||
|
||||
app=/opt/victronenergy/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py
|
||||
app="/opt/victronenergy/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py"
|
||||
args="$tty"
|
||||
|
||||
# Start aggregator.py in the background
|
||||
/opt/victronenergy/dbus-fzsonick-48tl/aggregator.py &
|
||||
|
||||
# Start dbus-fzsonick-48tl.py using the start command
|
||||
start $args
|
||||
|
||||
# Wait for all background processes to finish
|
||||
wait
|
||||
|
|
|
@ -17,6 +17,10 @@ for ip_address in "${ip_addresses_usb0[@]}"; do
|
|||
|
||||
scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
scp "aggregator.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "aggregator.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
scp "start.sh" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "start.sh" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
|
||||
ssh "$username"@"$ip_address" "cd /opt/victronenergy/serial-starter && echo '$root_password' | ./start-tty.sh ttyUSB0"
|
||||
|
||||
|
@ -30,6 +34,10 @@ for ip_address in "${ip_addresses_usb1[@]}"; do
|
|||
|
||||
scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
scp "aggregator.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "aggregator.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
scp "start.sh" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "start.sh" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
|
||||
ssh "$username"@"$ip_address" "cd /opt/victronenergy/serial-starter && echo '$root_password' | ./start-tty.sh ttyUSB1"
|
||||
|
||||
|
|
|
@ -0,0 +1,296 @@
|
|||
#!/usr/bin/python2 -u
|
||||
# coding=utf-8
|
||||
|
||||
import os
|
||||
import csv
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
import requests
|
||||
import zipfile
|
||||
import base64
|
||||
import io
|
||||
import hmac
|
||||
import hashlib
|
||||
from threading import Thread, Event
|
||||
import config as cfg
|
||||
|
||||
CSV_DIR = "/data/csv_files/"
|
||||
HOURLY_DIR = "/data/csv_files/HourlyData"
|
||||
DAILY_DIR = "/data/csv_files/DailyData"
|
||||
|
||||
# S3 Credentials
|
||||
#S3BUCKET = "6-c0436b6a-d276-4cd8-9c44-1eae86cf5d0e"
|
||||
#S3KEY = "EXO2a6cd837ae9279271b1710af"
|
||||
#S3SECRET = "IAK2wc7mL0HWD9LHFeiv1nl5jvousOLLAHKCQwmwniI"
|
||||
|
||||
|
||||
print("start with the correct credentials")
|
||||
|
||||
S3BUCKET = cfg.S3BUCKET
|
||||
S3KEY = cfg.S3KEY
|
||||
S3SECRET = cfg.S3SECRET
|
||||
|
||||
stop_event = Event()
|
||||
|
||||
def datetime_to_timestamp(dt):
|
||||
return time.mktime(dt.timetuple())
|
||||
|
||||
class AggregatedData:
|
||||
def __init__(self, min_soc, max_soc, discharging_battery_power, charging_battery_power, heating_power):
|
||||
self.min_soc = min_soc
|
||||
self.max_soc = max_soc
|
||||
self.discharging_battery_power = discharging_battery_power
|
||||
self.charging_battery_power = charging_battery_power
|
||||
self.heating_power = heating_power
|
||||
|
||||
def to_csv(self):
|
||||
return ("/MinSoc;{};\n"
|
||||
"/MaxSoc;{};\n"
|
||||
"/DischargingBatteryPower;{};\n"
|
||||
"/ChargingBatteryPower;{};\n"
|
||||
"/HeatingPower;{};").format(
|
||||
self.min_soc, self.max_soc, self.discharging_battery_power, self.charging_battery_power, self.heating_power)
|
||||
|
||||
def save(self, directory):
|
||||
timestamp = int(time.time())
|
||||
if not os.path.exists(directory):
|
||||
os.makedirs(directory)
|
||||
csv_path = os.path.join(directory, "{}.csv".format(timestamp))
|
||||
with open(csv_path, 'w') as file:
|
||||
file.write(self.to_csv())
|
||||
print("Saved file to:", csv_path)
|
||||
print("File content:\n", self.to_csv())
|
||||
|
||||
@staticmethod
|
||||
def delete_data(directory):
|
||||
if not os.path.exists(directory):
|
||||
return
|
||||
for file in os.listdir(directory):
|
||||
file_path = os.path.join(directory, file)
|
||||
if os.path.isfile(file_path):
|
||||
os.remove(file_path)
|
||||
print("Deleted file: {}".format(file_path))
|
||||
|
||||
def push_to_s3(self, s3_config):
|
||||
csv_data = self.to_csv()
|
||||
compressed_csv = self.compress_csv_data(csv_data)
|
||||
s3_path = datetime.now().strftime("%Y-%m-%d") + ".csv"
|
||||
response = s3_config.create_put_request(s3_path, compressed_csv)
|
||||
if response.status_code != 200:
|
||||
print("ERROR: PUT", response.text)
|
||||
return False
|
||||
print("Successfully uploaded to S3:", s3_path)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def compress_csv_data(csv_data):
|
||||
memory_stream = io.BytesIO()
|
||||
with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive:
|
||||
archive.writestr("data.csv", csv_data.encode('utf-8'))
|
||||
compressed_bytes = memory_stream.getvalue()
|
||||
return base64.b64encode(compressed_bytes).decode('utf-8')
|
||||
|
||||
|
||||
class S3config:
|
||||
def __init__(self, bucket, region, provider, key, secret):
|
||||
self.bucket = bucket
|
||||
self.region = region
|
||||
self.provider = provider
|
||||
self.key = key
|
||||
self.secret = secret
|
||||
self.content_type = "application/base64; charset=utf-8"
|
||||
|
||||
@property
|
||||
def host(self):
|
||||
return "{}.{}.{}".format(self.bucket, self.region, self.provider)
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
return "https://{}".format(self.host)
|
||||
|
||||
def create_put_request(self, s3_path, data):
|
||||
headers = self._create_request_headers("PUT", s3_path)
|
||||
url = "{}/{}".format(self.url, s3_path)
|
||||
response = requests.put(url, headers=headers, data=data)
|
||||
return response
|
||||
|
||||
def _create_request_headers(self, method, s3_path):
|
||||
date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
|
||||
auth = self._create_authorization(method, s3_path, date)
|
||||
return {
|
||||
"Host": self.host,
|
||||
"Date": date,
|
||||
"Authorization": auth,
|
||||
"Content-Type": self.content_type
|
||||
}
|
||||
|
||||
def _create_authorization(self, method, s3_path, date):
|
||||
payload = "{}\n\n{}\n{}\n/{}/{}".format(method, self.content_type, date, self.bucket, s3_path)
|
||||
signature = base64.b64encode(
|
||||
hmac.new(self.secret.encode(), payload.encode(), hashlib.sha1).digest()
|
||||
).decode()
|
||||
return "AWS {}:{}".format(self.key, signature)
|
||||
|
||||
|
||||
class Aggregator:
|
||||
@staticmethod
|
||||
def hourly_data_aggregation_manager():
|
||||
try:
|
||||
current_time = datetime.now()
|
||||
#next_rounded_minute = (current_time + timedelta(minutes=1)).replace(second=0, microsecond=0)
|
||||
next_rounded_hour = (current_time + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
|
||||
#time_until_next_minute = (next_rounded_minute - current_time).total_seconds()
|
||||
time_until_next_hour = (next_rounded_hour - current_time).total_seconds()
|
||||
|
||||
#print("Waiting for {} seconds until the next rounded minute.".format(time_until_next_minute))
|
||||
print("Waiting for {} seconds until the next rounded hour.".format(time_until_next_hour))
|
||||
if stop_event.wait(time_until_next_hour):
|
||||
return
|
||||
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
current_time = datetime.now()
|
||||
after_timestamp = datetime_to_timestamp(current_time - timedelta(hours=1))
|
||||
before_timestamp = datetime_to_timestamp(current_time)
|
||||
aggregated_data = Aggregator.create_hourly_data(CSV_DIR, after_timestamp, before_timestamp)
|
||||
print("save in hourly dir")
|
||||
aggregated_data.save(HOURLY_DIR)
|
||||
except Exception as e:
|
||||
print("An error occurred during hourly data aggregation:", str(e))
|
||||
if stop_event.wait(3600): # Sleep for 1 hour
|
||||
return
|
||||
except KeyboardInterrupt:
|
||||
print("Hourly data aggregation manager stopped.")
|
||||
|
||||
@staticmethod
|
||||
def daily_data_aggregation_manager():
|
||||
try:
|
||||
current_time = datetime.now()
|
||||
#next_rounded_five_minutes = (current_time + timedelta(minutes=5)).replace(second=0, microsecond=0)
|
||||
next_rounded_day = (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
#time_until_next_five_minutes = (next_rounded_five_minutes - current_time).total_seconds()
|
||||
time_until_next_day = (next_rounded_day - current_time).total_seconds()
|
||||
|
||||
#print("Waiting for {} seconds until the next rounded 5 minutes.".format(time_until_next_five_minutes))
|
||||
print("Waiting for {} seconds until the next rounded day.".format(time_until_next_day))
|
||||
if stop_event.wait(time_until_next_day):
|
||||
return
|
||||
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
current_time = datetime.now()
|
||||
after_timestamp = datetime_to_timestamp(current_time - timedelta(days=1))
|
||||
before_timestamp = datetime_to_timestamp(current_time)
|
||||
aggregated_data = Aggregator.create_daily_data(HOURLY_DIR, after_timestamp, before_timestamp)
|
||||
print("save in daily dir")
|
||||
aggregated_data.save(DAILY_DIR)
|
||||
s3_config = S3config(S3BUCKET, "sos-ch-dk-2", "exo.io", S3KEY, S3SECRET)
|
||||
if aggregated_data.push_to_s3(s3_config):
|
||||
print("delete from hourly dir")
|
||||
AggregatedData.delete_data(HOURLY_DIR)
|
||||
print("delete from daily dir")
|
||||
AggregatedData.delete_data(DAILY_DIR)
|
||||
except Exception as e:
|
||||
print("An error occurred during daily data aggregation:", str(e))
|
||||
if stop_event.wait(86400): # Sleep for 1 day
|
||||
return
|
||||
except KeyboardInterrupt:
|
||||
print("Daily data aggregation manager stopped.")
|
||||
|
||||
@staticmethod
|
||||
def create_hourly_data(directory, after_timestamp, before_timestamp):
|
||||
node_data = {}
|
||||
|
||||
for filename in os.listdir(directory):
|
||||
file_path = os.path.join(directory, filename)
|
||||
if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp, before_timestamp):
|
||||
with open(file_path, 'r') as file:
|
||||
reader = csv.reader(file, delimiter=';')
|
||||
for row in reader:
|
||||
if len(row) >= 2:
|
||||
variable_name, value = row[0], row[1]
|
||||
try:
|
||||
value = float(value)
|
||||
node_number = Aggregator.extract_node_number(variable_name)
|
||||
if node_number not in node_data:
|
||||
node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []}
|
||||
if "Soc" in variable_name:
|
||||
node_data[node_number]['soc'].append(value)
|
||||
elif "/Dc/Power" in variable_name or "/DischargingBatteryPower" in variable_name or "/ChargingBatteryPower" in variable_name:
|
||||
if value < 0:
|
||||
node_data[node_number]['discharge'].append(value)
|
||||
else:
|
||||
node_data[node_number]['charge'].append(value)
|
||||
elif "/HeatingPower" in variable_name:
|
||||
node_data[node_number]['heating'].append(value)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if len(node_data) == 1:
|
||||
# Directly use the values for a single node
|
||||
for node_number, data in node_data.items():
|
||||
min_soc = data['soc'][0] if data['soc'] else 0.0
|
||||
max_soc = data['soc'][0] if data['soc'] else 0.0
|
||||
avg_discharging_power = data['discharge'][0] if data['discharge'] else 0.0
|
||||
avg_charging_power = data['charge'][0] if data['charge'] else 0.0
|
||||
avg_heating_power = data['heating'][0] if data['heating'] else 0.0
|
||||
return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power)
|
||||
else:
|
||||
min_soc = min([min(data['soc']) for data in node_data.values() if data['soc']]) if node_data else 0.0
|
||||
max_soc = max([max(data['soc']) for data in node_data.values() if data['soc']]) if node_data else 0.0
|
||||
total_discharging_power = sum([sum(data['discharge']) for data in node_data.values() if data['discharge']])
|
||||
total_charging_power = sum([sum(data['charge']) for data in node_data.values() if data['charge']])
|
||||
total_heating_power = sum([sum(data['heating']) for data in node_data.values() if data['heating']])
|
||||
count_discharging_power = sum([len(data['discharge']) for data in node_data.values() if data['discharge']])
|
||||
count_charging_power = sum([len(data['charge']) for data in node_data.values() if data['charge']])
|
||||
count_heating_power = sum([len(data['heating']) for data in node_data.values() if data['heating']])
|
||||
avg_discharging_power = total_discharging_power / count_discharging_power if count_discharging_power else 0.0
|
||||
avg_charging_power = total_charging_power / count_charging_power if count_charging_power else 0.0
|
||||
avg_heating_power = total_heating_power / count_heating_power if count_heating_power else 0.0
|
||||
return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power)
|
||||
|
||||
@staticmethod
|
||||
def create_daily_data(directory, after_timestamp, before_timestamp):
|
||||
return Aggregator.create_hourly_data(directory, after_timestamp, before_timestamp)
|
||||
|
||||
@staticmethod
|
||||
def is_file_within_time_range(filename, start_time, end_time):
|
||||
try:
|
||||
file_timestamp = float(os.path.splitext(filename)[0])
|
||||
return start_time <= file_timestamp < end_time
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def extract_node_number(variable_name):
|
||||
parts = variable_name.split('/')
|
||||
try:
|
||||
return int(parts[3])
|
||||
except (IndexError, ValueError):
|
||||
return 0
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("aggregator has started aAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
|
||||
#exit(0)
|
||||
def run_hourly_manager():
|
||||
Aggregator.hourly_data_aggregation_manager()
|
||||
|
||||
def run_daily_manager():
|
||||
Aggregator.daily_data_aggregation_manager()
|
||||
|
||||
hourly_thread = Thread(target=run_hourly_manager)
|
||||
daily_thread = Thread(target=run_daily_manager)
|
||||
|
||||
hourly_thread.start()
|
||||
daily_thread.start()
|
||||
|
||||
try:
|
||||
hourly_thread.join()
|
||||
daily_thread.join()
|
||||
except KeyboardInterrupt:
|
||||
print("Program interrupted. Stopping threads...")
|
||||
stop_event.set()
|
||||
hourly_thread.join()
|
||||
daily_thread.join()
|
||||
print("Program stopped.")
|
||||
|
|
@ -1,4 +1,5 @@
|
|||
#!/bin/sh
|
||||
exec 2>&1
|
||||
|
||||
exec softlimit -d 100000000 -s 1000000 -a 100000000 /opt/innovenergy/dbus-fzsonick-48tl/start.sh TTY
|
||||
softlimit -d 100000000 -s 1000000 -a 100000000 /opt/innovenergy/dbus-fzsonick-48tl/start.sh
|
||||
|
||||
|
|
|
@ -4,4 +4,13 @@
|
|||
|
||||
app="/opt/innovenergy/dbus-fzsonick-48tl/dbus-fzsonick-48tl.py"
|
||||
args="$tty"
|
||||
|
||||
# Start aggregator.py in the background
|
||||
/opt/innovenergy/dbus-fzsonick-48tl/aggregator.py &
|
||||
|
||||
# Start dbus-fzsonick-48tl.py using the start command
|
||||
start $args
|
||||
|
||||
# Wait for all background processes to finish
|
||||
wait
|
||||
|
||||
|
|
|
@ -7,10 +7,8 @@ set -e
|
|||
|
||||
|
||||
echo -e "\n============================ Deploy ============================\n"
|
||||
#ip_addresses_usb0=("10.2.0.155" "10.2.1.97" "10.2.0.104" "10.2.1.159" "10.2.0.224" "10.2.0.209" "10.2.2.36")
|
||||
ip_addresses_usb0=("10.2.2.36")
|
||||
|
||||
#ip_addresses_usb1=("10.2.1.35")
|
||||
ip_addresses_usb0=("10.2.0.155" "10.2.1.97" "10.2.0.104" "10.2.1.159" "10.2.0.224" "10.2.0.209" "10.2.0.227")
|
||||
ip_addresses_usb1=("10.2.1.35")
|
||||
|
||||
|
||||
for ip_address in "${ip_addresses_usb0[@]}"; do
|
||||
|
@ -19,6 +17,14 @@ for ip_address in "${ip_addresses_usb0[@]}"; do
|
|||
|
||||
scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
scp "aggregator.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "aggregator.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
scp "start.sh" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "start.sh" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
scp "signals.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "signals.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
scp "service/run" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/service"
|
||||
scp "service/run" "root@"$ip_address":/data/dbus-fzsonick-48tl/service"
|
||||
|
||||
ssh "$username"@"$ip_address" "cd /opt/victronenergy/serial-starter && echo '$root_password' | ./start-tty.sh ttyUSB0"
|
||||
|
||||
|
@ -32,6 +38,14 @@ for ip_address in "${ip_addresses_usb1[@]}"; do
|
|||
|
||||
scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "dbus-fzsonick-48tl.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
scp "aggregator.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "aggregator.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
scp "start.sh" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "start.sh" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
scp "signals.py" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/"
|
||||
scp "signals.py" "root@"$ip_address":/data/dbus-fzsonick-48tl/"
|
||||
scp "service/run" "root@"$ip_address":/opt/victronenergy/dbus-fzsonick-48tl/service"
|
||||
scp "service/run" "root@"$ip_address":/data/dbus-fzsonick-48tl/service"
|
||||
|
||||
ssh "$username"@"$ip_address" "cd /opt/victronenergy/serial-starter && echo '$root_password' | ./start-tty.sh ttyUSB1"
|
||||
|
||||
|
|
Loading…
Reference in New Issue