297 lines
13 KiB
Python
297 lines
13 KiB
Python
|
#!/usr/bin/python2 -u
|
||
|
# coding=utf-8
|
||
|
|
||
|
import os
|
||
|
import csv
|
||
|
import time
|
||
|
from datetime import datetime, timedelta
|
||
|
import requests
|
||
|
import zipfile
|
||
|
import base64
|
||
|
import io
|
||
|
import hmac
|
||
|
import hashlib
|
||
|
from threading import Thread, Event
|
||
|
import config as cfg
|
||
|
|
||
|
# Directory layout. The hourly aggregation reads the CSV files found directly
# in CSV_DIR and stores its result in HOURLY_DIR; the daily aggregation reads
# HOURLY_DIR and stores its result in DAILY_DIR.
CSV_DIR = "/data/csv_files/"
HOURLY_DIR = os.path.join(CSV_DIR, "HourlyData")
DAILY_DIR = os.path.join(CSV_DIR, "DailyData")

# S3 credentials are taken from the local `config` module only.
# SECURITY: literal bucket/key/secret values used to be kept here in
# commented-out lines. They were removed; anything that was ever committed
# should be treated as leaked and rotated.
print("start with the correct credentials")

S3BUCKET = cfg.S3BUCKET
S3KEY = cfg.S3KEY
S3SECRET = cfg.S3SECRET

# Set from the main thread to ask both aggregation threads to stop; the
# worker loops block on stop_event.wait(...) instead of time.sleep(...).
stop_event = Event()
|
||
|
|
||
|
def datetime_to_timestamp(dt):
    """Return *dt* as seconds since the epoch (float).

    The datetime's calendar fields are handed to ``time.mktime``, which
    interprets them as local time. ``timetuple()`` carries no microseconds,
    so the result is always a whole number of seconds.
    """
    local_fields = dt.timetuple()
    seconds_since_epoch = time.mktime(local_fields)
    return seconds_since_epoch
|
||
|
|
||
|
class AggregatedData(object):
    """One aggregated record of battery statistics.

    Stores the SoC extremes and three power figures, and knows how to render
    itself as semicolon-separated text, write that text to a timestamped
    file, and upload it (zipped + base64-encoded) through an ``S3config``.
    """

    def __init__(self, min_soc, max_soc, discharging_battery_power, charging_battery_power, heating_power):
        self.min_soc = min_soc
        self.max_soc = max_soc
        self.discharging_battery_power = discharging_battery_power
        self.charging_battery_power = charging_battery_power
        self.heating_power = heating_power

    def to_csv(self):
        """Return the record as ``/Name;value;`` lines (no trailing newline)."""
        rows = (
            ("/MinSoc", self.min_soc),
            ("/MaxSoc", self.max_soc),
            ("/DischargingBatteryPower", self.discharging_battery_power),
            ("/ChargingBatteryPower", self.charging_battery_power),
            ("/HeatingPower", self.heating_power),
        )
        return "\n".join("{};{};".format(name, value) for name, value in rows)

    def save(self, directory):
        """Write the record to ``<directory>/<unix-seconds>.csv``.

        The directory is created when missing. The file name is the current
        time in whole seconds, which is what
        ``Aggregator.is_file_within_time_range`` later parses.

        Raises:
            OSError / IOError: if the directory cannot be created or the
                file cannot be written.
        """
        timestamp = int(time.time())
        # Create-then-check instead of check-then-create: this has no race
        # window and also works on Python 2, which lacks exist_ok.
        try:
            os.makedirs(directory)
        except OSError:
            if not os.path.isdir(directory):
                raise
        csv_path = os.path.join(directory, "{}.csv".format(timestamp))
        csv_text = self.to_csv()
        with open(csv_path, 'w') as csv_file:
            csv_file.write(csv_text)
        # Single-argument print calls: under the python2 shebang a
        # two-argument print(...) would display a tuple.
        print("Saved file to: {}".format(csv_path))
        print("File content:\n {}".format(csv_text))

    @staticmethod
    def delete_data(directory):
        """Remove every regular file directly inside *directory*.

        Sub-directories are left alone; a missing directory is a no-op.
        """
        if not os.path.exists(directory):
            return
        for entry in os.listdir(directory):
            entry_path = os.path.join(directory, entry)
            if os.path.isfile(entry_path):
                os.remove(entry_path)
                print("Deleted file: {}".format(entry_path))

    def push_to_s3(self, s3_config):
        """Upload the record and report whether the server answered 200.

        Args:
            s3_config: object providing ``create_put_request(path, data)``
                that returns a response with ``status_code`` and ``text``.

        Returns:
            True on HTTP 200, False on any other status. Exceptions raised
            by the request itself are not caught here.
        """
        compressed_csv = self.compress_csv_data(self.to_csv())
        # NOTE(review): the object key is the date of the upload, not the
        # date of the aggregated window -- confirm this is intended.
        s3_path = datetime.now().strftime("%Y-%m-%d") + ".csv"
        response = s3_config.create_put_request(s3_path, compressed_csv)
        if response.status_code != 200:
            # Include the status code; repr keeps a multi-line or non-ASCII
            # body on one line and printable on any stdout encoding.
            print("ERROR: PUT {} {!r}".format(response.status_code, response.text))
            return False
        print("Successfully uploaded to S3: {}".format(s3_path))
        return True

    @staticmethod
    def compress_csv_data(csv_data):
        """Return *csv_data* zipped and base64-encoded, as text.

        The archive is built in memory and holds one deflated member named
        ``data.csv`` containing the UTF-8 encoded input.
        """
        memory_stream = io.BytesIO()
        with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive:
            archive.writestr("data.csv", csv_data.encode('utf-8'))
        # Read the buffer only after the with-block: closing the archive is
        # what writes the ZIP central directory.
        compressed_bytes = memory_stream.getvalue()
        return base64.b64encode(compressed_bytes).decode('utf-8')
|
||
|
|
||
|
|
||
|
class S3config(object):
    """Endpoint settings and request signing for one S3-compatible bucket.

    Requests are authenticated with an ``AWS <key>:<signature>`` header whose
    signature is an HMAC-SHA1 over the method, content type, date and
    ``/<bucket>/<path>`` (see ``_create_authorization``).

    Args:
        bucket, region, provider: joined with dots to form the host name.
        key, secret: access key id and secret used for signing.
        timeout: seconds passed to ``requests`` for each upload. Without a
            timeout a stalled connection would block the calling thread
            indefinitely. Defaults to 30.
    """

    def __init__(self, bucket, region, provider, key, secret, timeout=30):
        self.bucket = bucket
        self.region = region
        self.provider = provider
        self.key = key
        self.secret = secret
        self.timeout = timeout
        # Sent as the Content-Type header and also part of the signed string,
        # so both must always use this exact value.
        self.content_type = "application/base64; charset=utf-8"

    @property
    def host(self):
        """Host name in the form ``<bucket>.<region>.<provider>``."""
        return "{}.{}.{}".format(self.bucket, self.region, self.provider)

    @property
    def url(self):
        """HTTPS base URL of the bucket."""
        return "https://{}".format(self.host)

    def create_put_request(self, s3_path, data):
        """PUT *data* to ``<url>/<s3_path>`` and return the response object.

        Raises whatever ``requests.put`` raises (connection errors, and a
        timeout error once ``self.timeout`` seconds pass without progress).
        """
        headers = self._create_request_headers("PUT", s3_path)
        url = "{}/{}".format(self.url, s3_path)
        return requests.put(url, headers=headers, data=data, timeout=self.timeout)

    def _create_request_headers(self, method, s3_path):
        """Build the Host/Date/Authorization/Content-Type header dict."""
        # The same date string must appear in the header and in the signed
        # payload, so it is generated once and passed on.
        date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
        auth = self._create_authorization(method, s3_path, date)
        return {
            "Host": self.host,
            "Date": date,
            "Authorization": auth,
            "Content-Type": self.content_type,
        }

    def _create_authorization(self, method, s3_path, date):
        """Return the ``AWS <key>:<base64 HMAC-SHA1>`` authorization value."""
        # The empty second line is the (unused) Content-MD5 slot.
        payload = "{}\n\n{}\n{}\n/{}/{}".format(method, self.content_type, date, self.bucket, s3_path)
        digest = hmac.new(self.secret.encode(), payload.encode(), hashlib.sha1).digest()
        signature = base64.b64encode(digest).decode()
        return "AWS {}:{}".format(self.key, signature)
|
||
|
|
||
|
|
||
|
class Aggregator(object):
    """Periodic roll-up of timestamp-named CSV files into hourly/daily records.

    The two ``*_manager`` methods are long-running loops meant to be run in
    their own threads; they stop when the module-level ``stop_event`` is set.
    """

    @staticmethod
    def hourly_data_aggregation_manager():
        """Aggregate the last hour of CSV_DIR into HOURLY_DIR, once per hour.

        Waits until the next full hour, then runs and afterwards sleeps
        3600 s between runs until ``stop_event`` is set.
        """
        def aggregate_last_hour():
            current_time = datetime.now()
            after_timestamp = datetime_to_timestamp(current_time - timedelta(hours=1))
            before_timestamp = datetime_to_timestamp(current_time)
            aggregated_data = Aggregator.create_hourly_data(CSV_DIR, after_timestamp, before_timestamp)
            print("save in hourly dir")
            aggregated_data.save(HOURLY_DIR)

        try:
            current_time = datetime.now()
            next_rounded_hour = (current_time + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
            time_until_next_hour = (next_rounded_hour - current_time).total_seconds()
            print("Waiting for {} seconds until the next rounded hour.".format(time_until_next_hour))
            Aggregator._run_periodically(time_until_next_hour, 3600, "hourly", aggregate_last_hour)
        except KeyboardInterrupt:
            # Only reachable when this method runs in the main thread.
            print("Hourly data aggregation manager stopped.")

    @staticmethod
    def daily_data_aggregation_manager():
        """Aggregate the last day of HOURLY_DIR into DAILY_DIR and upload it.

        Waits until the next local midnight, then runs and afterwards sleeps
        86400 s between runs. After a successful upload both the hourly and
        the daily directory are emptied.
        """
        def aggregate_last_day():
            current_time = datetime.now()
            after_timestamp = datetime_to_timestamp(current_time - timedelta(days=1))
            before_timestamp = datetime_to_timestamp(current_time)
            aggregated_data = Aggregator.create_daily_data(HOURLY_DIR, after_timestamp, before_timestamp)
            print("save in daily dir")
            aggregated_data.save(DAILY_DIR)
            s3_config = S3config(S3BUCKET, "sos-ch-dk-2", "exo.io", S3KEY, S3SECRET)
            # Local files are only removed once the upload succeeded, so a
            # failed upload leaves the data in place.
            if aggregated_data.push_to_s3(s3_config):
                print("delete from hourly dir")
                AggregatedData.delete_data(HOURLY_DIR)
                print("delete from daily dir")
                AggregatedData.delete_data(DAILY_DIR)

        try:
            current_time = datetime.now()
            next_rounded_day = (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
            time_until_next_day = (next_rounded_day - current_time).total_seconds()
            print("Waiting for {} seconds until the next rounded day.".format(time_until_next_day))
            Aggregator._run_periodically(time_until_next_day, 86400, "daily", aggregate_last_day)
        except KeyboardInterrupt:
            # Only reachable when this method runs in the main thread.
            print("Daily data aggregation manager stopped.")

    @staticmethod
    def _run_periodically(initial_delay, period, label, job):
        """Wait *initial_delay* s, then call *job* every *period* s until stopped.

        Any exception raised by *job* is printed and the loop carries on, so
        one bad run does not end the schedule.

        NOTE(review): the sleep is a fixed *period* measured after each run,
        so run times drift later by the job's duration, and files stamped
        while a run is in progress fall between two consecutive windows.
        """
        if stop_event.wait(initial_delay):
            return
        while not stop_event.is_set():
            try:
                job()
            except Exception as e:
                print("An error occurred during {} data aggregation: {}".format(label, str(e)))
            if stop_event.wait(period):
                return

    @staticmethod
    def create_hourly_data(directory, after_timestamp, before_timestamp):
        """Aggregate all in-window CSV files of *directory* into one record.

        Returns an ``AggregatedData`` with the minimum and maximum SoC and
        the mean discharging, charging and heating power over every sample
        found. Any figure without samples is reported as 0.0.

        Samples of all nodes are pooled. A single node used to be
        special-cased to "first sample only", which made min and max SoC
        identical and depended on directory listing order.
        """
        samples = Aggregator._read_samples(directory, after_timestamp, before_timestamp)
        return AggregatedData(*Aggregator._summarize(samples))

    @staticmethod
    def _read_samples(directory, after_timestamp, before_timestamp):
        """Collect numeric samples from the in-window files of *directory*.

        Returns a dict with the lists ``soc``, ``discharge``, ``charge`` and
        ``heating``. Rows with fewer than two fields or a non-numeric value
        are skipped.
        """
        samples = {'soc': [], 'discharge': [], 'charge': [], 'heating': []}
        for filename in os.listdir(directory):
            file_path = os.path.join(directory, filename)
            if not os.path.isfile(file_path):
                continue
            if not Aggregator.is_file_within_time_range(filename, after_timestamp, before_timestamp):
                continue
            with open(file_path, 'r') as csv_file:
                for row in csv.reader(csv_file, delimiter=';'):
                    if len(row) < 2:
                        continue
                    variable_name = row[0]
                    try:
                        value = float(row[1])
                    except ValueError:
                        continue  # non-numeric value: not a sample
                    if "Soc" in variable_name:
                        samples['soc'].append(value)
                    elif ("/Dc/Power" in variable_name
                          or "/DischargingBatteryPower" in variable_name
                          or "/ChargingBatteryPower" in variable_name):
                        # NOTE(review): routing is by sign, not by name, so a
                        # "/DischargingBatteryPower" of 0.0 read back from an
                        # hourly file is counted as a charging sample.
                        key = 'discharge' if value < 0 else 'charge'
                        samples[key].append(value)
                    elif "/HeatingPower" in variable_name:
                        samples['heating'].append(value)
        return samples

    @staticmethod
    def _summarize(samples):
        """Reduce a ``_read_samples`` dict to the five aggregate figures.

        Returns ``(min_soc, max_soc, avg_discharge, avg_charge, avg_heating)``.
        """
        def mean(values):
            return sum(values) / len(values) if values else 0.0

        soc = samples['soc']
        # Guarded explicitly: min()/max() raise ValueError on an empty list.
        min_soc = min(soc) if soc else 0.0
        max_soc = max(soc) if soc else 0.0
        return (min_soc, max_soc,
                mean(samples['discharge']), mean(samples['charge']), mean(samples['heating']))

    @staticmethod
    def create_daily_data(directory, after_timestamp, before_timestamp):
        """Aggregate hourly files into a daily record (same rules as hourly)."""
        return Aggregator.create_hourly_data(directory, after_timestamp, before_timestamp)

    @staticmethod
    def is_file_within_time_range(filename, start_time, end_time):
        """Tell whether the number in *filename* lies in [start_time, end_time).

        The file name without its extension must parse as a float;
        otherwise False is returned.
        """
        try:
            file_timestamp = float(os.path.splitext(filename)[0])
        except ValueError:
            return False
        return start_time <= file_timestamp < end_time

    @staticmethod
    def extract_node_number(variable_name):
        """Return the integer in the fourth ``/``-separated field, else 0.

        For a name with a leading slash such as ``/a/b/7/x`` this is 7.
        """
        parts = variable_name.split('/')
        try:
            return int(parts[3])
        except (IndexError, ValueError):
            return 0
|
||
|
|
||
|
if __name__ == "__main__":
    # Entry point: run the hourly and the daily manager in two threads and
    # stop both cleanly on Ctrl-C.
    print("aggregator has started aAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")

    def run_hourly_manager():
        Aggregator.hourly_data_aggregation_manager()

    def run_daily_manager():
        Aggregator.daily_data_aggregation_manager()

    def join_interruptibly(threads):
        # A bare Thread.join() on Python 2 (this script's shebang) blocks in
        # an uninterruptible lock acquire, so Ctrl-C is never delivered.
        # Joining with a timeout in a loop keeps the main thread responsive.
        while any(thread.is_alive() for thread in threads):
            for thread in threads:
                thread.join(0.5)

    hourly_thread = Thread(target=run_hourly_manager)
    daily_thread = Thread(target=run_daily_manager)
    workers = (hourly_thread, daily_thread)

    hourly_thread.start()
    daily_thread.start()

    try:
        join_interruptibly(workers)
    except KeyboardInterrupt:
        print("Program interrupted. Stopping threads...")
        # Wakes both managers out of stop_event.wait(...) so they return.
        stop_event.set()
        join_interruptibly(workers)
        print("Program stopped.")
|
||
|
|