""" Wrapper arount a shellmatta with transport layer to send commands and receive the response """ import time import serial import argparse from shellmatta_transport import ShellmattaTransport from robot.api import logger class ShellmattaSerial(): """ Helper class to communicate with a shelmatta enabled device """ def __init__(self, com_port, baudrate=115200, prompt="->", timeout=1, transport_layer_mandatory=False): """ create the transport layer instance using the passed com port """ self.prompt = prompt self.com = com_port self.com_port = serial.Serial(com_port, baudrate=baudrate, timeout=timeout) self.transport = ShellmattaTransport(self.com_port, transport_layer_mandatory, None) def send_command(self, command): """ Send command and wait for the prompt in the response. """ if isinstance(command, str): send_data = str.encode(command) else: send_data = command retries = 3 while retries: send_data = send_data + b"\r" self.transport.write(send_data) data = b'' while True: data += self.transport.read() if data.endswith(str.encode(self.prompt)): # return received string without echo and prompt data = data[:-len(str.encode(self.prompt))] if data.startswith(send_data): data = data[len(send_data):] break # todo implement proper retries / sequence checking if b"crc error\r\n" not in data: break retries -= 1 return data def send_command_only(self, command): """ Send command without waiting for response. """ send_string = command + "\r" self.transport.write(str.encode(send_string)) def send_raw(self, data): """ Send a passed bytes string without any mercy. """ self.transport.write_manual(data) def reset_communication(self): """ Clears all internal buffers, throwing away all data. """ self.transport.reset() self.com_port.flush() # time.sleep(0.015) self.com_port.reset_input_buffer() self.com_port.reset_output_buffer() def close_serial(self): self.com_port.close() def set_baudrate(self, baud): self.com_port.baudrate = baud self.com_port.close() time.sleep(0.5) self.com_port = serial.Serial(self.com, baudrate=baud, timeout=self.com_port.timeout) self.transport.close() self.transport = ShellmattaTransport(self.com_port, False, None) # self.com_port._reconfigure_port # start interactive mode if not used as a module if __name__ == "__main__": # setup an argument parser parser = argparse.ArgumentParser() parser.add_argument("-c", "--com-port", dest="com_port", help="Com port to use.", required=True) parser.add_argument("-b", "--baudrate", dest="baudrate", help="Baudrate to use.", type=int, default=115200) parser.add_argument("-p", "--prompt", dest="prompt", help="Prompt text of the shellmatta instance.", default="->") parser.add_argument("-t", "--timeout", dest="timeout", help="Timeout on the com port.", type=float, default=0.1) parser.add_argument("-m", "--mandatory", dest="mandatory", help="Force transport layer usage.", default=True) args = parser.parse_args() # get our own shellmatta serial instance shellmatta = ShellmattaSerial(args.com_port, args.baudrate, args.prompt, args.timeout, args.mandatory) shellmatta.reset_communication() # process user input print("Shellmatta Transport Serial wrapper - type commands as you like:") while True: command = input() print(shellmatta.send_command(command))