123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- """ Wrapper arount a shellmatta with transport layer to send commands and receive the response """
- import time
- import serial
- import argparse
- from shellmatta_transport import ShellmattaTransport
- from robot.api import logger
- class ShellmattaSerial():
- """ Helper class to communicate with a shelmatta enabled device """
- def __init__(self, com_port, baudrate=115200, prompt="->", timeout=1, transport_layer_mandatory=False):
- """ create the transport layer instance using the passed com port """
- self.prompt = prompt
- self.com = com_port
- self.com_port = serial.Serial(com_port, baudrate=baudrate, timeout=timeout)
- self.transport = ShellmattaTransport(self.com_port,
- transport_layer_mandatory,
- None)
- def send_command(self, command):
- """ Send command and wait for the prompt in the response. """
- if isinstance(command, str):
- send_data = str.encode(command)
- else:
- send_data = command
- retries = 3
- while retries:
- send_data = send_data + b"\r"
- self.transport.write(send_data)
- data = b''
- while True:
- data += self.transport.read()
- if data.endswith(str.encode(self.prompt)):
- # return received string without echo and prompt
- data = data[:-len(str.encode(self.prompt))]
- if data.startswith(send_data):
- data = data[len(send_data):]
- break
- # todo implement proper retries / sequence checking
- if b"crc error\r\n" not in data:
- break
- retries -= 1
- return data
- def send_command_only(self, command):
- """ Send command without waiting for response. """
- send_string = command + "\r"
- self.transport.write(str.encode(send_string))
- def send_raw(self, data):
- """ Send a passed bytes string without any mercy. """
- self.transport.write_manual(data)
- def reset_communication(self):
- """ Clears all internal buffers, throwing away all data. """
- self.transport.reset()
- self.com_port.flush()
- # time.sleep(0.015)
- self.com_port.reset_input_buffer()
- self.com_port.reset_output_buffer()
- def close_serial(self):
- self.com_port.close()
- def set_baudrate(self, baud):
- self.com_port.baudrate = baud
- self.com_port.close()
- time.sleep(0.5)
- self.com_port = serial.Serial(self.com, baudrate=baud, timeout=self.com_port.timeout)
- self.transport.close()
- self.transport = ShellmattaTransport(self.com_port,
- False,
- None)
- # self.com_port._reconfigure_port
- # start interactive mode if not used as a module
- if __name__ == "__main__":
- # setup an argument parser
- parser = argparse.ArgumentParser()
- parser.add_argument("-c", "--com-port", dest="com_port", help="Com port to use.", required=True)
- parser.add_argument("-b", "--baudrate", dest="baudrate", help="Baudrate to use.", type=int, default=115200)
- parser.add_argument("-p", "--prompt", dest="prompt", help="Prompt text of the shellmatta instance.", default="->")
- parser.add_argument("-t", "--timeout", dest="timeout", help="Timeout on the com port.", type=float, default=0.1)
- parser.add_argument("-m", "--mandatory", dest="mandatory", help="Force transport layer usage.", default=True)
- args = parser.parse_args()
- # get our own shellmatta serial instance
- shellmatta = ShellmattaSerial(args.com_port,
- args.baudrate,
- args.prompt,
- args.timeout,
- args.mandatory)
- shellmatta.reset_communication()
- # process user input
- print("Shellmatta Transport Serial wrapper - type commands as you like:")
- while True:
- command = input()
- print(shellmatta.send_command(command))
|