Innovenergy_trunk/firmware/Venus_Release/VenusReleaseFiles/dbus-fzsonick-48tl/aggregator.py

297 lines
13 KiB
Python
Raw Normal View History

2024-07-04 08:46:28 +00:00
#!/usr/bin/python2 -u
# coding=utf-8
import os
import csv
import time
from datetime import datetime, timedelta
import requests
import zipfile
import base64
import io
import hmac
import hashlib
from threading import Thread, Event
import config as cfg
# Directory scanned by the hourly aggregation for raw CSV files.
CSV_DIR = "/data/csv_files/"
# Hourly aggregates are written here and read back by the daily aggregation.
HOURLY_DIR = "/data/csv_files/HourlyData"
# Daily aggregates are written here before being uploaded to S3.
DAILY_DIR = "/data/csv_files/DailyData"
print("start with the correct credentials")
# S3 bucket name and credentials are taken from the local config module.
S3BUCKET = cfg.S3BUCKET
S3KEY = cfg.S3KEY
S3SECRET = cfg.S3SECRET
# Shared shutdown flag: both manager threads wait on it, and the main block
# sets it when the program is interrupted.
stop_event = Event()
def datetime_to_timestamp(dt):
    """Convert a datetime to a float epoch timestamp via ``time.mktime``.

    ``time.mktime`` interprets the broken-down time as local time; the
    sub-second part of *dt* is not carried over by ``timetuple()``.
    """
    broken_down = dt.timetuple()
    return time.mktime(broken_down)
class AggregatedData:
    """One aggregation result: SoC extremes plus averaged power figures.

    Instances can serialise themselves to the semicolon-separated text format
    used by this script, store that text on disk, and upload a zipped,
    base64-encoded copy through an S3 configuration object.
    """

    def __init__(self, min_soc, max_soc, discharging_battery_power, charging_battery_power, heating_power):
        self.min_soc = min_soc
        self.max_soc = max_soc
        self.discharging_battery_power = discharging_battery_power
        self.charging_battery_power = charging_battery_power
        self.heating_power = heating_power

    def to_csv(self):
        """Return the five values as ``/Name;value;`` lines joined by newlines.

        There is no trailing newline after the last line.
        """
        return ("/MinSoc;{};\n"
                "/MaxSoc;{};\n"
                "/DischargingBatteryPower;{};\n"
                "/ChargingBatteryPower;{};\n"
                "/HeatingPower;{};").format(
            self.min_soc, self.max_soc, self.discharging_battery_power,
            self.charging_battery_power, self.heating_power)

    def save(self, directory):
        """Write the CSV text to ``<directory>/<unix-seconds>.csv``.

        The directory is created when it does not exist yet. The file name is
        the current epoch time truncated to whole seconds.
        """
        timestamp = int(time.time())
        if not os.path.exists(directory):
            os.makedirs(directory)
        csv_path = os.path.join(directory, "{}.csv".format(timestamp))
        content = self.to_csv()
        with open(csv_path, 'w') as csv_file:
            csv_file.write(content)
        # Single-argument print: a multi-argument print(...) call is a print
        # statement with a tuple under Python 2 and would log a tuple repr.
        print("Saved file to: {}".format(csv_path))
        print("File content:\n {}".format(content))

    @staticmethod
    def delete_data(directory):
        """Remove every regular file directly inside *directory*.

        Sub-directories are left alone; a missing directory is a no-op.
        """
        if not os.path.exists(directory):
            return
        for entry in os.listdir(directory):
            file_path = os.path.join(directory, entry)
            if os.path.isfile(file_path):
                os.remove(file_path)
                print("Deleted file: {}".format(file_path))

    def push_to_s3(self, s3_config):
        """Upload the compressed CSV as ``YYYY-MM-DD.csv`` via *s3_config*.

        *s3_config* must offer ``create_put_request(path, data)`` returning an
        object with ``status_code`` and ``text``. Returns True when the status
        code is 200, False otherwise.
        """
        csv_data = self.to_csv()
        compressed_csv = self.compress_csv_data(csv_data)
        now = datetime.now()
        # An upload within the first half hour after midnight is labelled with
        # the previous calendar day.
        if now.hour == 0 and now.minute < 30:
            adjusted_date = now - timedelta(days=1)
        else:
            adjusted_date = now
        s3_path = adjusted_date.strftime("%Y-%m-%d") + ".csv"
        response = s3_config.create_put_request(s3_path, compressed_csv)
        if response.status_code != 200:
            print("ERROR: PUT {}".format(response.text))
            return False
        print("Successfully uploaded to S3: {}".format(s3_path))
        return True

    @staticmethod
    def compress_csv_data(csv_data):
        """Zip *csv_data* as ``data.csv`` in memory and return it base64-encoded.

        The result is a text string containing the base64 form of a
        deflate-compressed ZIP archive with a single member.
        """
        memory_stream = io.BytesIO()
        with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive:
            archive.writestr("data.csv", csv_data.encode('utf-8'))
        compressed_bytes = memory_stream.getvalue()
        return base64.b64encode(compressed_bytes).decode('utf-8')
class S3config:
    """Endpoint settings and request signing for an S3-compatible bucket.

    Requests are authenticated with an ``Authorization: AWS <key>:<signature>``
    header whose signature is a base64-encoded HMAC-SHA1 over the method,
    content type, date and ``/<bucket>/<path>``.

    Parameters:
        bucket, region, provider: joined with dots to form the host name.
        key, secret: access key id and secret used for signing.
        timeout: seconds passed to ``requests`` for each upload (default 30),
            so a stalled connection cannot block the caller indefinitely.
    """

    def __init__(self, bucket, region, provider, key, secret, timeout=30):
        self.bucket = bucket
        self.region = region
        self.provider = provider
        self.key = key
        self.secret = secret
        self.timeout = timeout
        self.content_type = "application/base64; charset=utf-8"

    @property
    def host(self):
        """Host name in the form ``<bucket>.<region>.<provider>``."""
        return "{}.{}.{}".format(self.bucket, self.region, self.provider)

    @property
    def url(self):
        """HTTPS base URL of the bucket."""
        return "https://{}".format(self.host)

    def create_put_request(self, s3_path, data):
        """PUT *data* to ``<url>/<s3_path>`` and return the response object.

        Exceptions raised by ``requests`` (including a timeout after
        ``self.timeout`` seconds) propagate to the caller.
        """
        headers = self._create_request_headers("PUT", s3_path)
        url = "{}/{}".format(self.url, s3_path)
        return requests.put(url, headers=headers, data=data, timeout=self.timeout)

    def _create_request_headers(self, method, s3_path):
        """Build the Host, Date, Authorization and Content-Type headers."""
        # Same text as datetime.utcnow().strftime(...) with this format.
        date = time.strftime('%a, %d %b %Y %H:%M:%S GMT', time.gmtime())
        auth = self._create_authorization(method, s3_path, date)
        return {
            "Host": self.host,
            "Date": date,
            "Authorization": auth,
            "Content-Type": self.content_type
        }

    def _create_authorization(self, method, s3_path, date):
        """Return the ``AWS <key>:<signature>`` header value for one request."""
        # The empty second line is the (unused) Content-MD5 slot of the
        # string-to-sign.
        payload = "{}\n\n{}\n{}\n/{}/{}".format(method, self.content_type, date, self.bucket, s3_path)
        digest = hmac.new(self.secret.encode(), payload.encode(), hashlib.sha1).digest()
        signature = base64.b64encode(digest).decode()
        return "AWS {}:{}".format(self.key, signature)
class Aggregator:
    """Rolls CSV measurement files up into hourly and daily summaries.

    The two ``*_manager`` methods are long-running loops meant to be run in
    their own threads; they stop when the module-level ``stop_event`` is set.
    """

    @staticmethod
    def _seconds_until_next_hour(now):
        """Return the seconds from naive datetime *now* to the next full hour."""
        next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
        return (next_hour - now).total_seconds()

    @staticmethod
    def _seconds_until_next_day(now):
        """Return the seconds from naive local datetime *now* to next midnight.

        The difference is taken on epoch timestamps from ``time.mktime`` so
        that a calendar day that is not 24 hours long (DST change) still ends
        at local midnight. If that yields a non-positive value, the plain
        datetime difference is used instead so the caller never spins.
        """
        next_day = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
        now_ts = time.mktime(now.timetuple()) + now.microsecond / 1e6
        wait = time.mktime(next_day.timetuple()) - now_ts
        if wait <= 0:
            wait = (next_day - now).total_seconds()
        return wait

    @staticmethod
    def hourly_data_aggregation_manager():
        """Aggregate the last hour of files from CSV_DIR at every full hour.

        The wait is recomputed before each cycle, so processing time does not
        accumulate and the loop stays aligned with the hour boundary. Errors
        in one cycle are logged and do not end the loop.
        """
        try:
            while True:
                time_until_next_hour = Aggregator._seconds_until_next_hour(datetime.now())
                print("Waiting for {} seconds until the next rounded hour.".format(time_until_next_hour))
                if stop_event.wait(time_until_next_hour):
                    return
                try:
                    current_time = datetime.now()
                    after_timestamp = datetime_to_timestamp(current_time - timedelta(hours=1))
                    before_timestamp = datetime_to_timestamp(current_time)
                    aggregated_data = Aggregator.create_hourly_data(CSV_DIR, after_timestamp, before_timestamp)
                    print("save in hourly dir")
                    aggregated_data.save(HOURLY_DIR)
                except Exception as e:
                    print("An error occurred during hourly data aggregation: {}".format(str(e)))
        except KeyboardInterrupt:
            print("Hourly data aggregation manager stopped.")

    @staticmethod
    def daily_data_aggregation_manager():
        """Aggregate the last day of hourly files at midnight and upload them.

        After a successful upload the hourly and daily directories are
        emptied. The wait is recomputed before each cycle so the run stays at
        local midnight, which the date label chosen during upload relies on.
        """
        try:
            while True:
                time_until_next_day = Aggregator._seconds_until_next_day(datetime.now())
                print("Waiting for {} seconds until the next rounded day.".format(time_until_next_day))
                if stop_event.wait(time_until_next_day):
                    return
                try:
                    # NOTE(review): the hourly manager wakes at the same
                    # instant; whether its last file is already on disk when
                    # this runs is not guaranteed here - confirm.
                    current_time = datetime.now()
                    after_timestamp = datetime_to_timestamp(current_time - timedelta(days=1))
                    before_timestamp = datetime_to_timestamp(current_time)
                    aggregated_data = Aggregator.create_daily_data(HOURLY_DIR, after_timestamp, before_timestamp)
                    print("save in daily dir")
                    aggregated_data.save(DAILY_DIR)
                    s3_config = S3config(S3BUCKET, "sos-ch-dk-2", "exo.io", S3KEY, S3SECRET)
                    if aggregated_data.push_to_s3(s3_config):
                        print("delete from hourly dir")
                        AggregatedData.delete_data(HOURLY_DIR)
                        print("delete from daily dir")
                        AggregatedData.delete_data(DAILY_DIR)
                except Exception as e:
                    print("An error occurred during daily data aggregation: {}".format(str(e)))
        except KeyboardInterrupt:
            print("Daily data aggregation manager stopped.")

    @staticmethod
    def _collect_node_data(directory, after_timestamp, before_timestamp):
        """Read all in-range files in *directory* and group values per node.

        Returns ``{node_number: {'soc': [...], 'discharge': [...],
        'charge': [...], 'heating': [...]}}``. Rows with fewer than two
        columns or a non-numeric second column are skipped.
        """
        node_data = {}
        for filename in os.listdir(directory):
            file_path = os.path.join(directory, filename)
            if not os.path.isfile(file_path):
                continue
            if not Aggregator.is_file_within_time_range(filename, after_timestamp, before_timestamp):
                continue
            with open(file_path, 'r') as csv_file:
                for row in csv.reader(csv_file, delimiter=';'):
                    if len(row) < 2:
                        continue
                    variable_name = row[0]
                    try:
                        value = float(row[1])
                    except ValueError:
                        continue
                    node_number = Aggregator.extract_node_number(variable_name)
                    series = node_data.setdefault(
                        node_number, {'soc': [], 'discharge': [], 'charge': [], 'heating': []})
                    if "Soc" in variable_name:
                        series['soc'].append(value)
                    elif "/DischargingBatteryPower" in variable_name:
                        # Already-aggregated rows are routed by name, so a 0.0
                        # "no discharge" value is not counted as charging.
                        series['discharge'].append(value)
                    elif "/ChargingBatteryPower" in variable_name:
                        series['charge'].append(value)
                    elif "/Dc/Power" in variable_name:
                        # The sign decides the direction for this variable.
                        if value < 0:
                            series['discharge'].append(value)
                        else:
                            series['charge'].append(value)
                    elif "/HeatingPower" in variable_name:
                        series['heating'].append(value)
        return node_data

    @staticmethod
    def _summarize(node_data):
        """Reduce grouped values to (min_soc, max_soc, avg_discharge, avg_charge, avg_heating).

        All nodes are pooled. Any quantity without samples yields 0.0. The
        same reduction is used whether one node or many are present.
        """
        def pooled(key):
            return [value for data in node_data.values() for value in data[key]]

        def mean(values):
            return sum(values) / len(values) if values else 0.0

        soc_values = pooled('soc')
        min_soc = min(soc_values) if soc_values else 0.0
        max_soc = max(soc_values) if soc_values else 0.0
        return (min_soc, max_soc,
                mean(pooled('discharge')), mean(pooled('charge')), mean(pooled('heating')))

    @staticmethod
    def create_hourly_data(directory, after_timestamp, before_timestamp):
        """Aggregate files in *directory* whose name-timestamp is in range.

        Returns an AggregatedData built from every numeric row of every file
        with ``after_timestamp <= timestamp < before_timestamp``.
        """
        node_data = Aggregator._collect_node_data(directory, after_timestamp, before_timestamp)
        return AggregatedData(*Aggregator._summarize(node_data))

    @staticmethod
    def create_daily_data(directory, after_timestamp, before_timestamp):
        """Aggregate a day of files; uses the same reduction as the hourly one."""
        return Aggregator.create_hourly_data(directory, after_timestamp, before_timestamp)

    @staticmethod
    def is_file_within_time_range(filename, start_time, end_time):
        """Return True if the file's stem is a number in [start_time, end_time).

        File names whose stem is not numeric yield False.
        """
        try:
            file_timestamp = float(os.path.splitext(filename)[0])
            return start_time <= file_timestamp < end_time
        except ValueError:
            return False

    @staticmethod
    def extract_node_number(variable_name):
        """Return the integer in the fourth '/'-separated field, or 0.

        Names with fewer fields or a non-integer fourth field map to 0.
        """
        parts = variable_name.split('/')
        try:
            return int(parts[3])
        except (IndexError, ValueError):
            return 0
if __name__ == "__main__":
    # Script entry point: run both aggregation managers in their own threads
    # and shut them down cleanly on Ctrl-C.
    print("aggregator has started")

    def run_hourly_manager():
        Aggregator.hourly_data_aggregation_manager()

    def run_daily_manager():
        Aggregator.daily_data_aggregation_manager()

    hourly_thread = Thread(target=run_hourly_manager)
    daily_thread = Thread(target=run_daily_manager)
    hourly_thread.start()
    daily_thread.start()
    workers = (hourly_thread, daily_thread)
    try:
        # Join with a timeout in a loop: on Python 2 a join() without timeout
        # cannot be interrupted, so KeyboardInterrupt would never get here.
        while any(worker.is_alive() for worker in workers):
            for worker in workers:
                worker.join(1.0)
    except KeyboardInterrupt:
        print("Program interrupted. Stopping threads...")
        # Wake both managers out of their stop_event.wait(...) calls.
        stop_event.set()
        for worker in workers:
            worker.join()
    print("Program stopped.")