""" Python module to handle the shellmatta transport layer """ from crccheck.crc import Crc32 import struct class ShellmattaTransport(): """ Class to handle the shellmatta transport layer """ class Packet(): """ A shellmatta packet structure """ HEADER_LENGTH = 8 MAX_PAYLOAD_LENGTH = 255 CRC_LENGTH = 4 SOH = 1 PROTOCOL_VERSION = 1 TYPE_DATA = 0 TYPE_SEQ_CNT_REQUEST = 1 TYPE_SEQ_CNT_RESPOND = 129 TYPE_MAX_BUFFERSIZE_REQUEST = 2 TYPE_MAX_BUFFERSIZE_RESPOND = 130 TYPE_SEARCH_DEVICE_REQUEST = 3 TYPE_SEARCH_DEVICE_RESPOND = 131 TYPE_SET_ADDRESS_REQUEST = 4 TYPE_SET_ADDRESS_RESPOND = 132 def __init__(self, start_of_header, protocol_version, packet_type, payload_length, source, destination, sequence_h2s, sequence_s2h, payload=b'', crc=0): """ creates a shellmatta transport packet based on the passed data """ self.start_of_header = start_of_header self.protocol_version = protocol_version self.packet_type = packet_type self.payload_length = payload_length self.source = source self.destination = destination self.sequence_h2s = sequence_h2s self.sequence_s2h = sequence_s2h self.payload = payload self.crc = crc @classmethod def from_header_data(cls, header_data): """ create an empty packet based on raw header data """ if not header_data or len(header_data) != cls.HEADER_LENGTH: raise ValueError("A shellmatta transport packet needs 8 data bytes as a header.") data = struct.unpack('BBBBBBBB', header_data) return cls(*data) def set_payload(self, payload): """ sets/replaces the complete payload of the packet """ if len(payload) > self.MAX_PAYLOAD_LENGTH: raise ValueError("Payload size exceeds limit!") self.payload = payload self.payload_length = len(self.payload) def append_payload(self, payload): """ append passed payload to the packet """ if len(payload) + self.payload_length > self.MAX_PAYLOAD_LENGTH: raise ValueError("Payload size exceeds limit!") self.payload += payload self.payload_length = len(self.payload) def secure(self): """ Calculates the crc checksum """ self.crc = Crc32.calc(bytes(self)[:-4]) def verify(self, crc): """ Checks the packet agains the passed crc """ self.secure() return crc == self.crc def __bytes__(self): """ Create binary representation of the packet """ # pack header raw_buffer = struct.pack('BBBBBBBB', self.start_of_header, self.protocol_version, self.packet_type, self.payload_length, self.source, self.destination, self.sequence_h2s, self.sequence_s2h) raw_buffer += self.payload raw_buffer += self.crc.to_bytes(4, 'big') return raw_buffer def __init__(self, com_obj, mandatory=False, custom_crc=None): self.com_obj = com_obj self.mandatory = mandatory self.custom_crc = custom_crc self.sequence_counter_h2s = 0 self.sequence_counter_s2h = 0 self.received_raw_buffer = b'' self.received_packet = None self.received_buffer = b'' def __send(self, packet_type, data, destination=0): """ Sends data to the shellmatta - splitting at max size Args: packet_type (int): type of packet to send data (bytestring): string of data to send """ while len(data) > 0: packet = self.Packet(self.Packet.SOH, self.Packet.PROTOCOL_VERSION, packet_type, min(len(data), self.Packet.MAX_PAYLOAD_LENGTH), 0, destination, self.sequence_counter_h2s, self.sequence_counter_s2h, data[:self.Packet.MAX_PAYLOAD_LENGTH]) self.sequence_counter_h2s += 1 packet.secure() self.com_obj.write(bytes(packet)) data = data[self.Packet.MAX_PAYLOAD_LENGTH:] def __peek_com(self, size): """ wraps the read method to be able to peek data - leave data in buffer Args: size(integer) : number of bytes to peek""" if len(self.received_raw_buffer) < size: received_data = self.com_obj.read(size - len(self.received_raw_buffer)) self.received_raw_buffer += received_data if len(self.received_raw_buffer) < size: raise TimeoutError("No response from Shellmatta") # return the requested data from the buffer data = self.received_raw_buffer[:size] return data def __read_com(self, size): """ wraps the read method - removes read data from the bufffer Args: size(integer) : number of bytes to read""" # return the requested data from the buffer data = self.__peek_com(size) self.received_raw_buffer = self.received_raw_buffer[size:] return data def __process_reception(self): """ try to read a complete telegram from the shellmatta """ data = self.__peek_com(1) success = False # start parsing transport layer telegram if int(data[0]) == self.Packet.SOH: # process the header data = self.__peek_com(self.Packet.HEADER_LENGTH) packet = self.Packet.from_header_data(data) # read complete packet packet_size = self.Packet.HEADER_LENGTH + packet.payload_length + self.Packet.CRC_LENGTH packet_data = self.__peek_com(packet_size) packet.set_payload(packet_data[self.Packet.HEADER_LENGTH:-self.Packet.CRC_LENGTH]) # verify crc crc = int.from_bytes(packet_data[-self.Packet.CRC_LENGTH:], 'big', signed=False) success = packet.verify(crc) if success: # remove the packet from the raw buffer self.__read_com(packet_size) if packet.packet_type == packet.TYPE_DATA: self.received_buffer += packet.payload # process invalid bytes if not success: if not self.mandatory: # append the received SOH directly to the buffer self.received_buffer += self.__read_com(1) else: # throw away the SOH byte self.__read_com(1) def write(self, data): """ Send data using the transport layer Args: data (bytes): data to send to shellmatta """ if not isinstance(data, bytes): raise ValueError("data must be od type bytes") self.__send(self.Packet.TYPE_DATA, data) def write_manual(self, data): """Send data as if it was written by manual input. Will not use transport layer protocol. Args: data (string): String to send to shellmatta """ if not isinstance(data, bytes): raise ValueError("data must be od type bytes") self.com_obj.write(data) def read(self, size=1): """ Reads size bytes from the shellmatta transport layer Args: size (integer): number of bytes to read """ try: while len(self.received_buffer) < size: self.__process_reception() except TimeoutError: pass data = self.received_buffer[:size] self.received_buffer = self.received_buffer[size:] return data def reset(self): """ resets all internal states and flush the counterpart """ self.sequence_counter_h2s = 0 self.sequence_counter_s2h = 0 self.received_raw_buffer = b'' self.received_packet = None self.received_buffer = b'' # flush the buffer and send one cancel # self.com_obj.write(b'\x00' * (self.Packet.MAX_PAYLOAD_LENGTH + 12)) # self.write(b'\x03') def close(self): """Close port""" self.reset()