extarct s3 data with decompression
This commit is contained in:
parent
41318805e5
commit
533ed9018a
|
@ -4,6 +4,9 @@ import subprocess
|
||||||
import argparse
|
import argparse
|
||||||
import matplotlib.pyplot as plt
|
import matplotlib.pyplot as plt
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
import zipfile
|
||||||
|
import base64
|
||||||
|
import shutil
|
||||||
|
|
||||||
def extract_timestamp(filename):
|
def extract_timestamp(filename):
|
||||||
timestamp_str = filename[:10]
|
timestamp_str = filename[:10]
|
||||||
|
@ -14,7 +17,6 @@ def extract_timestamp(filename):
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
def extract_values_by_key(csv_file, key, exact_match):
|
def extract_values_by_key(csv_file, key, exact_match):
|
||||||
# Initialize a defaultdict for lists
|
|
||||||
matched_values = defaultdict(list)
|
matched_values = defaultdict(list)
|
||||||
with open(csv_file, 'r') as file:
|
with open(csv_file, 'r') as file:
|
||||||
reader = csv.reader(file)
|
reader = csv.reader(file)
|
||||||
|
@ -31,37 +33,26 @@ def extract_values_by_key(csv_file, key, exact_match):
|
||||||
else:
|
else:
|
||||||
if key_item.lower() in first_column.lower():
|
if key_item.lower() in first_column.lower():
|
||||||
matched_values[path_key].append(row[0])
|
matched_values[path_key].append(row[0])
|
||||||
#return matched_values
|
|
||||||
# Concatenate all keys to create a single final_key
|
|
||||||
final_key = ''.join(matched_values.keys())
|
final_key = ''.join(matched_values.keys())
|
||||||
# Combine all lists of values into a single list
|
|
||||||
combined_values = []
|
combined_values = []
|
||||||
for values in matched_values.values():
|
for values in matched_values.values():
|
||||||
combined_values.extend(values)
|
combined_values.extend(values)
|
||||||
# Create the final dictionary with final_key and all combined values
|
|
||||||
final_dict = {final_key: combined_values}
|
final_dict = {final_key: combined_values}
|
||||||
#return dict(matched_values)
|
|
||||||
return final_dict
|
return final_dict
|
||||||
|
|
||||||
def list_files_in_range(start_timestamp, end_timestamp, sampling_stepsize):
|
def list_files_in_range(start_timestamp, end_timestamp, sampling_stepsize):
|
||||||
filenames_in_range = [f"{timestamp:10d}" for timestamp in range(start_timestamp, end_timestamp + 1, 2*sampling_stepsize)]
|
filenames_in_range = [f"{timestamp:10d}" for timestamp in range(start_timestamp, end_timestamp + 1, 2*sampling_stepsize)]
|
||||||
return filenames_in_range
|
return filenames_in_range
|
||||||
|
|
||||||
def check_s3_files_exist(bucket_number, filename):
|
def download_files(bucket_number, filenames_to_download, product_type):
|
||||||
s3cmd_ls_command = f"s3cmd ls s3://{bucket_number}-3e5b3069-214a-43ee-8d85-57d72000c19d/{filename}*"
|
if product_type == 0:
|
||||||
try:
|
hash = "3e5b3069-214a-43ee-8d85-57d72000c19d"
|
||||||
result = subprocess.run(s3cmd_ls_command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
elif product_type == 1:
|
||||||
lines = result.stdout.decode().split('\n')[:-1]
|
hash = "c0436b6a-d276-4cd8-9c44-1eae86cf5d0e"
|
||||||
filenames = [line.split()[-1].split('/')[-1] for line in lines]
|
else:
|
||||||
return filenames
|
raise ValueError("Invalid product type option. Use 0 or 1")
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
print(f"Error checking S3 files: {e}")
|
|
||||||
return []
|
|
||||||
|
|
||||||
def download_files(bucket_number, filenames_to_download):
|
|
||||||
output_directory = f"S3cmdData_{bucket_number}"
|
output_directory = f"S3cmdData_{bucket_number}"
|
||||||
|
|
||||||
|
|
||||||
if not os.path.exists(output_directory):
|
if not os.path.exists(output_directory):
|
||||||
os.makedirs(output_directory)
|
os.makedirs(output_directory)
|
||||||
print(f"Directory '{output_directory}' created.")
|
print(f"Directory '{output_directory}' created.")
|
||||||
|
@ -70,7 +61,7 @@ def download_files(bucket_number, filenames_to_download):
|
||||||
stripfilename = filename.strip()
|
stripfilename = filename.strip()
|
||||||
local_path = os.path.join(output_directory, stripfilename + ".csv")
|
local_path = os.path.join(output_directory, stripfilename + ".csv")
|
||||||
if not os.path.exists(local_path):
|
if not os.path.exists(local_path):
|
||||||
s3cmd_command = f"s3cmd get s3://{bucket_number}-3e5b3069-214a-43ee-8d85-57d72000c19d/{stripfilename}.csv {output_directory}/"
|
s3cmd_command = f"s3cmd get s3://{bucket_number}-{hash}/{stripfilename}.csv {output_directory}/"
|
||||||
try:
|
try:
|
||||||
subprocess.run(s3cmd_command, shell=True, check=True)
|
subprocess.run(s3cmd_command, shell=True, check=True)
|
||||||
downloaded_files = [file for file in os.listdir(output_directory) if file.startswith(filename)]
|
downloaded_files = [file for file in os.listdir(output_directory) if file.startswith(filename)]
|
||||||
|
@ -84,44 +75,48 @@ def download_files(bucket_number, filenames_to_download):
|
||||||
else:
|
else:
|
||||||
print(f"File '{filename}.csv' already exists locally. Skipping download.")
|
print(f"File '{filename}.csv' already exists locally. Skipping download.")
|
||||||
|
|
||||||
|
def decompress_file(compressed_file, output_directory):
|
||||||
|
base_name = os.path.splitext(os.path.basename(compressed_file))[0]
|
||||||
|
|
||||||
def visualize_data(data, output_directory):
|
with open(compressed_file, 'rb') as file:
|
||||||
# Extract data for visualization (replace this with your actual data extraction)
|
compressed_data = file.read()
|
||||||
x_values = [int(entry[0]) for entry in data]
|
|
||||||
y_values = [float(entry[1]) for entry in data]
|
|
||||||
|
|
||||||
# Plotting
|
# Decode the base64 encoded content
|
||||||
plt.plot(x_values, y_values, marker='o', linestyle='-', color='b')
|
decoded_data = base64.b64decode(compressed_data)
|
||||||
plt.xlabel('Timestamp')
|
|
||||||
plt.ylabel('Your Y-axis Label')
|
|
||||||
plt.title('Your Plot Title')
|
|
||||||
plt.grid(True)
|
|
||||||
plt.savefig(os.path.join(output_directory, f"{start_timestamp}_{key}_plot.png"))
|
|
||||||
plt.close() # Close the plot window
|
|
||||||
|
|
||||||
|
zip_path = os.path.join(output_directory, 'temp.zip')
|
||||||
|
with open(zip_path, 'wb') as zip_file:
|
||||||
|
zip_file.write(decoded_data)
|
||||||
|
|
||||||
|
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
||||||
|
zip_ref.extractall(output_directory)
|
||||||
|
|
||||||
|
# Rename the extracted data.csv file to the original timestamp-based name
|
||||||
|
extracted_csv_path = os.path.join(output_directory, 'data.csv')
|
||||||
|
if os.path.exists(extracted_csv_path):
|
||||||
|
new_csv_path = os.path.join(output_directory, f"{base_name}.csv")
|
||||||
|
os.rename(extracted_csv_path, new_csv_path)
|
||||||
|
|
||||||
|
os.remove(zip_path)
|
||||||
|
#os.remove(compressed_file)
|
||||||
|
print(f"Decompressed and renamed '{compressed_file}' to '{new_csv_path}'.")
|
||||||
|
|
||||||
# Save data to CSV
|
|
||||||
csv_file_path = os.path.join(output_directory, f"{start_timestamp}_{key}_extracted.csv")
|
|
||||||
with open(csv_file_path, 'w', newline='') as csvfile:
|
|
||||||
csv_writer = csv.writer(csvfile)
|
|
||||||
csv_writer.writerow(['Timestamp', 'Value']) # Adjust column names as needed
|
|
||||||
csv_writer.writerows(data)
|
|
||||||
|
|
||||||
def get_last_component(path):
|
def get_last_component(path):
|
||||||
path_without_slashes = path.replace('/', '')
|
path_without_slashes = path.replace('/', '')
|
||||||
return path_without_slashes
|
return path_without_slashes
|
||||||
|
|
||||||
|
def download_and_process_files(bucket_number, start_timestamp, end_timestamp, sampling_stepsize, key, booleans_as_numbers, exact_match, product_type):
|
||||||
def download_and_process_files(bucket_number, start_timestamp, end_timestamp, sampling_stepsize, key, booleans_as_numbers, exact_match):
|
|
||||||
output_directory = f"S3cmdData_{bucket_number}"
|
output_directory = f"S3cmdData_{bucket_number}"
|
||||||
|
|
||||||
|
if os.path.exists(output_directory):
|
||||||
|
shutil.rmtree(output_directory)
|
||||||
|
|
||||||
if not os.path.exists(output_directory):
|
if not os.path.exists(output_directory):
|
||||||
os.makedirs(output_directory)
|
os.makedirs(output_directory)
|
||||||
print(f"Directory '{output_directory}' created.")
|
print(f"Directory '{output_directory}' created.")
|
||||||
|
|
||||||
filenames_to_check = list_files_in_range(start_timestamp, end_timestamp, sampling_stepsize)
|
filenames_to_check = list_files_in_range(start_timestamp, end_timestamp, sampling_stepsize)
|
||||||
#filenames_on_s3 = check_s3_files_exist(bucket_number, filenames_to_check, key)
|
|
||||||
|
|
||||||
existing_files = [filename for filename in filenames_to_check if os.path.exists(os.path.join(output_directory, f"{filename}.csv"))]
|
existing_files = [filename for filename in filenames_to_check if os.path.exists(os.path.join(output_directory, f"{filename}.csv"))]
|
||||||
files_to_download = set(filenames_to_check) - set(existing_files)
|
files_to_download = set(filenames_to_check) - set(existing_files)
|
||||||
|
|
||||||
|
@ -129,15 +124,20 @@ def download_and_process_files(bucket_number, start_timestamp, end_timestamp, sa
|
||||||
print("Files already exist in the local folder. Skipping download.")
|
print("Files already exist in the local folder. Skipping download.")
|
||||||
else:
|
else:
|
||||||
if files_to_download:
|
if files_to_download:
|
||||||
download_files(bucket_number, files_to_download)
|
download_files(bucket_number, files_to_download, product_type)
|
||||||
|
|
||||||
|
# Decompress all downloaded .csv files (which are actually compressed)
|
||||||
|
compressed_files = [os.path.join(output_directory, file) for file in os.listdir(output_directory) if file.endswith('.csv')]
|
||||||
|
for compressed_file in compressed_files:
|
||||||
|
decompress_file(compressed_file, output_directory)
|
||||||
|
|
||||||
# Process CSV files
|
|
||||||
csv_files = [file for file in os.listdir(output_directory) if file.endswith('.csv')]
|
csv_files = [file for file in os.listdir(output_directory) if file.endswith('.csv')]
|
||||||
csv_files.sort(key=extract_timestamp)
|
csv_files.sort(key=extract_timestamp)
|
||||||
|
|
||||||
|
|
||||||
keypath = ''
|
keypath = ''
|
||||||
for key_item in key:
|
for key_item in key:
|
||||||
keypath+= get_last_component(key_item)
|
keypath += get_last_component(key_item)
|
||||||
output_csv_filename = f"{keypath}_{start_timestamp}_{bucket_number}.csv"
|
output_csv_filename = f"{keypath}_{start_timestamp}_{bucket_number}.csv"
|
||||||
with open(output_csv_filename, 'w', newline='') as csvfile:
|
with open(output_csv_filename, 'w', newline='') as csvfile:
|
||||||
csv_writer = csv.writer(csvfile)
|
csv_writer = csv.writer(csvfile)
|
||||||
|
@ -171,42 +171,34 @@ def download_and_process_files(bucket_number, start_timestamp, end_timestamp, sa
|
||||||
print(f"Extracted data saved in '{output_csv_filename}'.")
|
print(f"Extracted data saved in '{output_csv_filename}'.")
|
||||||
|
|
||||||
def parse_keys(input_string):
|
def parse_keys(input_string):
|
||||||
# Split the input string by commas and strip whitespace
|
|
||||||
keys = [key.strip() for key in input_string.split(',')]
|
keys = [key.strip() for key in input_string.split(',')]
|
||||||
# Return keys as a list if more than one, else return the single key
|
|
||||||
#return keys if len(keys) > 1 else keys[0]
|
|
||||||
return keys
|
return keys
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description='Download files from S3 using s3cmd and extract specific values from CSV files.')
|
parser = argparse.ArgumentParser(description='Download files from S3 using s3cmd and extract specific values from CSV files.')
|
||||||
parser.add_argument('start_timestamp', type=int, help='The start timestamp for the range (even number)')
|
parser.add_argument('start_timestamp', type=int, help='The start timestamp for the range (even number)')
|
||||||
parser.add_argument('end_timestamp', type=int, help='The end timestamp for the range (even number)')
|
parser.add_argument('end_timestamp', type=int, help='The end timestamp for the range (even number)')
|
||||||
#parser.add_argument('--key', type=str, required=True, help='The part to match from each CSV file')
|
|
||||||
parser.add_argument('--keys', type=parse_keys, required=True, help='The part to match from each CSV file, can be a single key or a comma-separated list of keys')
|
parser.add_argument('--keys', type=parse_keys, required=True, help='The part to match from each CSV file, can be a single key or a comma-separated list of keys')
|
||||||
parser.add_argument('--bucket-number', type=int, required=True, help='The number of the bucket to download from')
|
parser.add_argument('--bucket-number', type=int, required=True, help='The number of the bucket to download from')
|
||||||
parser.add_argument('--sampling_stepsize', type=int, required=False, default=1, help='The number of 2sec intervals, which define the length of the sampling interval in S3 file retrieval')
|
parser.add_argument('--sampling_stepsize', type=int, required=False, default=1, help='The number of 2sec intervals, which define the length of the sampling interval in S3 file retrieval')
|
||||||
parser.add_argument('--booleans_as_numbers', action="store_true", required=False, help='If key used, then booleans are converted to numbers [0/1], if key not used, then booleans maintained as text [False/True]')
|
parser.add_argument('--booleans_as_numbers', action="store_true", required=False, help='If key used, then booleans are converted to numbers [0/1], if key not used, then booleans maintained as text [False/True]')
|
||||||
parser.add_argument('--exact_match', action="store_true", required=False, help='If key used, then key has to match exactly "=", else it is enough that key is found "in" text')
|
parser.add_argument('--exact_match', action="store_true", required=False, help='If key used, then key has to match exactly "=", else it is enough that key is found "in" text')
|
||||||
|
parser.add_argument('--product_type', required=True, help='Use 0 for Salimax and 1 for Salidomo')
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
args = parser.parse_args();
|
|
||||||
start_timestamp = args.start_timestamp
|
start_timestamp = args.start_timestamp
|
||||||
end_timestamp = args.end_timestamp
|
end_timestamp = args.end_timestamp
|
||||||
keys = args.keys
|
keys = args.keys
|
||||||
bucket_number = args.bucket_number
|
bucket_number = args.bucket_number
|
||||||
sampling_stepsize = args.sampling_stepsize
|
sampling_stepsize = args.sampling_stepsize
|
||||||
booleans_as_numbers = args.booleans_as_numbers
|
booleans_as_numbers = args.booleans_as_numbers
|
||||||
exact_match = args.exact_match
|
exact_match = args.exact_match
|
||||||
|
product_type = int(args.product_type)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Check if start_timestamp is smaller than end_timestamp
|
|
||||||
if start_timestamp >= end_timestamp:
|
if start_timestamp >= end_timestamp:
|
||||||
print("Error: start_timestamp must be smaller than end_timestamp.")
|
print("Error: start_timestamp must be smaller than end_timestamp.")
|
||||||
return
|
return
|
||||||
download_and_process_files(bucket_number, start_timestamp, end_timestamp, sampling_stepsize, keys, booleans_as_numbers, exact_match)
|
download_and_process_files(bucket_number, start_timestamp, end_timestamp, sampling_stepsize, keys, booleans_as_numbers, exact_match, product_type)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue