#!/usr/bin/python2 -u # coding=utf-8 import os import csv import time from datetime import datetime, timedelta import requests import zipfile import base64 import io import hmac import hashlib from threading import Thread, Event import config as cfg CSV_DIR = "/data/csv_files/" HOURLY_DIR = "/data/csv_files/HourlyData" DAILY_DIR = "/data/csv_files/DailyData" print("start with the correct credentials") S3BUCKET = cfg.S3BUCKET S3KEY = cfg.S3KEY S3SECRET = cfg.S3SECRET stop_event = Event() def datetime_to_timestamp(dt): return time.mktime(dt.timetuple()) class AggregatedData: def __init__(self, min_soc, max_soc, discharging_battery_power, charging_battery_power, heating_power): self.min_soc = min_soc self.max_soc = max_soc self.discharging_battery_power = discharging_battery_power self.charging_battery_power = charging_battery_power self.heating_power = heating_power def to_csv(self): return ("/MinSoc;{};\n" "/MaxSoc;{};\n" "/DischargingBatteryPower;{};\n" "/ChargingBatteryPower;{};\n" "/HeatingPower;{};").format( self.min_soc, self.max_soc, self.discharging_battery_power, self.charging_battery_power, self.heating_power) def save(self, directory): timestamp = int(time.time()) if not os.path.exists(directory): os.makedirs(directory) csv_path = os.path.join(directory, "{}.csv".format(timestamp)) with open(csv_path, 'w') as file: file.write(self.to_csv()) print("Saved file to:", csv_path) print("File content:\n", self.to_csv()) @staticmethod def delete_data(directory): if not os.path.exists(directory): return for file in os.listdir(directory): file_path = os.path.join(directory, file) if os.path.isfile(file_path): os.remove(file_path) print("Deleted file: {}".format(file_path)) def push_to_s3(self, s3_config): csv_data = self.to_csv() compressed_csv = self.compress_csv_data(csv_data) now = datetime.now() if now.hour == 0 and now.minute < 30: adjusted_date = now - timedelta(days=1) else: adjusted_date = now s3_path = adjusted_date.strftime("%Y-%m-%d") + ".csv" response = s3_config.create_put_request(s3_path, compressed_csv) if response.status_code != 200: print("ERROR: PUT", response.text) return False print("Successfully uploaded to S3:", s3_path) return True @staticmethod def compress_csv_data(csv_data): memory_stream = io.BytesIO() with zipfile.ZipFile(memory_stream, 'w', zipfile.ZIP_DEFLATED) as archive: archive.writestr("data.csv", csv_data.encode('utf-8')) compressed_bytes = memory_stream.getvalue() return base64.b64encode(compressed_bytes).decode('utf-8') class S3config: def __init__(self, bucket, region, provider, key, secret): self.bucket = bucket self.region = region self.provider = provider self.key = key self.secret = secret self.content_type = "application/base64; charset=utf-8" @property def host(self): return "{}.{}.{}".format(self.bucket, self.region, self.provider) @property def url(self): return "https://{}".format(self.host) def create_put_request(self, s3_path, data): headers = self._create_request_headers("PUT", s3_path) url = "{}/{}".format(self.url, s3_path) response = requests.put(url, headers=headers, data=data) return response def _create_request_headers(self, method, s3_path): date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT') auth = self._create_authorization(method, s3_path, date) return { "Host": self.host, "Date": date, "Authorization": auth, "Content-Type": self.content_type } def _create_authorization(self, method, s3_path, date): payload = "{}\n\n{}\n{}\n/{}/{}".format(method, self.content_type, date, self.bucket, s3_path) signature = base64.b64encode( hmac.new(self.secret.encode(), payload.encode(), hashlib.sha1).digest() ).decode() return "AWS {}:{}".format(self.key, signature) class Aggregator: @staticmethod def hourly_data_aggregation_manager(): try: current_time = datetime.now() #next_rounded_minute = (current_time + timedelta(minutes=1)).replace(second=0, microsecond=0) next_rounded_hour = (current_time + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) #time_until_next_minute = (next_rounded_minute - current_time).total_seconds() time_until_next_hour = (next_rounded_hour - current_time).total_seconds() #print("Waiting for {} seconds until the next rounded minute.".format(time_until_next_minute)) print("Waiting for {} seconds until the next rounded hour.".format(time_until_next_hour)) if stop_event.wait(time_until_next_hour): return while not stop_event.is_set(): try: current_time = datetime.now() after_timestamp = datetime_to_timestamp(current_time - timedelta(hours=1)) before_timestamp = datetime_to_timestamp(current_time) aggregated_data = Aggregator.create_hourly_data(CSV_DIR, after_timestamp, before_timestamp) print("save in hourly dir") aggregated_data.save(HOURLY_DIR) except Exception as e: print("An error occurred during hourly data aggregation:", str(e)) if stop_event.wait(3600): # Sleep for 1 hour return except KeyboardInterrupt: print("Hourly data aggregation manager stopped.") @staticmethod def daily_data_aggregation_manager(): try: current_time = datetime.now() #next_rounded_five_minutes = (current_time + timedelta(minutes=5)).replace(second=0, microsecond=0) next_rounded_day = (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) #time_until_next_five_minutes = (next_rounded_five_minutes - current_time).total_seconds() time_until_next_day = (next_rounded_day - current_time).total_seconds() #print("Waiting for {} seconds until the next rounded 5 minutes.".format(time_until_next_five_minutes)) print("Waiting for {} seconds until the next rounded day.".format(time_until_next_day)) if stop_event.wait(time_until_next_day): return while not stop_event.is_set(): try: current_time = datetime.now() after_timestamp = datetime_to_timestamp(current_time - timedelta(days=1)) before_timestamp = datetime_to_timestamp(current_time) aggregated_data = Aggregator.create_daily_data(HOURLY_DIR, after_timestamp, before_timestamp) print("save in daily dir") aggregated_data.save(DAILY_DIR) s3_config = S3config(S3BUCKET, "sos-ch-dk-2", "exo.io", S3KEY, S3SECRET) if aggregated_data.push_to_s3(s3_config): print("delete from hourly dir") AggregatedData.delete_data(HOURLY_DIR) print("delete from daily dir") AggregatedData.delete_data(DAILY_DIR) except Exception as e: print("An error occurred during daily data aggregation:", str(e)) if stop_event.wait(86400): # Sleep for 1 day return except KeyboardInterrupt: print("Daily data aggregation manager stopped.") @staticmethod def create_hourly_data(directory, after_timestamp, before_timestamp): node_data = {} for filename in os.listdir(directory): file_path = os.path.join(directory, filename) if os.path.isfile(file_path) and Aggregator.is_file_within_time_range(filename, after_timestamp, before_timestamp): with open(file_path, 'r') as file: reader = csv.reader(file, delimiter=';') for row in reader: if len(row) >= 2: variable_name, value = row[0], row[1] try: value = float(value) node_number = Aggregator.extract_node_number(variable_name) if node_number not in node_data: node_data[node_number] = {'soc': [], 'discharge': [], 'charge': [], 'heating': []} if "Soc" in variable_name: node_data[node_number]['soc'].append(value) elif "/Dc/Power" in variable_name or "/DischargingBatteryPower" in variable_name or "/ChargingBatteryPower" in variable_name: if value < 0: node_data[node_number]['discharge'].append(value) else: node_data[node_number]['charge'].append(value) elif "/HeatingPower" in variable_name: node_data[node_number]['heating'].append(value) except ValueError: pass if len(node_data) == 0: # No data collected, return default AggregatedData with zeros return AggregatedData(0.0, 0.0, 0.0, 0.0, 0.0) elif len(node_data) == 1: # Directly use the values for a single node for node_number, data in node_data.items(): min_soc = data['soc'][0] if data['soc'] else 0.0 max_soc = data['soc'][0] if data['soc'] else 0.0 avg_discharging_power = data['discharge'][0] if data['discharge'] else 0.0 avg_charging_power = data['charge'][0] if data['charge'] else 0.0 avg_heating_power = data['heating'][0] if data['heating'] else 0.0 return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power) else: min_soc = min([min(data['soc']) for data in node_data.values() if data['soc']]) if node_data else 0.0 max_soc = max([max(data['soc']) for data in node_data.values() if data['soc']]) if node_data else 0.0 total_discharging_power = sum([sum(data['discharge']) for data in node_data.values() if data['discharge']]) total_charging_power = sum([sum(data['charge']) for data in node_data.values() if data['charge']]) total_heating_power = sum([sum(data['heating']) for data in node_data.values() if data['heating']]) count_discharging_power = sum([len(data['discharge']) for data in node_data.values() if data['discharge']]) count_charging_power = sum([len(data['charge']) for data in node_data.values() if data['charge']]) count_heating_power = sum([len(data['heating']) for data in node_data.values() if data['heating']]) avg_discharging_power = total_discharging_power / count_discharging_power if count_discharging_power else 0.0 avg_charging_power = total_charging_power / count_charging_power if count_charging_power else 0.0 avg_heating_power = total_heating_power / count_heating_power if count_heating_power else 0.0 return AggregatedData(min_soc, max_soc, avg_discharging_power, avg_charging_power, avg_heating_power) @staticmethod def create_daily_data(directory, after_timestamp, before_timestamp): return Aggregator.create_hourly_data(directory, after_timestamp, before_timestamp) @staticmethod def is_file_within_time_range(filename, start_time, end_time): try: file_timestamp = float(os.path.splitext(filename)[0]) return start_time <= file_timestamp < end_time except ValueError: return False @staticmethod def extract_node_number(variable_name): parts = variable_name.split('/') try: return int(parts[3]) except (IndexError, ValueError): return 0 if __name__ == "__main__": print("aggregator has started") def run_hourly_manager(): Aggregator.hourly_data_aggregation_manager() def run_daily_manager(): Aggregator.daily_data_aggregation_manager() hourly_thread = Thread(target=run_hourly_manager) daily_thread = Thread(target=run_daily_manager) hourly_thread.start() daily_thread.start() try: hourly_thread.join() daily_thread.join() except KeyboardInterrupt: print("Program interrupted. Stopping threads...") stop_event.set() hourly_thread.join() daily_thread.join() print("Program stopped.")