88 lines
2.9 KiB
Python
88 lines
2.9 KiB
Python
# import vedirect from https://github.com/karioja/vedirect
|
|
# description of channels:https://beta.ivc.no/wiki/index.php/Victron_VE_Direct_DIY_Cable
|
|
import serial
|
|
class vedirect:
|
|
def __init__(self, serialport, timeout):
|
|
self.serialport = serialport
|
|
self.ser = serial.Serial(serialport, 19200, timeout=timeout)
|
|
self.header1 = '\r'
|
|
self.header2 = '\n'
|
|
self.hexmarker = ':'
|
|
self.delimiter = '\t'
|
|
self.key = ''
|
|
self.value = ''
|
|
self.bytes_sum = 0;
|
|
self.state = self.WAIT_HEADER
|
|
self.dict = {}
|
|
(HEX, WAIT_HEADER, IN_KEY, IN_VALUE, IN_CHECKSUM) = range(5)
|
|
def input(self, byte):
|
|
if byte == self.hexmarker and self.state != self.IN_CHECKSUM:
|
|
self.state = self.HEX
|
|
if self.state == self.WAIT_HEADER:
|
|
self.bytes_sum += ord(byte)
|
|
if byte == self.header1:
|
|
self.state = self.WAIT_HEADER
|
|
elif byte == self.header2:
|
|
self.state = self.IN_KEY
|
|
return None
|
|
elif self.state == self.IN_KEY:
|
|
self.bytes_sum += ord(byte)
|
|
if byte == self.delimiter:
|
|
if (self.key == 'Checksum'):
|
|
self.state = self.IN_CHECKSUM
|
|
else:
|
|
self.state = self.IN_VALUE
|
|
else:
|
|
self.key += byte
|
|
return None
|
|
elif self.state == self.IN_VALUE:
|
|
self.bytes_sum += ord(byte)
|
|
if byte == self.header1:
|
|
self.state = self.WAIT_HEADER
|
|
self.dict[self.key] = self.value;
|
|
self.key = '';
|
|
self.value = '';
|
|
else:
|
|
self.value += byte
|
|
return None
|
|
elif self.state == self.IN_CHECKSUM:
|
|
self.bytes_sum += ord(byte)
|
|
self.key = ''
|
|
self.value = ''
|
|
self.state = self.WAIT_HEADER
|
|
if (self.bytes_sum % 256 == 0):
|
|
self.bytes_sum = 0
|
|
return self.dict
|
|
else:
|
|
print 'Malformed packet'
|
|
self.bytes_sum = 0
|
|
elif self.state == self.HEX:
|
|
self.bytes_sum = 0
|
|
if byte == self.header2:
|
|
self.state = self.WAIT_HEADER
|
|
else:
|
|
raise AssertionError()
|
|
def read_data(self):
|
|
while True:
|
|
byte = self.ser.read(1)
|
|
packet = self.input(byte)
|
|
def read_data_single(self):
|
|
while True:
|
|
byte = self.ser.read(1)
|
|
packet = self.input(byte)
|
|
if (packet != None):
|
|
return packet
|
|
def read_data_callback(self, callbackFunction):
|
|
while True:
|
|
byte = self.ser.read(1)
|
|
if byte:
|
|
packet = self.input(byte)
|
|
if (packet != None):
|
|
callbackFunction(packet)
|
|
else:
|
|
break
|
|
|
|
def print_data_callback(data):
|
|
print data
|
|
# end import vedirect
|