#
# Serial communication
#
# Copyright (c) 2013 Michael Buesch
# Licensed under the terms of the GNU General Public License version 2.
#

import sys
import struct
import serial
import time


class SerialError(Exception):
    """Raised when a serial message is malformed or fails its FCS check."""
    pass


class SerialMessage(object):
    """One fixed-size frame exchanged over the serial line.

    Wire layout (14 bytes, see __getBytes/__setBytes):
        byte 0      fc   (frame control)
        byte 1      seq  (sequence number)
        byte 2      low nibble = sa (source address),
                    high nibble = da (destination address)
        byte 3      always written as 0, ignored when parsing
        bytes 4-11  payload (SER_PAYLOAD_LEN bytes)
        bytes 12-13 fcs, least significant byte first
    """

    SER_PAYLOAD_LEN = 8
    SER_HDR_LEN = 4
    SER_FCS_LEN = 2
    MSG_SIZE = SER_HDR_LEN + SER_PAYLOAD_LEN + SER_FCS_LEN

    @classmethod
    def crc16Update(cls, crc, data):
        """Feed one byte value `data` into the running checksum `crc`.

        Processes the byte bit by bit, LSB first, using the
        constant 0xA001 as the feedback term. Returns the new crc.
        """
        crc ^= data
        for _ in range(8):
            if crc & 1:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
        return crc

    @classmethod
    def crc16(cls, data):
        """Return the 16 bit checksum of the iterable of byte values `data`.

        The checksum starts at 0xFFFF and is inverted (XOR 0xFFFF)
        at the end.
        """
        crc = 0xFFFF
        for byte in data:
            crc = cls.crc16Update(crc, byte)
        return crc ^ 0xFFFF

    def __init__(self, fc=0, payload=b''):
        """Create a message with frame control `fc` and `payload`.

        `payload` is an iterable of byte values. It is zero-padded
        to SER_PAYLOAD_LEN bytes. A longer payload is stored as given,
        but only its first SER_PAYLOAD_LEN bytes are put on the wire.
        seq, sa, da and fcs start out as 0.
        """
        self.fc = fc
        self.seq = 0
        self.sa = 0
        self.da = 0
        padded = list(payload)
        # A negative repeat count yields an empty list,
        # so an over-long payload is left untouched here.
        padded.extend([0] * (SerialMessage.SER_PAYLOAD_LEN - len(padded)))
        self.payload = bytes(padded)
        self.fcs = 0

    def __calcFcs(self):
        """Return the checksum over everything except the FCS bytes."""
        return self.crc16(self.__getBytes()[0:-2])

    def __getBytes(self):
        """Pack the current field values (including self.fcs as-is)."""
        fields = (
            self.fc,
            self.seq,
            (self.sa & 0xF) | ((self.da & 0xF) << 4),
            0,
        )
        fields += tuple(self.payload[0:SerialMessage.SER_PAYLOAD_LEN])
        fields += (
            self.fcs & 0xFF,
            (self.fcs >> 8) & 0xFF,
        )
        return struct.pack("14B", *fields)

    def getBytes(self):
        """Recompute self.fcs and return the complete packed frame."""
        self.fcs = self.__calcFcs()
        return self.__getBytes()

    def __setBytes(self, data):
        """Load all fields from the raw frame `data` without FCS check.

        Raises SerialError if `data` is not exactly MSG_SIZE bytes long.
        """
        if len(data) != self.MSG_SIZE:
            raise SerialError("Msg: Invalid number of bytes")
        fields = struct.unpack("14B", data)
        self.fc = fields[0]
        self.seq = fields[1]
        self.sa = fields[2] & 0xF
        self.da = (fields[2] >> 4) & 0xF
        self.payload = bytes(fields[4:12])
        self.fcs = fields[12] | (fields[13] << 8)

    def setBytes(self, data):
        """Load the message from the raw frame `data` and verify it.

        Raises SerialError on a wrong length or on an FCS mismatch.
        """
        self.__setBytes(data)
        if self.__calcFcs() != self.fcs:
            raise SerialError("Msg: FCS mismatch")

    def __repr__(self):
        # Note: getBytes() refreshes self.fcs as a side effect.
        return "SerialMessage: " + "".join(
            "%02X" % b for b in self.getBytes())


class SerialComm(object):
    """Send and receive SerialMessage frames over a pySerial port."""

    def __init__(self, device, baudrate=9600, nrbits=8,
                 parity=serial.PARITY_NONE, nrstop=1, localAddress=1):
        """Open the serial port `device` with the given line settings.

        `localAddress` is the address that received frames must
        carry as destination to be returned by receive().
        """
        self.sendDelay = 0
        self.serial = serial.Serial(device, baudrate, nrbits,
                                    parity, nrstop)
        self.localAddress = localAddress
        self.seq = 0

    def close(self):
        """Close the underlying serial port."""
        self.serial.close()

    def setSendDelay(self, seconds):
        """Set the pause inserted after each sent byte (0 disables it)."""
        self.sendDelay = seconds

    def receive(self):
        """Read frames until one is addressed to localAddress; return it.

        Frames for other destinations are dropped.
        Raises SerialError if a frame is malformed (see
        SerialMessage.setBytes).
        """
        while True:
            raw = self.serial.read(SerialMessage.MSG_SIZE)
            msg = SerialMessage()
            msg.setBytes(raw)
            if msg.da == self.localAddress:
                return msg

    def send(self, msg, destinationAddress=0):
        """Stamp `msg` with destination and sequence number and send it.

        If a send delay is configured, the frame is written one byte
        at a time with a flush and a sleep after every byte.
        """
        msg.da = destinationAddress & 0xF
        msg.seq = self.seq
        # The sequence number occupies a single byte in the frame.
        # Wrap it, so that packing does not fail after 256 messages.
        self.seq = (self.seq + 1) & 0xFF
        data = msg.getBytes()
        if self.sendDelay:
            for b in data:
                self.serial.write(bytes((b,)))
                self.serial.flush()
                time.sleep(self.sendDelay)
        else:
            self.serial.write(data)
            self.serial.flush()