add: batch files

This commit is contained in:
kostas 2024-06-26 15:53:19 +02:00
parent c2aad6783b
commit 91a23d3ecb
6 changed files with 165 additions and 54 deletions

View File

@ -6,7 +6,7 @@ namespace InnovEnergy.App.SchneiderDriver;
public static class Config public static class Config
{ {
public const String Version = "1.0"; public const String Version = "1.0";
public const String BusName = "com.victronenergy.grid"; public const String BusName = "com.victronenergy.grid.Schneider";
public const Byte ModbusNodeId = 1; public const Byte ModbusNodeId = 1;
public const String OwnAddress = "192.168.1.246"; public const String OwnAddress = "192.168.1.246";
public const String PeerAddress = "192.168.1.82"; public const String PeerAddress = "192.168.1.82";
@ -52,13 +52,13 @@ public static class Config
{ {
new("/ProductName" , "Grid meter" ), new("/ProductName" , "Grid meter" ),
new("/CustomName" , "Schneider Professional"), new("/CustomName" , "Schneider Professional"),
new("/DeviceInstance" , 30), new("/DeviceInstance" , 50),
new("/DeviceType" , 72), new("/DeviceType" , 73),
new("/Mgmt/Connection" , "Modbus TCP"), new("/Mgmt/Connection" , "Modbus TCP"),
new("/Mgmt/ProcessName" , Assembly.GetEntryAssembly()?.Location ?? "unknown"), new("/Mgmt/ProcessName" , Assembly.GetEntryAssembly()?.Location ?? "unknown"),
new("/Mgmt/ProcessVersion", Version), new("/Mgmt/ProcessVersion", Version),
new("/Connected" , 1), new("/Connected" , 1),
new("/ProductId" , 45058, "b002"), new("/ProductId" , 45139, "b002"),
new("/Role" , "grid"), new("/Role" , "grid"),
}; };

View File

@ -30,21 +30,7 @@ public static class SchneiderMeterDriver
.Publish(); .Publish();
var x = schneider.Read(); var x = schneider.Read();
// Print the output of schneider.Read()
if (x != null)
{
Console.WriteLine("Schneider Read Output:");
Console.WriteLine($"ActivePowerL1: {x.ActivePowerL1}");
Console.WriteLine($"ActivePowerL2: {x.ActivePowerL2}");
Console.WriteLine($"ActivePowerL3: {x.ActivePowerL3}");
// Add more properties if needed
}
else
{
Console.WriteLine("Failed to read data from Schneider device.");
}
var poller = schneiderStatus.Connect(); var poller = schneiderStatus.Connect();
var properties = Config.DefaultProperties; var properties = Config.DefaultProperties;

View File

@ -8,7 +8,7 @@ platform="linux-arm"
netVersion="net6.0" netVersion="net6.0"
config="Release" config="Release"
host="root@$remote" host="root@$remote"
dir="/opt/innovenergy/$exe" dir="/opt/victronenergy/$exe"
set -e set -e

View File

@ -2,7 +2,7 @@
csproj="SchneiderMeterDriver.csproj" csproj="SchneiderMeterDriver.csproj"
exe="SchneiderMeterDriver" exe="SchneiderMeterDriver"
remote="10.2.4.155" remote="10.2.4.114"
platform="linux-arm" platform="linux-arm"
netVersion="net6.0" netVersion="net6.0"
config="Release" config="Release"

View File

@ -14,7 +14,40 @@ public partial class Battery250UpRecord
{ {
[InputRegister(1004)] private UInt16 _LedStates; [InputRegister(1004)] private UInt16 _LedStates;
[InputRegister<UInt64>(1005)] private UInt64 _WarningFlags; [InputRegister<UInt64>(1005)] private UInt64 _WarningFlags;
//mine
[InputRegister<UInt64>(1006)] private UInt64 _WarningFlags16to31;
[InputRegister<UInt64>(1007)] private UInt64 _WarningFlags32to47;
[InputRegister<UInt64>(1008)] private UInt64 _WarningFlags48to63;
[InputRegister<UInt64>(1009)] private UInt64 _AlarmFlags; [InputRegister<UInt64>(1009)] private UInt64 _AlarmFlags;
//mine
[InputRegister<UInt64>(1010)] private UInt64 _AlarmFlags16to31;
[InputRegister<UInt64>(1011)] private UInt64 _AlarmFlags32to47;
[InputRegister<UInt64>(1012)] private UInt64 _AlarmFlags48to63;
[InputRegister(1013)] private UInt16 _IoStates; [InputRegister(1013)] private UInt16 _IoStates;
[InputRegister(999, Scale = 0.01)] private Double _CellsVoltage; [InputRegister(999, Scale = 0.01)] private Double _CellsVoltage;
@ -43,7 +76,18 @@ public partial class Battery250UpRecord
[InputRegister(1060)] private UInt16 _BatteryState1; [InputRegister(1060)] private UInt16 _BatteryState1;
[InputRegister(1061)] private UInt16 _BatteryState2; [InputRegister(1061)] private UInt16 _BatteryState2;
//[InputRegister(1063)] private UInt16 _TotalBatteryCycle;
//mine
[InputRegister(1018, Scale = 0.1)] private Double _RiscC_pwm;
[InputRegister(1019, Scale = 0.1)] private Double _RiscL_pwm;
[InputRegister(1050)] private UInt16 _RTCCounterLo;
[InputRegister(1051)] private UInt16 _RTCCounterHi;
[InputRegister(1052)] private UInt16 _TimeToTocRequest;
//[InputRegister(1063)] private UInt16 _TotalBatteryCycle;
private LedState ParseLed(LedColor led) => (LedState)((_LedStates >> (Int32)led) & 3); private LedState ParseLed(LedColor led) => (LedState)((_LedStates >> (Int32)led) & 3);

View File

@ -50,9 +50,10 @@ import time
# zip-comp additions # zip-comp additions
import zipfile import zipfile
import io import io
import shutil
def compress_csv_data(csv_data, file_name="data.csv"): def compress_csv_data(csv_data, file_name="data.csv"):
memory_stream = io.BytesIO() memory_stream = io.BytesIO()
# Create a zip archive in the memory buffer # Create a zip archive in the memory buffer
@ -379,8 +380,8 @@ def init_signals(hardware_version, firmware_version, n_batteries):
as described in the document 'T48TLxxx ModBus Protocol Rev.7.1' which can as described in the document 'T48TLxxx ModBus Protocol Rev.7.1' which can
be found in the /doc folder be found in the /doc folder
""" """
product_id = cfg.PRODUCT_ID
product_id_hex = '0x{0:04x}'.format(cfg.PRODUCT_ID) product_id_hex = '0x{0:04product_id}'
read_voltage = c.read_float(register=999, scale_factor=0.01, offset=0, places=2) read_voltage = c.read_float(register=999, scale_factor=0.01, offset=0, places=2)
read_current = c.read_float(register=1000, scale_factor=0.01, offset=-10000, places=2) read_current = c.read_float(register=1000, scale_factor=0.01, offset=-10000, places=2)
@ -444,7 +445,7 @@ def init_signals(hardware_version, firmware_version, n_batteries):
Signal('/Soc', min, c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), c.append_unit('%')), Signal('/Soc', min, c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), c.append_unit('%')),
Signal('/LowestSoc', min, c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), c.append_unit('%')), Signal('/LowestSoc', min, c.read_float(register=1053, scale_factor=0.1, offset=0, places=1), c.append_unit('%')),
Signal('/Dc/0/Temperature', c.mean, c.read_float(register=1003, scale_factor=0.1, offset=-400, places=1), c.append_unit(u'°C')), Signal('/Dc/0/Temperature', c.mean, c.read_float(register=1003, scale_factor=0.1, offset=-400, places=1), c.append_unit(u'°C')),
# Charge/Discharge current, voltage and power # Charge/Discharge current, voltage and power
Signal('/Info/MaxDischargeCurrent', c.ssum, max_discharge_current,c.append_unit('A')), Signal('/Info/MaxDischargeCurrent', c.ssum, max_discharge_current,c.append_unit('A')),
Signal('/Info/MaxChargeCurrent', c.ssum, max_charge_current, c.append_unit('A')), Signal('/Info/MaxChargeCurrent', c.ssum, max_charge_current, c.append_unit('A')),
@ -745,7 +746,7 @@ def update_state_from_dictionaries(current_warnings, current_alarms, node_number
if int(list(current_alarms.keys())[i].split("/")[3]) == int(node_number): if int(list(current_alarms.keys())[i].split("/")[3]) == int(node_number):
if alarm_value: if alarm_value:
cnt+=1 cnt+=1
alarms_number_list.append(cnt) alarms_number_list.append(cnt)
warnings_number_list = [] warnings_number_list = []
@ -898,6 +899,107 @@ def parse_cmdline_args(argv):
sys.exit(1) sys.exit(1)
return argv[0] return argv[0]
def count_files_in_folder(folder_path):
try:
# List all files in the folder
files = os.listdir(folder_path)
# Filter out directories, only count files
num_files = sum(1 for f in files if os.path.isfile(os.path.join(folder_path, f)))
return num_files
except FileNotFoundError:
return "Folder not found"
except Exception as e:
return str(e)
def create_batch_of_csv_files():
# list all files in the directory
files = os.listdir(CSV_DIR)
# filter out only csv files
csv_files = [file for file in files if file.endswith('.csv')]
# sort csv files by creation time
csv_files.sort(key=lambda x: os.path.getctime(os.path.join(CSV_DIR, x)))
# keep the 15 MOST RECENT FILES
recent_csv_files = csv_files[-15:] if len(csv_files) > 15 else csv_files
# get the name of the first csv file
if not csv_files:
print("No csv files found in the directory.")
exit(0)
first_csv_file = os.path.join(CSV_DIR, recent_csv_files.pop(0))
first_csv_filename = os.path.basename(first_csv_file)
temp_file_path = os.path.join(CSV_DIR, 'temp_batch_file.csv')
# create a temporary file and write the timestamp and the original content of the first file
with open(temp_file_path, 'wb') as temp_file:
# Write the timestamp (filename) at the beginning
numeric_part = first_csv_filename.split('.')[0]
temp_file.write(f'Timestamp;{numeric_part}\n'.encode('utf-8'))
# write the original content of the first csv file
with open(first_csv_file, 'rb') as f:
temp_file.write(f.read())
for csv_file in recent_csv_files:
file_path = os.path.join(CSV_DIR, csv_file)
# write an empty line
temp_file.write(b'\n')
# write the timestamp (filename)
numeric_part = csv_file.split('.')[0]
temp_file.write(f'Timestamp;{numeric_part}\n'.encode('utf-8'))
# write the content of the file
with open(file_path, 'rb') as f:
temp_file.write(f.read())
# replace the original first csv file with the temporary file
os.remove(first_csv_file)
os.rename(temp_file_path, first_csv_file)
# create a loggin directory that contains at max 20 batch files for logging info
logging_dir = os.path.join(CSV_DIR, 'logging_batch_files')
if not os.path.exists(logging_dir):
os.makedirs(logging_dir)
shutil.copy(first_csv_file, logging_dir)
manage_csv_files(logging_dir)
# keep at most 100 files at CSV_DIR for logging
manage_csv_files(CSV_DIR, 100)
# prepare for compression
csv_data = read_csv_as_string(first_csv_file)
if csv_data is None:
print("error while reading csv as string")
return
# zip-comp additions
compressed_csv = compress_csv_data(csv_data)
# Use the name of the last (most recent) CSV file in sorted csv_files as the name for the compressed file
last_csv_file_name = os.path.basename(recent_csv_files[-1]) if recent_csv_files else first_csv_filename
numeric_part = int(last_csv_file_name.split('.')[0])
compressed_filename = "{}.csv".format(numeric_part)
response = s3_config.create_put_request(compressed_filename, compressed_csv)
if response.status_code == 200:
os.remove(first_csv_file)
print("Successfully uploaded the compresseed batch of files in s3")
else:
# we save data that were not successfully uploaded in s3 in a failed directory inside the CSV_DIR for logging
failed_dir = os.path.join(CSV_DIR, "failed")
if not os.path.exists(failed_dir):
os.makedirs(failed_dir)
failed_path = os.path.join(failed_dir, first_csv_filename)
os.rename(first_csv_file, failed_path)
print("Uploading failed")
manage_csv_files(failed_dir, 100)
alive = True # global alive flag, watchdog_task clears it, update_task sets it alive = True # global alive flag, watchdog_task clears it, update_task sets it
ALLOW = False ALLOW = False
@ -907,14 +1009,20 @@ def create_update_task(modbus, dbus, batteries, signals, csv_signals, main_loop)
Creates an update task which runs the main update function Creates an update task which runs the main update function
and resets the alive flag and resets the alive flag
""" """
start_time = time.time()
def update_task(): def update_task():
# type: () -> bool # type: () -> bool
nonlocal start_time
global alive, ALLOW global alive, ALLOW
if ALLOW: if ALLOW:
ALLOW = False ALLOW = False
else: else:
ALLOW = True ALLOW = True
alive = update(modbus, batteries, dbus, signals, csv_signals) alive = update(modbus, batteries, dbus, signals, csv_signals)
elapsed_time = time.time() - start_time
if count_files_in_folder(CSV_DIR) >= 15 and elapsed_time >= 30:
create_batch_of_csv_files()
start_time = time.time()
#alive = update_for_testing(modbus, batteries, dbus, signals, csv_signals) #alive = update_for_testing(modbus, batteries, dbus, signals, csv_signals)
if not alive: if not alive:
logging.info('update_task: quitting main loop because of error') logging.info('update_task: quitting main loop because of error')
@ -992,33 +1100,6 @@ def create_csv_files(signals, statuses, node_numbers, alarms_number_list, warnin
value = s.get_value(statuses[i]) value = s.get_value(statuses[i])
row_values = [signal_name, value, s.get_text] row_values = [signal_name, value, s.get_text]
csv_writer.writerow(row_values) csv_writer.writerow(row_values)
# Manage CSV files, keep a limited number of files
# Create the CSV as a string
csv_data = read_csv_as_string(csv_path)
if csv_data is None:
print(" error while reading csv as string")
return
# zip-comp additions
compressed_csv = compress_csv_data(csv_data)
compressed_filename = f"{timestamp}.csv"
response = s3_config.create_put_request(compressed_filename, compressed_csv)
if response.status_code == 200:
#os.remove(csv_path)
print("Success")
else:
failed_dir = os.path.join(CSV_DIR, "failed")
if not os.path.exists(failed_dir):
os.makedirs(failed_dir)
failed_path = os.path.join(failed_dir, csv_filename)
os.rename(csv_path, failed_path)
print("Uploading failed")
manage_csv_files(failed_dir, 10)
manage_csv_files(CSV_DIR)
def main(argv): def main(argv):
# type: (list[str]) -> () # type: (list[str]) -> ()