Pārlūkot izejas kodu

added first implementation of python driver to access a shellmatta using the transport layer

stefan 3 gadi atpakaļ
vecāks
revīzija
d07c957454

+ 0 - 0
python_driver/__init__.py


+ 252 - 0
python_driver/shellmatta_transport.py

@@ -0,0 +1,252 @@
+""" Python module to handle the shellmatta transport layer """
+
+from crccheck.crc import Crc32
+import struct
+
+class ShellmattaTransport():
+    """ Class to handle the shellmatta transport layer """
+
+    class Packet():
+        """ A shellmatta packet structure """
+
+        HEADER_LENGTH = 8
+        MAX_PAYLOAD_LENGTH = 255
+        CRC_LENGTH = 4
+
+        SOH                         = 1
+        PROTOCOL_VERSION            = 1
+
+        TYPE_DATA                   = 0
+        TYPE_SEQ_CNT_REQUEST        = 1
+        TYPE_SEQ_CNT_RESPOND        = 129
+        TYPE_MAX_BUFFERSIZE_REQUEST = 2
+        TYPE_MAX_BUFFERSIZE_RESPOND = 130
+        TYPE_SEARCH_DEVICE_REQUEST  = 3
+        TYPE_SEARCH_DEVICE_RESPOND  = 131
+        TYPE_SET_ADDRESS_REQUEST    = 4
+        TYPE_SET_ADDRESS_RESPOND    = 132
+
+        def __init__(self,
+                     start_of_header,
+                     protocol_version,
+                     packet_type,
+                     payload_length,
+                     source,
+                     destination,
+                     sequence_h2s,
+                     sequence_s2h,
+                     payload=b'',
+                     crc=0):
+            """ creates a shellmatta transport packet based on the passed data """
+
+            self.start_of_header    = start_of_header
+            self.protocol_version   = protocol_version
+            self.packet_type        = packet_type
+            self.payload_length     = payload_length
+            self.source             = source
+            self.destination        = destination
+            self.sequence_h2s       = sequence_h2s
+            self.sequence_s2h       = sequence_s2h
+            self.payload            = payload
+            self.crc                = crc
+
+        @classmethod
+        def from_header_data(cls, header_data):
+            """ create an empty packet based on raw header data """
+
+            if not header_data or len(header_data) != cls.HEADER_LENGTH:
+                raise ValueError("A shellmatta transport packet needs 8 data bytes as a header.")
+
+            data = struct.unpack('BBBBBBBB', header_data)
+            return cls(*data)
+
+        def set_payload(self, payload):
+            """ sets/replaces the complete payload of the packet """
+
+            if len(payload) > self.MAX_PAYLOAD_LENGTH:
+                raise ValueError("Payload size exceeds limit!")
+            self.payload = payload
+            self.payload_length = len(self.payload)
+
+        def append_payload(self, payload):
+            """ append passed payload to the packet """
+
+            if len(payload) + self.payload_length > self.MAX_PAYLOAD_LENGTH:
+                raise ValueError("Payload size exceeds limit!")
+            self.payload += payload
+            self.payload_length = len(self.payload)
+
+        def secure(self):
+            """ Calculates the crc checksum """
+
+            self.crc = Crc32.calc(bytes(self)[:-4])
+
+        def verify(self, crc):
+            """ Checks the packet agains the passed crc """
+
+            self.secure()
+            return crc == self.crc
+
+        def __bytes__(self):
+            """ Create binary representation of the packet """
+
+            # pack header
+            raw_buffer = struct.pack('BBBBBBBB',
+                                     self.start_of_header,
+                                     self.protocol_version,
+                                     self.packet_type,
+                                     self.payload_length,
+                                     self.source,
+                                     self.destination,
+                                     self.sequence_h2s,
+                                     self.sequence_s2h)
+            raw_buffer += self.payload
+            raw_buffer += self.crc.to_bytes(4, 'big')
+            return raw_buffer
+
+    def __init__(self, com_obj, mandatory=False, custom_crc=None):
+        self.com_obj = com_obj
+        self.mandatory = mandatory
+        self.custom_crc = custom_crc
+        self.sequence_counter_h2s = 0
+        self.sequence_counter_s2h = 0
+        self.received_raw_buffer = b''
+        self.received_packet = None
+        self.received_buffer = b''
+
+    def __send(self, packet_type, data, destination=0):
+        """ Sends data to the shellmatta - splitting at max size
+
+        Args:
+            packet_type (int): type of packet to send
+            data (bytestring): string of data to send
+        """
+        while len(data) > 0:
+            packet = self.Packet(self.Packet.SOH,
+                                self.Packet.PROTOCOL_VERSION,
+                                packet_type,
+                                len(data),
+                                0,
+                                destination,
+                                self.sequence_counter_h2s,
+                                self.sequence_counter_s2h,
+                                data[:self.Packet.MAX_PAYLOAD_LENGTH])
+            self.sequence_counter_h2s += 1
+            packet.secure()
+            self.com_obj.write(bytes(packet))
+            data = data[self.Packet.MAX_PAYLOAD_LENGTH:]
+
+    def __peek_com(self, size):
+        """ wraps the read method to be able to peek data - leave data in buffer
+
+        Args:
+            size(integer) : number of bytes to peek"""
+        if len(self.received_raw_buffer) < size:
+            received_data = self.com_obj.read(size - len(self.received_raw_buffer))
+            self.received_raw_buffer += received_data
+
+            if len(self.received_raw_buffer) < size:
+                raise TimeoutError("No response from Shellmatta")
+
+        # return the requested data from the buffer
+        data = self.received_raw_buffer[:size]
+
+        return data
+
+    def __read_com(self, size):
+        """ wraps the read method - removes read data from the bufffer
+
+        Args:
+            size(integer) : number of bytes to read"""
+
+        # return the requested data from the buffer
+        data = self.__peek_com(size)
+        self.received_raw_buffer = self.received_raw_buffer[size:]
+
+        return data
+
+    def __process_reception(self):
+        """ try to read a complete telegram from the shellmatta """
+
+        data = self.__peek_com(1)
+        success = False
+
+        # start parsing transport layer telegram
+        if int(data[0]) == self.Packet.SOH:
+
+            # process the header
+            data = self.__peek_com(self.Packet.HEADER_LENGTH)
+            packet = self.Packet.from_header_data(data)
+
+            # read complete packet
+            packet_size = self.Packet.HEADER_LENGTH + packet.payload_length + self.Packet.CRC_LENGTH
+            packet_data = self.__peek_com(packet_size)
+            packet.set_payload(packet_data[self.Packet.HEADER_LENGTH:-self.Packet.CRC_LENGTH])
+
+            # verify crc
+            crc = int.from_bytes(packet_data[-self.Packet.CRC_LENGTH:], 'big', signed=False)
+            success = packet.verify(crc)
+
+            if success:
+
+                # remove the packet from the raw buffer
+                self.__read_com(packet_size)
+
+                if packet.packet_type == packet.TYPE_DATA:
+                    self.received_buffer += packet.payload
+
+        # process invalid bytes
+        if not success:
+            if not self.mandatory:
+                # append the received SOH directly to the buffer
+                self.received_buffer += self.__read_com(1)
+            else:
+                # throw away the SOH byte
+                self.__read_com(1)
+
+    def write(self, data):
+        """ Send data using the transport layer
+
+        Args:
+            data (bytes): data to send to shellmatta
+        """
+        if not isinstance(data, bytes):
+            raise ValueError("data must be od type bytes")
+        self.__send(self.Packet.TYPE_DATA, data)
+
+    def write_manual(self, data):
+        """Send data as if it was written by manual input. Will not use transport layer protocol.
+
+        Args:
+            data (string): String to send to shellmatta
+        """
+        if not isinstance(data, bytes):
+            raise ValueError("data must be od type bytes")
+        self.com_obj.write(data)
+
+    def read(self, size=1):
+        """ Reads size bytes from the shellmatta transport layer
+
+        Args:
+            size (integer): number of bytes to read
+        """
+        try:
+            while len(self.received_buffer) < size:
+                self.__process_reception()
+        except TimeoutError:
+            pass
+
+        data = self.received_buffer[:size]
+        self.received_buffer = self.received_buffer[size:]
+
+        return data
+
+    def reset(self):
+        """ resets all internal states and flush the counterpart """
+        self.sequence_counter_h2s = 0
+        self.sequence_counter_s2h = 0
+        self.received_raw_buffer = b''
+        self.received_packet = None
+        self.received_buffer = b''
+        # send lots of cancels
+        self.com_obj.write(b'\x03' * (self.Packet.MAX_PAYLOAD_LENGTH + 12))

+ 80 - 0
python_driver/shellmatta_transport_serial.py

@@ -0,0 +1,80 @@
+""" Wrapper arount a shellmatta with transport layer to send commands and receive the response """
+
+import sys
+import serial
+import argparse
+from shellmatta_transport import ShellmattaTransport
+
+class ShellmattaSerial():
+    """ Helper class to communicate with a shelmatta enabled device """
+
+
+    def __init__(self, com_port, baudrate=115200, prompt="->", timeout=1, transport_layer_mandatory=False):
+        """ create the transport layer instance using the passed com port """
+        self.prompt = prompt
+        self.com_port = serial.Serial(com_port, baudrate=baudrate, timeout=timeout)
+
+        self.transport = ShellmattaTransport(self.com_port,
+                                             transport_layer_mandatory,
+                                             None)
+
+    def send_command(self, command):
+        """ Send command and wait for the prompt in the response. """
+
+        send_string = command + "\r"
+        self.transport.write(str.encode(send_string))
+        data = b''
+
+        while True:
+            data += self.transport.read()
+            data_string = data.decode()
+
+            if data_string.endswith(self.prompt):
+                # return received string without echo and prompt
+                if data_string.startswith(send_string):
+                    data_string = data_string[len(send_string):-len(self.prompt)]
+                return data_string
+
+    def send_command_only(self, command):
+        """ Send command without waiting for response. """
+
+        send_string = command + "\r"
+        self.transport.write(str.encode(send_string))
+
+    def send_raw(self, data):
+        """ Send a passed bytes string without any mercy. """
+
+        self.transport.write_manual(data)
+
+    def reset_communication(self):
+        """ Clears all internal buffers, throwing away all data. """
+        self.transport.reset()
+        self.com_port.reset_input_buffer()
+        self.com_port.reset_output_buffer()
+
+# start interactive mode if not used as a module
+if __name__ == "__main__":
+
+    # setup an argument parser
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-c", "--com-port", dest="com_port", help="Com port to use.", required=True)
+    parser.add_argument("-b", "--baudrate", dest="baudrate", help="Baudrate to use.", type=int, default=115200)
+    parser.add_argument("-p", "--prompt",   dest="prompt",   help="Prompt text of the shellmatta instance.", default="->")
+    parser.add_argument("-t", "--timeout",  dest="timeout",  help="Timeout on the com port.", type=float, default=0.1)
+    parser.add_argument("-m", "--mandatory", dest="mandatory", help="Force transport layer usage.", default=True)
+    args = parser.parse_args()
+
+    # get our own shellmatta serial instance
+    shellmatta = ShellmattaSerial(args.com_port,
+                                  args.baudrate,
+                                  args.prompt,
+                                  args.timeout,
+                                  args.mandatory)
+
+    shellmatta.reset_communication()
+
+    # process user input
+    print("Shellmatta Transport Serial wrapper - type commands as you like:")
+    while True:
+        command = input()
+        print(shellmatta.send_command(command))