From 533ed9018a75939b293fdf980ea3ebd71ffa0643 Mon Sep 17 00:00:00 2001
From: kostas
Date: Thu, 30 May 2024 09:55:40 +0200
Subject: [PATCH] extract s3 data with decompression

S3 objects are now stored as base64-encoded zip archives, so select the
bucket by product type (0 = Salimax, 1 = Salidomo) and decompress each
downloaded object before extracting values. Drop the unused
check_s3_files_exist() and visualize_data() helpers.

---
 .../opt/innovenergy/scripts/extractS3data.py | 110 ++++++++----------
 1 file changed, 51 insertions(+), 59 deletions(-)

diff --git a/firmware/opt/innovenergy/scripts/extractS3data.py b/firmware/opt/innovenergy/scripts/extractS3data.py
index 41cd81f03..6c5c8b924 100644
--- a/firmware/opt/innovenergy/scripts/extractS3data.py
+++ b/firmware/opt/innovenergy/scripts/extractS3data.py
@@ -4,6 +4,9 @@ import subprocess
 import argparse
 import matplotlib.pyplot as plt
 from collections import defaultdict
+import zipfile
+import base64
+import shutil
 
 def extract_timestamp(filename):
     timestamp_str = filename[:10]
@@ -14,7 +17,6 @@ def extract_timestamp(filename):
     return 0
 
 def extract_values_by_key(csv_file, key, exact_match):
-    # Initialize a defaultdict for lists
     matched_values = defaultdict(list)
     with open(csv_file, 'r') as file:
         reader = csv.reader(file)
@@ -31,37 +33,26 @@ def extract_values_by_key(csv_file, key, exact_match):
             else:
                 if key_item.lower() in first_column.lower():
                     matched_values[path_key].append(row[0])
-    #return matched_values
-    # Concatenate all keys to create a single final_key
     final_key = ''.join(matched_values.keys())
-    # Combine all lists of values into a single list
     combined_values = []
     for values in matched_values.values():
         combined_values.extend(values)
-    # Create the final dictionary with final_key and all combined values
     final_dict = {final_key: combined_values}
-    #return dict(matched_values)
     return final_dict
 
 def list_files_in_range(start_timestamp, end_timestamp, sampling_stepsize):
     filenames_in_range = [f"{timestamp:10d}" for timestamp in range(start_timestamp, end_timestamp + 1, 2*sampling_stepsize)]
     return filenames_in_range
 
-def check_s3_files_exist(bucket_number, filename):
-    s3cmd_ls_command = f"s3cmd ls s3://{bucket_number}-3e5b3069-214a-43ee-8d85-57d72000c19d/{filename}*"
-    try:
-        result = subprocess.run(s3cmd_ls_command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        lines = result.stdout.decode().split('\n')[:-1]
-        filenames = [line.split()[-1].split('/')[-1] for line in lines]
-        return filenames
-    except subprocess.CalledProcessError as e:
-        print(f"Error checking S3 files: {e}")
-        return []
-
-def download_files(bucket_number, filenames_to_download):
+def download_files(bucket_number, filenames_to_download, product_type):
+    if product_type == 0:
+        hash = "3e5b3069-214a-43ee-8d85-57d72000c19d"
+    elif product_type == 1:
+        hash = "c0436b6a-d276-4cd8-9c44-1eae86cf5d0e"
+    else:
+        raise ValueError("Invalid product type option. Use 0 or 1")
Use 0 or 1") output_directory = f"S3cmdData_{bucket_number}" - if not os.path.exists(output_directory): os.makedirs(output_directory) print(f"Directory '{output_directory}' created.") @@ -70,7 +61,7 @@ def download_files(bucket_number, filenames_to_download): stripfilename = filename.strip() local_path = os.path.join(output_directory, stripfilename + ".csv") if not os.path.exists(local_path): - s3cmd_command = f"s3cmd get s3://{bucket_number}-3e5b3069-214a-43ee-8d85-57d72000c19d/{stripfilename}.csv {output_directory}/" + s3cmd_command = f"s3cmd get s3://{bucket_number}-{hash}/{stripfilename}.csv {output_directory}/" try: subprocess.run(s3cmd_command, shell=True, check=True) downloaded_files = [file for file in os.listdir(output_directory) if file.startswith(filename)] @@ -84,44 +75,48 @@ def download_files(bucket_number, filenames_to_download): else: print(f"File '{filename}.csv' already exists locally. Skipping download.") +def decompress_file(compressed_file, output_directory): + base_name = os.path.splitext(os.path.basename(compressed_file))[0] -def visualize_data(data, output_directory): - # Extract data for visualization (replace this with your actual data extraction) - x_values = [int(entry[0]) for entry in data] - y_values = [float(entry[1]) for entry in data] + with open(compressed_file, 'rb') as file: + compressed_data = file.read() - # Plotting - plt.plot(x_values, y_values, marker='o', linestyle='-', color='b') - plt.xlabel('Timestamp') - plt.ylabel('Your Y-axis Label') - plt.title('Your Plot Title') - plt.grid(True) - plt.savefig(os.path.join(output_directory, f"{start_timestamp}_{key}_plot.png")) - plt.close() # Close the plot window + # Decode the base64 encoded content + decoded_data = base64.b64decode(compressed_data) + zip_path = os.path.join(output_directory, 'temp.zip') + with open(zip_path, 'wb') as zip_file: + zip_file.write(decoded_data) + + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + zip_ref.extractall(output_directory) + + # Rename the extracted data.csv file to the original timestamp-based name + extracted_csv_path = os.path.join(output_directory, 'data.csv') + if os.path.exists(extracted_csv_path): + new_csv_path = os.path.join(output_directory, f"{base_name}.csv") + os.rename(extracted_csv_path, new_csv_path) + + os.remove(zip_path) + #os.remove(compressed_file) + print(f"Decompressed and renamed '{compressed_file}' to '{new_csv_path}'.") - # Save data to CSV - csv_file_path = os.path.join(output_directory, f"{start_timestamp}_{key}_extracted.csv") - with open(csv_file_path, 'w', newline='') as csvfile: - csv_writer = csv.writer(csvfile) - csv_writer.writerow(['Timestamp', 'Value']) # Adjust column names as needed - csv_writer.writerows(data) def get_last_component(path): path_without_slashes = path.replace('/', '') return path_without_slashes - -def download_and_process_files(bucket_number, start_timestamp, end_timestamp, sampling_stepsize, key, booleans_as_numbers, exact_match): +def download_and_process_files(bucket_number, start_timestamp, end_timestamp, sampling_stepsize, key, booleans_as_numbers, exact_match, product_type): output_directory = f"S3cmdData_{bucket_number}" + if os.path.exists(output_directory): + shutil.rmtree(output_directory) + if not os.path.exists(output_directory): os.makedirs(output_directory) print(f"Directory '{output_directory}' created.") - - filenames_to_check = list_files_in_range(start_timestamp, end_timestamp, sampling_stepsize) - #filenames_on_s3 = check_s3_files_exist(bucket_number, filenames_to_check, key) + 
+    filenames_to_check = list_files_in_range(start_timestamp, end_timestamp, sampling_stepsize)
 
     existing_files = [filename for filename in filenames_to_check if os.path.exists(os.path.join(output_directory, f"{filename}.csv"))]
     files_to_download = set(filenames_to_check) - set(existing_files)
@@ -129,15 +124,20 @@ def download_and_process_files(bucket_number, start_timestamp, end_timestamp, sa
         print("Files already exist in the local folder. Skipping download.")
     else:
         if files_to_download:
-            download_files(bucket_number, files_to_download)
+            download_files(bucket_number, files_to_download, product_type)
+    # Decompress all downloaded .csv files (which are actually compressed)
+    compressed_files = [os.path.join(output_directory, file) for file in os.listdir(output_directory) if file.endswith('.csv')]
+    for compressed_file in compressed_files:
+        decompress_file(compressed_file, output_directory)
 
-    # Process CSV files
     csv_files = [file for file in os.listdir(output_directory) if file.endswith('.csv')]
     csv_files.sort(key=extract_timestamp)
+
+
     keypath = ''
     for key_item in key:
-        keypath+= get_last_component(key_item)
+        keypath += get_last_component(key_item)
     output_csv_filename = f"{keypath}_{start_timestamp}_{bucket_number}.csv"
     with open(output_csv_filename, 'w', newline='') as csvfile:
         csv_writer = csv.writer(csvfile)
@@ -171,42 +171,34 @@
     print(f"Extracted data saved in '{output_csv_filename}'.")
 
 def parse_keys(input_string):
-    # Split the input string by commas and strip whitespace
     keys = [key.strip() for key in input_string.split(',')]
-    # Return keys as a list if more than one, else return the single key
-    #return keys if len(keys) > 1 else keys[0]
     return keys
 
-
 def main():
     parser = argparse.ArgumentParser(description='Download files from S3 using s3cmd and extract specific values from CSV files.')
     parser.add_argument('start_timestamp', type=int, help='The start timestamp for the range (even number)')
     parser.add_argument('end_timestamp', type=int, help='The end timestamp for the range (even number)')
-    #parser.add_argument('--key', type=str, required=True, help='The part to match from each CSV file')
     parser.add_argument('--keys', type=parse_keys, required=True, help='The part to match from each CSV file, can be a single key or a comma-separated list of keys')
     parser.add_argument('--bucket-number', type=int, required=True, help='The number of the bucket to download from')
    parser.add_argument('--sampling_stepsize', type=int, required=False, default=1, help='The number of 2sec intervals, which define the length of the sampling interval in S3 file retrieval')
     parser.add_argument('--booleans_as_numbers', action="store_true", required=False, help='If key used, then booleans are converted to numbers [0/1], if key not used, then booleans maintained as text [False/True]')
     parser.add_argument('--exact_match', action="store_true", required=False, help='If key used, then key has to match exactly "=", else it is enough that key is found "in" text')
+    parser.add_argument('--product_type', required=True, help='Use 0 for Salimax and 1 for Salidomo')
 
-
-    args = parser.parse_args();
+    args = parser.parse_args()
     start_timestamp = args.start_timestamp
     end_timestamp = args.end_timestamp
    keys = args.keys
     bucket_number = args.bucket_number
     sampling_stepsize = args.sampling_stepsize
     booleans_as_numbers = args.booleans_as_numbers
-    exact_match = args.exact_match
+    exact_match = args.exact_match
+    product_type = int(args.product_type)
 
-
-
-    # Check if start_timestamp is smaller than end_timestamp
     if start_timestamp >= end_timestamp:
         print("Error: start_timestamp must be smaller than end_timestamp.")
         return
 
-    download_and_process_files(bucket_number, start_timestamp, end_timestamp, sampling_stepsize, keys, booleans_as_numbers, exact_match)
+    download_and_process_files(bucket_number, start_timestamp, end_timestamp, sampling_stepsize, keys, booleans_as_numbers, exact_match, product_type)
 
 if __name__ == "__main__":
     main()
--
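Note for reviewers (not part of the commit) — an illustrative invocation; the bucket number, timestamps, and key below are made up:

    python extractS3data.py 1717056000 1717056600 --keys "Soc" --bucket-number 100 --product_type 0

With product_type 0 this fetches s3://100-3e5b3069-214a-43ee-8d85-57d72000c19d/<timestamp>.csv for each 2*sampling_stepsize-second step in the range, decompresses every object in S3cmdData_100/, and writes the matched values to Soc_1717056000_100.csv.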
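A second note on the object format decompress_file() assumes: each downloaded <timestamp>.csv is really a base64-encoded zip archive containing a single data.csv member. A minimal round-trip sketch under that assumption — the pack()/unpack() helpers are illustrative and not part of the patch:

    import base64
    import io
    import zipfile

    def pack(csv_text: str) -> bytes:
        # Shape a payload the way the patch expects to find it on S3:
        # a zip archive holding one 'data.csv' member, base64-encoded.
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.writestr('data.csv', csv_text)
        return base64.b64encode(buf.getvalue())

    def unpack(payload: bytes) -> str:
        # Mirror of decompress_file(), minus the temp-file bookkeeping.
        decoded = base64.b64decode(payload)
        with zipfile.ZipFile(io.BytesIO(decoded)) as zf:
            return zf.read('data.csv').decode()

    sample = "some,csv,row\n"
    assert unpack(pack(sample)) == sample

decompress_file() itself writes the decoded bytes to temp.zip, extracts data.csv, and renames it back to the timestamp-based filename, so the rest of the pipeline keeps treating the objects as plain CSV.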