""" Wrapper arount a shellmatta with transport layer to send commands and receive the response """ import sys import serial import argparse from shellmatta_transport import ShellmattaTransport class ShellmattaSerial(): """ Helper class to communicate with a shelmatta enabled device """ def __init__(self, com_port, baudrate=115200, prompt="->", timeout=1, transport_layer_mandatory=False): """ create the transport layer instance using the passed com port """ self.prompt = prompt self.com_port = serial.Serial(com_port, baudrate=baudrate, timeout=timeout) self.transport = ShellmattaTransport(self.com_port, transport_layer_mandatory, None) def send_command(self, command): """ Send command and wait for the prompt in the response. """ send_string = command + "\r" self.transport.write(str.encode(send_string)) data = b'' while True: data += self.transport.read() data_string = data.decode() if data_string.endswith(self.prompt): # return received string without echo and prompt if data_string.startswith(send_string): data_string = data_string[len(send_string):-len(self.prompt)] return data_string def send_command_only(self, command): """ Send command without waiting for response. """ send_string = command + "\r" self.transport.write(str.encode(send_string)) def send_raw(self, data): """ Send a passed bytes string without any mercy. """ self.transport.write_manual(data) def reset_communication(self): """ Clears all internal buffers, throwing away all data. """ self.transport.reset() self.com_port.reset_input_buffer() self.com_port.reset_output_buffer() # start interactive mode if not used as a module if __name__ == "__main__": # setup an argument parser parser = argparse.ArgumentParser() parser.add_argument("-c", "--com-port", dest="com_port", help="Com port to use.", required=True) parser.add_argument("-b", "--baudrate", dest="baudrate", help="Baudrate to use.", type=int, default=115200) parser.add_argument("-p", "--prompt", dest="prompt", help="Prompt text of the shellmatta instance.", default="->") parser.add_argument("-t", "--timeout", dest="timeout", help="Timeout on the com port.", type=float, default=0.1) parser.add_argument("-m", "--mandatory", dest="mandatory", help="Force transport layer usage.", default=True) args = parser.parse_args() # get our own shellmatta serial instance shellmatta = ShellmattaSerial(args.com_port, args.baudrate, args.prompt, args.timeout, args.mandatory) shellmatta.reset_communication() # process user input print("Shellmatta Transport Serial wrapper - type commands as you like:") while True: command = input() print(shellmatta.send_command(command))