""" Python module to handle the shellmatta transport layer """
- from crccheck.crc import Crc32
- import struct
class ShellmattaTransport():
    """ Class to handle the shellmatta transport layer

    Wraps a communication object and exchanges data with it in framed
    packets (see :class:`Packet`).

    Args:
        com_obj: object used for the raw communication; this class calls
                 ``com_obj.read(size)`` and ``com_obj.write(data)`` on it
        mandatory (bool): if False, received bytes that are not part of a
                          valid packet are passed through to the reader;
                          if True they are discarded
        custom_crc (callable): function taking the packet bytes (header +
                               payload) and returning the checksum as int;
                               used for sent and for received packets
    """

    class Packet():
        """ A shellmatta packet structure

        Binary layout as produced by ``__bytes__``: 8 single-byte header
        fields, followed by the payload, followed by the checksum as
        4 bytes in big endian order.
        """

        HEADER_LENGTH = 8
        MAX_PAYLOAD_LENGTH = 255
        CRC_LENGTH = 4

        SOH = 1
        PROTOCOL_VERSION = 1

        TYPE_DATA = 0
        TYPE_SEQ_CNT_REQUEST = 1
        TYPE_SEQ_CNT_RESPOND = 129
        TYPE_MAX_BUFFERSIZE_REQUEST = 2
        TYPE_MAX_BUFFERSIZE_RESPOND = 130
        TYPE_SEARCH_DEVICE_REQUEST = 3
        TYPE_SEARCH_DEVICE_RESPOND = 131
        TYPE_SET_ADDRESS_REQUEST = 4
        TYPE_SET_ADDRESS_RESPOND = 132

        # struct format of the header: HEADER_LENGTH unsigned bytes
        _HEADER_FORMAT = '8B'

        def __init__(self,
                     start_of_header,
                     protocol_version,
                     packet_type,
                     payload_length,
                     source,
                     destination,
                     sequence_h2s,
                     sequence_s2h,
                     payload=b'',
                     crc=0,
                     crc_fct=Crc32.calc):
            """ creates a shellmatta transport packet based on the passed data

            The first eight arguments are the header fields in the order
            they appear in the binary header; each must fit into one
            unsigned byte to be serialisable.

            Args:
                payload (bytes): payload of the packet
                crc (int): checksum stored in the packet
                crc_fct (callable): function used to calculate the checksum
            """
            self.start_of_header = start_of_header
            self.protocol_version = protocol_version
            self.packet_type = packet_type
            self.payload_length = payload_length
            self.source = source
            self.destination = destination
            self.sequence_h2s = sequence_h2s
            self.sequence_s2h = sequence_s2h
            self.payload = payload
            self.crc = crc
            self.crc_fct = crc_fct

        @classmethod
        def from_header_data(cls, header_data, crc_fct=None):
            """ create an empty packet based on raw header data

            Args:
                header_data (bytes): exactly HEADER_LENGTH raw header bytes
                crc_fct (callable): optional checksum function for the new
                                    packet; the constructor default is used
                                    when omitted
            Returns:
                Packet: packet with the header fields set and empty payload
            Raises:
                ValueError: if header_data is empty or has the wrong length
            """
            if not header_data or len(header_data) != cls.HEADER_LENGTH:
                raise ValueError("A shellmatta transport packet needs 8 data bytes as a header.")
            fields = struct.unpack(cls._HEADER_FORMAT, header_data)
            if crc_fct is None:
                return cls(*fields)
            return cls(*fields, crc_fct=crc_fct)

        def set_payload(self, payload):
            """ sets/replaces the complete payload of the packet

            Also updates payload_length to the length of the new payload.

            Raises:
                ValueError: if payload is longer than MAX_PAYLOAD_LENGTH
            """
            if len(payload) > self.MAX_PAYLOAD_LENGTH:
                raise ValueError("Payload size exceeds limit!")
            self.payload = payload
            self.payload_length = len(self.payload)

        def append_payload(self, payload):
            """ append passed payload to the packet

            Also updates payload_length to the length of the new payload.

            Raises:
                ValueError: if the resulting payload would be longer than
                            MAX_PAYLOAD_LENGTH
            """
            # check against the bytes actually stored - payload_length may
            # still hold the value announced by a received header
            if len(payload) + len(self.payload) > self.MAX_PAYLOAD_LENGTH:
                raise ValueError("Payload size exceeds limit!")
            self.payload += payload
            self.payload_length = len(self.payload)

        def _pack_header_and_payload(self):
            """ Create binary representation of header and payload (no crc) """
            header = struct.pack(self._HEADER_FORMAT,
                                 self.start_of_header,
                                 self.protocol_version,
                                 self.packet_type,
                                 self.payload_length,
                                 self.source,
                                 self.destination,
                                 self.sequence_h2s,
                                 self.sequence_s2h)
            return header + self.payload

        def calc_crc(self):
            """ Calculates the crc checksum over header and payload

            The stored crc is not part of the calculation.
            """
            return self.crc_fct(self._pack_header_and_payload())

        def secure(self):
            """ Calculates the crc checksum and stores it in the packet """
            self.crc = self.calc_crc()

        def verify(self, crc):
            """ Checks the packet against the passed crc

            Returns:
                bool: True if crc matches the checksum calculated for this
                      packet
            """
            return crc == self.calc_crc()

        def __bytes__(self):
            """ Create binary representation of the packet """
            raw_buffer = self._pack_header_and_payload()
            raw_buffer += self.crc.to_bytes(self.CRC_LENGTH, 'big')
            return raw_buffer

    def __init__(self, com_obj, mandatory=False, custom_crc=Crc32.calc):
        self.com_obj = com_obj
        self.mandatory = mandatory
        self.custom_crc = custom_crc
        self.sequence_counter_h2s = 0
        self.sequence_counter_s2h = 0
        # raw bytes read from com_obj that are not yet processed
        self.received_raw_buffer = b''
        self.received_packet = None
        # processed bytes ready to be returned by read()
        self.received_buffer = b''

    def __send(self, packet_type, data, destination=0):
        """ Sends data to the shellmatta - splitting at max size

        Nothing is sent if data is empty.

        Args:
            packet_type (int): type of packet to send
            data (bytestring): string of data to send
            destination (int): destination field of the packets
        """
        max_length = self.Packet.MAX_PAYLOAD_LENGTH
        while len(data) > 0:
            chunk = data[:max_length]
            packet = self.Packet(self.Packet.SOH,
                                 self.Packet.PROTOCOL_VERSION,
                                 packet_type,
                                 len(chunk),
                                 0,
                                 destination,
                                 self.sequence_counter_h2s,
                                 self.sequence_counter_s2h,
                                 chunk,
                                 crc_fct=self.custom_crc)
            # the counter is transmitted as a single byte - wrap around
            # instead of failing to pack the 257th packet
            self.sequence_counter_h2s = (self.sequence_counter_h2s + 1) & 0xFF
            packet.secure()
            self.com_obj.write(bytes(packet))
            data = data[max_length:]

    def __peek_com(self, size):
        """ wraps the read method to be able to peek data - leave data in buffer

        Args:
            size(integer) : number of bytes to peek
        Returns:
            bytes: the first size bytes of the raw buffer
        Raises:
            TimeoutError: if com_obj did not deliver enough bytes
        """
        missing = size - len(self.received_raw_buffer)
        if missing > 0:
            # only request what is not buffered yet
            self.received_raw_buffer += self.com_obj.read(missing)

        if len(self.received_raw_buffer) < size:
            raise TimeoutError("No response from Shellmatta")

        # return the requested data from the buffer
        return self.received_raw_buffer[:size]

    def __read_com(self, size):
        """ wraps the read method - removes read data from the buffer

        Args:
            size(integer) : number of bytes to read
        Returns:
            bytes: the size bytes removed from the raw buffer
        Raises:
            TimeoutError: if com_obj did not deliver enough bytes
        """
        data = self.__peek_com(size)
        self.received_raw_buffer = self.received_raw_buffer[size:]
        return data

    def __process_reception(self):
        """ try to read a complete telegram from the shellmatta

        Consumes either one valid packet or a single byte from the raw
        buffer. The payload of valid data packets is appended to the
        received buffer. A byte which is not part of a valid packet is
        appended to the received buffer unless the transport layer is
        mandatory - then it is thrown away.

        Raises:
            TimeoutError: if com_obj did not deliver enough bytes
        """
        header_length = self.Packet.HEADER_LENGTH
        crc_length = self.Packet.CRC_LENGTH

        data = self.__peek_com(1)
        success = False

        # start parsing transport layer telegram
        if data[0] == self.Packet.SOH:
            # process the header - verify with the same crc function that
            # is used to secure the sent packets
            header = self.__peek_com(header_length)
            packet = self.Packet.from_header_data(header, crc_fct=self.custom_crc)

            # read complete packet
            packet_size = header_length + packet.payload_length + crc_length
            packet_data = self.__peek_com(packet_size)
            packet.set_payload(packet_data[header_length:-crc_length])

            # verify crc
            crc = int.from_bytes(packet_data[-crc_length:], 'big', signed=False)
            success = packet.verify(crc)

            if success:
                # remove the packet from the raw buffer
                self.__read_com(packet_size)

                if packet.packet_type == packet.TYPE_DATA:
                    self.received_buffer += packet.payload

        # process invalid bytes
        if not success:
            if not self.mandatory:
                # append the received byte directly to the buffer
                self.received_buffer += self.__read_com(1)
            else:
                # throw away the byte
                self.__read_com(1)

    def write(self, data):
        """ Send data using the transport layer

        Args:
            data (bytes): data to send to shellmatta
        Raises:
            ValueError: if data is not of type bytes
        """
        if not isinstance(data, bytes):
            raise ValueError("data must be of type bytes")
        self.__send(self.Packet.TYPE_DATA, data)

    def write_manual(self, data):
        """Send data as if it was written by manual input. Will not use transport layer protocol.

        Args:
            data (bytes): data to send to shellmatta
        Raises:
            ValueError: if data is not of type bytes
        """
        if not isinstance(data, bytes):
            raise ValueError("data must be of type bytes")
        self.com_obj.write(data)

    def read(self, size=1):
        """ Reads size bytes from the shellmatta transport layer

        Args:
            size (integer): number of bytes to read
        Returns:
            bytes: up to size bytes - less if the reception timed out
        """
        try:
            while len(self.received_buffer) < size:
                self.__process_reception()
        except TimeoutError:
            # best effort - return what has been received so far
            pass

        data = self.received_buffer[:size]
        self.received_buffer = self.received_buffer[size:]
        return data

    def reset(self):
        """ resets all internal states and flush the counterpart """
        self.sequence_counter_h2s = 0
        self.sequence_counter_s2h = 0
        self.received_raw_buffer = b''
        self.received_packet = None
        self.received_buffer = b''

        # flush the buffer and send one cancel
        # one maximum sized packet worth of zero bytes
        # NOTE(review): presumably completes any half received packet on
        # the counterpart - confirm against the device implementation
        flush_length = (self.Packet.HEADER_LENGTH
                        + self.Packet.MAX_PAYLOAD_LENGTH
                        + self.Packet.CRC_LENGTH)
        self.com_obj.write(b'\x00' * flush_length)
        self.write(b'\x03')

    def close(self):
        """ Resets the transport layer (see reset)

        NOTE(review): com_obj itself is not closed here.
        """
        self.reset()
|