133 lines
4.2 KiB
Python
133 lines
4.2 KiB
Python
|
import serial
|
||
|
|
||
|
def parse_start_code(frame):
    """Validate the start-of-information (SOI) marker of a decoded frame.

    The frame is expected to be the ASCII-decoded text (as produced by
    ``parse_modbus_ascii_frame``), in which the SOI marker is the single
    character ``"~"`` at position 0.

    Args:
        frame: Decoded ASCII frame text.

    Returns:
        The string ``"ok!"`` when the frame starts with ``"~"``.

    Raises:
        ValueError: If the first character is not ``"~"`` (including an
            empty frame).
    """
    # SOI is one character wide; a two-character slice can never equal "~"
    # for any frame that carries data after the marker.
    soi = frame[0:1]
    if soi != "~":
        raise ValueError(f"Invalid start identifier! ({soi})")
    return "ok!"
|
||
|
|
||
|
def parse_version_code(frame):
    """Extract the protocol version (VER) field from a decoded frame.

    In the ASCII-decoded frame every protocol byte occupies two characters,
    so VER sits directly after the one-character SOI marker, at
    positions 1-2. The two characters are reported as major and minor
    version digits.

    Args:
        frame: Decoded ASCII frame text, starting with the SOI character.

    Returns:
        A string of the form ``"Protocol Version V<major>.<minor>"``.

    Raises:
        ValueError: If the frame is too short to contain the VER field.
    """
    ver = frame[1:3]
    if len(ver) < 2:
        raise ValueError(f"Frame too short for version field: {frame!r}")
    return f"Protocol Version V{ver[0]}.{ver[1]}"
|
||
|
|
||
|
def parse_address_code(frame):
    """Extract and range-check the device address (ADR) field.

    In the ASCII-decoded frame the ADR byte is the two hexadecimal
    characters at positions 3-4 (after SOI and VER).

    Args:
        frame: Decoded ASCII frame text, starting with the SOI character.

    Returns:
        The two-character address field, unchanged, when its value is
        within 0-15.

    Raises:
        ValueError: If the field is not valid hexadecimal or its value is
            outside 0-15.
    """
    adr = frame[3:5]
    # The field is ASCII-hex, so it must be parsed with base 16; base 10
    # would reject "0A".."0F" and misread "10" as ten instead of sixteen.
    if 0 <= int(adr, 16) <= 15:
        return adr
    raise ValueError(f"Invalid address: {adr} (out of range 0-15)")
|
||
|
|
||
|
def parse_device_code(frame):
    """Look up the device type named by the CID1 field.

    In the ASCII-decoded frame the CID1 byte is the two characters at
    positions 5-6 (after SOI, VER and ADR).

    Args:
        frame: Decoded ASCII frame text, starting with the SOI character.

    Returns:
        The entry of ``bms.CID1_DEVICE_CODES`` for the CID1 field, or
        ``"Unknown!"`` when the field is not a key of that table.
    """
    # NOTE(review): assumes bms.CID1_DEVICE_CODES is keyed by the
    # two-character ASCII-hex code (e.g. "46") -- confirm against the bms
    # module, which is not visible from this file.
    cid1 = frame[5:7]
    return bms.CID1_DEVICE_CODES.get(cid1, "Unknown!")
|
||
|
|
||
|
def parse_function_code(frame):
    """Describe the CID2 field as either a command code or a return code.

    In the ASCII-decoded frame the CID2 byte is the two characters at
    positions 7-8 (after SOI, VER, ADR and CID1). The command table is
    consulted first, then the return-code table.

    Args:
        frame: Decoded ASCII frame text, starting with the SOI character.

    Returns:
        ``"Command -> <name>"`` or ``"Return -> <name>"`` when the field is
        found in the corresponding ``bms`` table, otherwise
        ``"Unknown CID2: <field>"``.
    """
    # NOTE(review): assumes both bms tables are keyed by the two-character
    # ASCII-hex code -- confirm against the bms module, which is not
    # visible from this file.
    cid2 = frame[7:9]
    if cid2 in bms.CID2_COMMAND_CODES:
        return f"Command -> {bms.CID2_COMMAND_CODES.get(cid2)}"
    if cid2 in bms.CID2_RETURN_CODES:
        return f"Return -> {bms.CID2_RETURN_CODES.get(cid2)}"
    return f"Unknown CID2: {cid2}"
|
||
|
|
||
|
def parse_lchksum(length_code):
    """Verify the length checksum (LCHKSUM) nibble of a LENGTH field.

    Implements chapter 3.2.2 of the Protocol Specification: the first
    hexadecimal digit of the LENGTH field is the checksum; the following
    three digits are the LENID nibbles. The checksum is the sum of the three
    nibbles modulo 16, bit-inverted, plus one (kept to four bits).

    Args:
        length_code: LENGTH field as ASCII-hex text; only the first four
            characters are examined.

    Returns:
        The string ``"ok!"`` when the transmitted checksum matches.

    Raises:
        ValueError: If the field has fewer than four characters, contains a
            non-hexadecimal digit, or the checksum does not match.
    """
    if len(length_code) < 4:
        raise ValueError(f"Length code too short: {length_code!r}")

    lchksum = int(length_code[0], 16)

    # Each nibble is one hex digit, so it must be parsed with base 16;
    # base 10 raises on any length that contains A-F.
    nibble_total = sum(int(digit, 16) for digit in length_code[1:4])

    remainder = nibble_total % 16
    inverted = ~remainder & 0xF
    computed_lchksum = (inverted + 1) & 0xF

    if computed_lchksum != lchksum:
        raise ValueError(f"Invalid LCHKSUM: {lchksum} (computed: {computed_lchksum})")
    return "ok!"
|
||
|
|
||
|
|
||
|
def parse_lenid(length_code):
    """Decode the LENID part of a LENGTH field into a byte count.

    Implements chapter 3.2.1 of the Protocol Specification: characters 1-3
    of the LENGTH field are three hexadecimal nibbles forming a 12-bit
    value (character 0 is the checksum and is ignored here). The result is
    that value halved.

    Args:
        length_code: LENGTH field as ASCII-hex text; characters 1-3 are
            read.

    Returns:
        The 12-bit LENID value shifted right by one bit, as an int.

    Raises:
        ValueError: If the field has fewer than four characters or contains
            a non-hexadecimal digit in positions 1-3.
    """
    if len(length_code) < 4:
        raise ValueError(f"Length code too short: {length_code!r}")

    # Nibbles are hex digits; base-10 parsing would fail on A-F.
    high, mid, low = (int(digit, 16) for digit in length_code[1:4])
    lenid = (high << 8) | (mid << 4) | low
    return lenid >> 1
|
||
|
|
||
|
def parse_length_code(frame):
    """Parse the LENGTH field of a decoded frame.

    Implements chapter 3.2 of the Protocol Specification. In the
    ASCII-decoded frame the LENGTH field is the four characters at
    positions 9-12 (after SOI, VER, ADR, CID1 and CID2): one checksum digit
    followed by three LENID digits.

    Args:
        frame: Decoded ASCII frame text, starting with the SOI character.

    Returns:
        A dict with ``"LCHKSUM"`` (the result of ``parse_lchksum``) and
        ``"LENID"`` (the result of ``parse_lenid``).

    Raises:
        ValueError: Propagated from ``parse_lchksum`` when the checksum
            does not match.
    """
    length_code = frame[9:13]
    return {
        "LCHKSUM": parse_lchksum(length_code),
        "LENID": parse_lenid(length_code),
    }
|
||
|
|
||
|
def parse_info(frame):
    """Extract the INFO payload of a decoded frame and dispatch on its size.

    Field positions refer to the ASCII-decoded frame, where every protocol
    byte is two characters: CID2 at 7-8, LENGTH at 9-12, INFO from 13 on.

    Args:
        frame: Decoded ASCII frame text, starting with the SOI character.

    Returns:
        The result of ``parse_telecommand_return`` when CID2 is ``'00'``
        and the payload is 49 bytes, the result of
        ``parse_telemetry_return`` when CID2 is ``'00'`` and the payload is
        75 bytes, otherwise the raw INFO text.
    """
    cid2 = frame[7:9]
    lenid = parse_lenid(frame[9:13])
    # parse_lenid yields a byte count; each byte is two ASCII characters.
    info = frame[13:13 + lenid * 2]

    if cid2 == '00':
        if lenid == 49:
            return parse_telecommand_return(info)
        if lenid == 75:
            # NOTE(review): parse_telemetry_return is not defined in the
            # visible part of this file -- confirm it exists.
            return parse_telemetry_return(info)
    return info
|
||
|
|
||
|
def parse_telecommand_return(info_raw, info=None, index=0):
    """Split the leading fields of a telecommand INFO payload.

    Reads two consecutive four-character fields starting at ``index`` and
    stores them under ``"DATA FLAG"`` and ``"COMMAND GROUP"``.

    Args:
        info_raw: INFO payload text.
        info: Optional dict to store the fields in. A fresh dict is created
            when omitted, so results are never shared between calls.
        index: Character offset in ``info_raw`` at which parsing starts.

    Returns:
        The dict holding the parsed fields (``info`` itself when one was
        supplied).
    """
    if info is None:
        info = {}

    # NOTE(review): each field is read as four characters; if the payload
    # is ASCII-hex with two characters per byte these widths may need to be
    # halved -- confirm against the protocol specification.
    info["DATA FLAG"] = info_raw[index:index + 4]
    index += 4

    info["COMMAND GROUP"] = info_raw[index:index + 4]
    index += 4

    return info
|
||
|
|
||
|
|
||
|
def parse_modbus_ascii_frame(frame, parsed_data=None):
    """Decode a hex-dumped frame and parse each of its fields.

    The hex string is converted to bytes and decoded as ASCII; the decoded
    text is then handed to one parser per field.

    Args:
        frame: Frame as a hexadecimal string (for example the output of
            ``bytes.hex()``).
        parsed_data: Optional dict to store the results in. A fresh dict is
            created when omitted, so results are never shared between
            calls.

    Returns:
        The dict with keys ``"SOI"``, ``"VER"``, ``"ADR"``, ``"CID1"``,
        ``"CID2"``, ``"LENGTH"``, ``"INFO"``, ``"CHKSUM"`` and ``"EOI"``.

    Raises:
        ValueError: If ``frame`` is not valid hexadecimal, or propagated
            from a field parser that rejects the frame.
        UnicodeDecodeError: If the decoded bytes are not ASCII.
    """
    if parsed_data is None:
        parsed_data = {}

    frame = bytes.fromhex(frame).decode('ascii')

    parsed_data["SOI"] = parse_start_code(frame)
    parsed_data["VER"] = parse_version_code(frame)
    parsed_data["ADR"] = parse_address_code(frame)
    parsed_data["CID1"] = parse_device_code(frame)
    parsed_data["CID2"] = parse_function_code(frame)
    parsed_data["LENGTH"] = parse_length_code(frame)
    parsed_data["INFO"] = parse_info(frame)
    # NOTE(review): parse_checksum and parse_end_code are not defined in
    # the visible part of this file -- confirm they exist.
    parsed_data["CHKSUM"] = parse_checksum(frame)
    parsed_data["EOI"] = parse_end_code(frame)
    return parsed_data
|
||
|
|
||
|
def send_command():
    """Send one fixed request frame over the serial port and print the reply.

    Opens the serial port, writes a hard-coded request, reads up to 200
    bytes of response (1 second timeout) and, when something was received,
    prints it as hex and passes the hex string on for further checking.
    Serial and other errors are reported on stdout instead of being raised.
    """
    serial_port = 'COM9'  # Replace with your actual port
    baud = 19200  # Replace with the correct baud rate for your BMS

    # Request frame, stored as the hex dump of its ASCII bytes.
    request = bytes.fromhex("7E3230303034363434453030323030464433350D")

    try:
        with serial.Serial(serial_port, baud, timeout=1) as link:
            link.write(request)
            print("Command sent successfully.")

            # Adjust the number of bytes to read as needed.
            reply = link.read(200)
            if not reply:
                print("No response received.")
                return

            reply_hex = reply.hex()
            print("Response received:", reply_hex)
            # NOTE(review): check_starting_byte_and_extract_details is not
            # defined in the visible part of this file -- confirm it exists.
            check_starting_byte_and_extract_details(reply_hex)
    except serial.SerialException as err:
        print(f"Error opening serial port: {err}")
    except Exception as err:
        print(f"An unexpected error occurred: {err}")
|
||
|
|
||
|
|
||
|
# Script entry point: issue the request only when run directly, not on import.
if __name__ == "__main__":
    send_command()
|