""" Wrapper around a shellmatta with transport layer to send commands and receive the response """

import argparse
import sys

import serial

from shellmatta_transport import ShellmattaTransport


def _str_to_bool(value):
    """ Convert a command line string such as "true" or "0" into a bool.

    Used as argparse ``type`` so that ``-m false`` really yields ``False``
    instead of the (truthy) string ``"false"``.

    Raises:
        argparse.ArgumentTypeError: if the text is not a recognised boolean.
    """
    normalized = value.strip().lower()
    if normalized in ("1", "true", "t", "yes", "y", "on"):
        return True
    if normalized in ("0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError("expected a boolean value, got %r" % value)


class ShellmattaSerial():
    """ Helper class to communicate with a shellmatta enabled device """

    def __init__(self, com_port, baudrate=115200, prompt="->", timeout=1, transport_layer_mandatory=False):
        """ Open the serial port and create the transport layer instance on top of it.

        Args:
            com_port: name of the serial port, passed on to ``serial.Serial``.
            baudrate: baudrate passed on to ``serial.Serial``.
            prompt: prompt text that marks the end of a response.
            timeout: timeout passed on to ``serial.Serial``.
            transport_layer_mandatory: forwarded to ``ShellmattaTransport``.
        """
        self.prompt = prompt
        self.com_port = serial.Serial(com_port, baudrate=baudrate, timeout=timeout)
        # NOTE(review): the meaning of the third argument (None) is defined by
        # ShellmattaTransport and is not visible from this file.
        self.transport = ShellmattaTransport(self.com_port,
                                             transport_layer_mandatory,
                                             None)

    def send_command(self, command):
        """ Send command and wait for the prompt in the response.

        Blocks until the received data ends with the prompt; there is no
        upper bound on the waiting time.

        Args:
            command: command text without the terminating carriage return.

        Returns:
            The received text without the trailing prompt and, if the device
            echoed the command, without that leading echo.
        """
        echo = (command + "\r").encode()
        prompt = self.prompt.encode()
        self.transport.write(echo)

        received = b''
        while True:
            received += self.transport.read()
            # compare on bytes: decoding a partially received multi-byte
            # character would raise before the response is complete
            if received.endswith(prompt):
                break

        # strip the echo only when it is really there, the prompt always
        start = len(echo) if received.startswith(echo) else 0
        # explicit end index, a plain "-len(prompt)" breaks for an empty prompt
        end = len(received) - len(prompt)
        return received[start:end].decode()

    def send_command_only(self, command):
        """ Send command without waiting for response. """
        self.transport.write((command + "\r").encode())

    def send_raw(self, data):
        """ Send a passed bytes string without any mercy. """
        self.transport.write_manual(data)

    def reset_communication(self):
        """ Clears all internal buffers, throwing away all data. """
        self.transport.reset()
        self.com_port.reset_input_buffer()
        self.com_port.reset_output_buffer()


def main():
    """ Interactive mode: forward every typed line as a command and print the response. """
    # setup an argument parser
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--com-port", dest="com_port", help="Com port to use.", required=True)
    parser.add_argument("-b", "--baudrate", dest="baudrate", help="Baudrate to use.", type=int, default=115200)
    parser.add_argument("-p", "--prompt", dest="prompt", help="Prompt text of the shellmatta instance.", default="->")
    parser.add_argument("-t", "--timeout", dest="timeout", help="Timeout on the com port.", type=float, default=0.1)
    # without a type every passed value is a non-empty string and thus truthy
    parser.add_argument("-m", "--mandatory", dest="mandatory", help="Force transport layer usage.",
                        type=_str_to_bool, default=True)
    args = parser.parse_args()

    # get our own shellmatta serial instance
    shellmatta = ShellmattaSerial(args.com_port,
                                  args.baudrate,
                                  args.prompt,
                                  args.timeout,
                                  args.mandatory)
    try:
        shellmatta.reset_communication()

        # process user input
        print("Shellmatta Transport Serial wrapper - type commands as you like:")
        while True:
            command = input()
            print(shellmatta.send_command(command))
    except (EOFError, KeyboardInterrupt):
        # end of input or Ctrl-C is the regular way to leave the interactive mode
        pass
    finally:
        shellmatta.com_port.close()


# start interactive mode if not used as a module
if __name__ == "__main__":
    main()