"""Send a telecommand request to a LiFePO4 BMS and decode the ASCII-hex reply.

The frame layout handled here is::

    SOI(1) VER(2) ADR(2) CID1(2) CID2(2) LENGTH(4) INFO(n) CHKSUM(4) EOI(1)

Every field except SOI and EOI is transmitted as hexadecimal ASCII digits.
"""
import csv

try:
    import serial
except ImportError:  # pyserial is only required by send_command()
    serial = None

TELECOMMAND_FILE_PATH = "Telecommand_Return_Record.csv"

# Table 3
CID1_DEVICE_CODES = {
    "46": "Lithium iron phosphate battery BMS",
}

# Table 4
CID2_COMMAND_CODES = {
    "42": "Acquisition of telemetering information",
    "44": "Acquisition of telecommand information",
    "45": "Telecontrol command",
    "47": "Acquisition of teleregulation information",
    "49": "Setting of teleregulation information",
    "4F": "Acquisition of the communication protocol version number",
    "51": "Acquisition of device vendor information",
    "4B": "Acquisition of historical data",
    "4D": "Acquisition time",
    "4E": "Synchronization time",
    "A0": "Production calibration",
    "A1": "Production setting",
    "A2": "Regular recording",
}

# Table 5
CID2_RETURN_CODES = {
    "00": "Normal",
    "01": "VER error",
    "02": "CHKSUM error",
    "03": "LCHKSUM error",
    "04": "CID2 invalid",
    "05": "Command format error",
    "06": "Data invalid (parameter setting)",
    "07": "No data (history)",
    "E1": "CID1 invalid",
    "E2": "Command execution failure",
    "E3": "Device fault",
    "E4": "Invalid permissions",
}

# Table 12
BYTE_ALARM_CODES = {
    "00": "Normal, no alarm",
    "01": "Alarm that analog quantity reaches the lower limit",
    "02": "Alarm that analog quantity reaches the upper limit",
    "F0": "Other alarms",
}

# Table 13
BIT_ALARM_CODES = {
    "Alarm event 1": (
        "Voltage sensor fault",
        "Temperature sensor fault",
        "Current sensor fault",
        "Key switch fault",
        "Cell voltage dropout fault",
        "Charge switch fault",
        "Discharge switch fault",
        "Current limit switch fault",
    ),
    "Alarm event 2": (
        "Monomer high voltage alarm",
        "Monomer overvoltage protection",
        "Monomer low voltage alarm",
        "Monomer under voltage protection",
        "High voltage alarm for total voltage",
        "Overvoltage protection for total voltage",
        "Low voltage alarm for total voltage",
        "Under voltage protection for total voltage",
    ),
    "Alarm event 3": (
        "Charge high temperature alarm",
        "Charge over temperature protection",
        "Charge low temperature alarm",
        "Charge under temperature protection",
        "Discharge high temperature alarm",
        "Discharge over temperature protection",
        "Discharge low temperature alarm",
        "Discharge under temperature protection",
    ),
    "Alarm event 4": (
        "Environment high temperature alarm",
        "Environment over temperature protection",
        "Environment low temperature alarm",
        "Environment under temperature protection",
        "Power over temperature protection",
        "Power high temperature alarm",
        "Cell low temperature heating",
        "Reservation bit",
    ),
    "Alarm event 5": (
        "Charge over current alarm",
        "Charge over current protection",
        "Discharge over current alarm",
        "Discharge over current protection",
        "Transient over current protection",
        "Output short circuit protection",
        "Transient over current lockout",
        "Output short circuit lockout",
    ),
    "Alarm event 6": (
        "Charge high voltage protection",
        "Intermittent recharge waiting",
        "Residual capacity alarm",
        "Residual capacity protection",
        "Cell low voltage charging prohibition",
        "Output reverse polarity protection",
        "Output connection fault",
        "Inside bit",
    ),
    "On-off state": (
        "Discharge switch state",
        "Charge switch state",
        "Current limit switch state",
        "Heating switch state",
        "Reservation bit",
        "Reservation bit",
        "Reservation bit",
        "Reservation bit",
    ),
    "Equilibrium state 1": (
        "Cell 01 equilibrium",
        "Cell 02 equilibrium",
        "Cell 03 equilibrium",
        "Cell 04 equilibrium",
        "Cell 05 equilibrium",
        "Cell 06 equilibrium",
        "Cell 07 equilibrium",
        "Cell 08 equilibrium",
    ),
    "Equilibrium state 2": (
        "Cell 09 equilibrium",
        "Cell 10 equilibrium",
        "Cell 11 equilibrium",
        "Cell 12 equilibrium",
        "Cell 13 equilibrium",
        "Cell 14 equilibrium",
        "Cell 15 equilibrium",
        "Cell 16 equilibrium",
    ),
    "System state": (
        "Discharge",
        "Charge",
        "Floating charge",
        "Reservation bit",
        "Standby",
        "Shutdown",
        "Reservation bit",
        "Reservation bit",
    ),
    "Disconnection state 1": (
        "Cell 01 disconnection",
        "Cell 02 disconnection",
        "Cell 03 disconnection",
        "Cell 04 disconnection",
        "Cell 05 disconnection",
        "Cell 06 disconnection",
        "Cell 07 disconnection",
        "Cell 08 disconnection",
    ),
    "Disconnection state 2": (
        "Cell 09 disconnection",
        "Cell 10 disconnection",
        "Cell 11 disconnection",
        "Cell 12 disconnection",
        "Cell 13 disconnection",
        "Cell 14 disconnection",
        "Cell 15 disconnection",
        "Cell 16 disconnection",
    ),
    "Alarm event 7": (
        "Inside bit",
        "Inside bit",
        "Inside bit",
        "Inside bit",
        "Automatic charging waiting",
        "Manual charging waiting",
        "Inside bit",
        "Inside bit",
    ),
    # This entry used to repeat the key "Alarm event 3", which silently
    # replaced the charge/discharge temperature table above.
    "Alarm event 8": (
        "EEP storage fault",
        "RTC error",
        "Voltage calibration not performed",
        "Current calibration not performed",
        "Zero calibration not performed",
        "Inside bit",
        "Inside bit",
        "Inside bit",
    ),
}

# SOI(1) + VER(2) + ADR(2) + CID1(2) + CID2(2) + LENGTH(4) + CHKSUM(4) + EOI(1)
_MIN_FRAME_CHARS = 18

# Number of cell temperature alarm bytes read by parse_telecommand_return().
_CELL_TEMPERATURE_ALARMS = 4

# Byte alarms (Table 12) that follow the cell temperature alarms, in order.
_TELECOMMAND_BYTE_ALARMS = (
    "Environment temperature alarm",
    "Power temperature alarm 1",
    "Charge/discharge current alarm",
    "Total battery voltage alarm",
)

# Trailing fields that are stored as raw two-character hex strings, in order.
# The spelling of these names is kept as-is because they are CSV headers.
_TELECOMMAND_RAW_FIELDS = (
    "Alarm event 1",
    "Alarm event 2",
    "Alarm event 3",
    "Alarm event 4",
    "Alarm event 5",
    "Alarm event 6",
    "On-off state",
    "Equilibrium state 1",
    "Equilibrium state 2",
    "System state",
    "Disconnection state 1",
    "Disconnection state 2",
    "Alarm event 7",
    "Alarm event 8",
    "Reservation extention 1",
    "Reservation extention 2",
    "Reservation extention 3",
    "Reservation extention 4",
    "Reservation extention 5",
    "Reservation extention 6",
)


def parse_start_code(frame):
    """Return "ok!" if the frame starts with "~", else raise ValueError."""
    soi = frame[0:1]
    if soi != "~":
        raise ValueError(f"Invalid start identifier! ({soi})")
    return "ok!"


def parse_version_code(frame):
    """Format the two VER characters (frame[1:3]) as "Protocol Version Vx.y"."""
    ver = frame[1:3]
    return f"Protocol Version V{ver[0]}.{ver[1]}"


def parse_address_code(frame):
    """Return the two ADR characters (frame[3:5]) after a range check.

    ADR is a hexadecimal ASCII byte, so it is parsed in base 16.

    Raises:
        ValueError: if ADR is not hexadecimal or lies outside 0-15.
    """
    adr = frame[3:5]
    try:
        address = int(adr, 16)
    except ValueError as err:
        raise ValueError(f"Invalid address: {adr} (not hexadecimal)") from err
    if not 0 <= address <= 15:
        raise ValueError(f"Invalid address: {adr} (out of range 0-15)")
    return adr


def parse_device_code(frame):
    """Translate CID1 (frame[5:7]) using Table 3; "Unknown!" if not listed."""
    cid1 = frame[5:7]
    return CID1_DEVICE_CODES.get(cid1, "Unknown!")


def parse_function_code(frame):
    """Describe CID2 (frame[7:9]) as a command code or a return code."""
    cid2 = frame[7:9]
    if cid2 in CID2_COMMAND_CODES:
        return f"Command -> {CID2_COMMAND_CODES.get(cid2)}"
    if cid2 in CID2_RETURN_CODES:
        return f"Return -> {CID2_RETURN_CODES.get(cid2)}"
    return f"Unknown CID2: {cid2}"


def _lenid_nibbles(length_code):
    """Return the three LENID hex digits of a LENGTH field as integers."""
    if len(length_code) < 4:
        raise ValueError(f"Invalid LENGTH field: {length_code!r}")
    # Each digit is parsed on its own in base 16 so that A-F are accepted.
    return [int(digit, 16) for digit in length_code[1:4]]


def parse_lchksum(length_code):
    """Verify the LCHKSUM nibble of a four-character LENGTH field.

    implements chapter 3.2.2 of the Protocol Specification

    Returns:
        "ok!" when the first hex digit equals the checksum of the other three.

    Raises:
        ValueError: if the field is malformed or the checksum does not match.
    """
    lchksum = int(length_code[0], 16)
    nibble_sum = sum(_lenid_nibbles(length_code))
    # Sum modulo 16, inverted bitwise, plus one (kept to four bits).
    remainder = nibble_sum % 16
    computed_lchksum = ((~remainder & 0xF) + 1) & 0xF
    if computed_lchksum != lchksum:
        raise ValueError(
            f"Invalid LCHKSUM: {lchksum} (computed: {computed_lchksum})"
        )
    return "ok!"


def parse_lenid(length_code):
    """Return half of the 12-bit LENID value of a LENGTH field.

    implements chapter 3.2.1 of the Protocol Specification

    parse_info() multiplies this result by two to slice INFO, so the value
    returned here is the INFO length in bytes (two ASCII characters each).
    """
    high, mid, low = _lenid_nibbles(length_code)
    lenid = high << 8 | mid << 4 | low
    return lenid >> 1


def parse_length_code(frame):
    """Validate and decode the LENGTH field (frame[9:13]).

    implements chapter 3.2 of the Protocol Specification
    """
    length_code = frame[9:13]
    return {
        "LCHKSUM": parse_lchksum(length_code),
        "LENID": parse_lenid(length_code),
    }


def parse_info(frame):
    """Extract INFO and decode it when it is a telecommand return.

    A reply with CID2 "00" and 49 INFO bytes is handed to
    parse_telecommand_return(). Anything else is returned as the raw
    ASCII-hex string.
    """
    cid2 = frame[7:9]
    lenid = parse_lenid(frame[9:13])
    info = frame[13:13 + lenid * 2]
    if cid2 == "00" and lenid == 49:
        return parse_telecommand_return(info)
    # NOTE: 75-byte replies used to be passed to parse_telemetry_return(),
    # which is not defined in this module and therefore raised NameError.
    # Until a telemetry decoder exists the raw INFO string is returned.
    return info


def parse_telecommand_return(info_raw, info=None, index=0):
    """Decode a telecommand INFO string and append it to the CSV record.

    Args:
        info_raw: INFO as ASCII-hex text, two characters per byte.
        info: optional dict to fill; a fresh dict is created when omitted.
        index: character offset in info_raw at which decoding starts.

    Returns:
        A message naming the CSV file the decoded row was appended to.

    Raises:
        ValueError: if a count byte is missing or not hexadecimal.
    """
    # A fresh dict per call: a shared default would leak keys between calls.
    info = {} if info is None else info

    def take():
        """Return the next two characters and advance the read position."""
        nonlocal index
        field = info_raw[index:index + 2]
        index += 2
        return field

    info["DATA FLAG"] = take()
    info["COMMAND GROUP"] = take()

    num_of_cells = int(take(), 16)
    info["Number of cells"] = num_of_cells
    for cell in range(num_of_cells):
        info[f"Cell {cell +1} alarm"] = BYTE_ALARM_CODES.get(take())

    info["Number of temperatures"] = int(take(), 16)
    # The number of cell temperature alarms is fixed; it is not taken from
    # the count byte above.
    for sensor in range(_CELL_TEMPERATURE_ALARMS):
        info[f"Cell temperature alarm {sensor}"] = BYTE_ALARM_CODES.get(take())

    for name in _TELECOMMAND_BYTE_ALARMS:
        info[name] = BYTE_ALARM_CODES.get(take())

    info["Number of custom alarms"] = int(take(), 16)
    # Bit-field bytes are stored undecoded as their two hex characters.
    for name in _TELECOMMAND_RAW_FIELDS:
        info[name] = take()

    save_dict_to_csv(TELECOMMAND_FILE_PATH, info)
    return f"Telecommand Return Data saved in ./{TELECOMMAND_FILE_PATH}"


def save_dict_to_csv(file_path, data):
    """Append data as one CSV row, writing a header first if the file is empty.

    Args:
        file_path: CSV file to append to; created if it does not exist.
        data: mapping whose keys are the column names.
    """
    with open(file_path, mode='a+', newline='') as csvfile:
        # Look at the first character to find out whether a header exists.
        csvfile.seek(0)
        has_header = csvfile.read(1) != ""
        csvfile.seek(0, 2)
        writer = csv.DictWriter(csvfile, fieldnames=list(data))
        if not has_header:
            writer.writeheader()
        writer.writerow(data)


def parse_checksum(frame):
    """implements section 3.3 of the Protocol Specification

    Returns "ok!" when the four CHKSUM characters before EOI match the
    checksum of everything between SOI and CHKSUM.

    Raises:
        ValueError: if CHKSUM is not hexadecimal or does not match.
    """
    # CHKSUM is exactly four hex characters directly in front of EOI.
    chksum = int(frame[-5:-1], 16)
    data = frame[1:-5]
    # Sum of the ASCII codes modulo 65536, inverted bitwise, plus one.
    ascii_sum = sum(ord(char) for char in data)
    remainder = ascii_sum % 65536
    computed_chksum = ((~remainder & 0xFFFF) + 1) & 0xFFFF
    if computed_chksum != chksum:
        raise ValueError(
            f"Invalid CHKSUM: {chksum} (computed: {computed_chksum})"
        )
    return "ok!"


def parse_end_code(frame):
    """Return "ok!" if the frame ends with a carriage return.

    Raises:
        ValueError: if the last character is missing or is not "\\r".
    """
    eoi = frame[-1:]
    if eoi != "\r":
        raise ValueError(f"Invalid end identifier! ({eoi})")
    return "ok!"


def parse_modbus_ascii_frame(frame, parsed_data=None):
    """Decode a frame given as a hex string of its raw bytes.

    Args:
        frame: hex text of the bytes on the wire, e.g. "7E3230...0D".
        parsed_data: optional dict to fill; a fresh dict is created when
            omitted.

    Returns:
        The dict with keys SOI, VER, ADR, CID1, CID2, LENGTH, INFO, CHKSUM
        and EOI, in that order.

    Raises:
        ValueError: if the text is not valid hex/ASCII or a field is invalid.
    """
    parsed_data = {} if parsed_data is None else parsed_data
    frame = bytes.fromhex(frame).decode('ascii')

    parsed_data["SOI"] = parse_start_code(frame)
    if len(frame) < _MIN_FRAME_CHARS:
        raise ValueError(
            f"Frame too short: {len(frame)} characters "
            f"(minimum {_MIN_FRAME_CHARS})"
        )
    parsed_data["VER"] = parse_version_code(frame)
    parsed_data["ADR"] = parse_address_code(frame)
    parsed_data["CID1"] = parse_device_code(frame)
    parsed_data["CID2"] = parse_function_code(frame)
    parsed_data["LENGTH"] = parse_length_code(frame)

    # Check integrity before INFO is decoded: decoding a telecommand return
    # appends to the CSV record, which must not happen for a corrupt frame.
    chksum_status = parse_checksum(frame)
    eoi_status = parse_end_code(frame)

    parsed_data["INFO"] = parse_info(frame)
    parsed_data["CHKSUM"] = chksum_status
    parsed_data["EOI"] = eoi_status
    return parsed_data


def send_command():
    """Send the telecommand request over the serial port and print the reply.

    Errors are reported on stdout instead of being raised.
    """
    if serial is None:
        print("Error opening serial port: pyserial is not installed")
        return

    # Define the serial port and baud rate
    port = 'COM9'  # Replace with your actual port
    baudrate = 19200  # Replace with the correct baud rate for your BMS

    try:
        with serial.Serial(port, baudrate, timeout=1) as ser:
            # ASCII frame "~20004644E00200FD35\r" given as hex.
            command = bytes.fromhex("7E3230303034363434453030323030464433350D")
            ser.write(command)
            print("Command sent successfully.")

            # Wait for and read the response
            response = ser.read(200)  # Adjust the number of bytes as needed
            if not response:
                print("No response received.")
                return

            hex_response = response.hex()
            print("Response received:", hex_response)
            parsed_result = parse_modbus_ascii_frame(hex_response)
            for key, value in parsed_result.items():
                print(f"{key}: {value}")
    except serial.SerialException as e:
        print(f"Error opening serial port: {e}")
    except Exception as e:  # top-level boundary: report and carry on
        print(f"An unexpected error occurred: {e}")


if __name__ == "__main__":
    send_command()