shellmatta_transport_serial.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. """ Wrapper arount a shellmatta with transport layer to send commands and receive the response """
  2. import sys
  3. import serial
  4. import argparse
  5. from shellmatta_transport import ShellmattaTransport
  6. class ShellmattaSerial():
  7. """ Helper class to communicate with a shelmatta enabled device """
  8. def __init__(self, com_port, baudrate=115200, prompt="->", timeout=1, transport_layer_mandatory=False):
  9. """ create the transport layer instance using the passed com port """
  10. self.prompt = prompt
  11. self.com_port = serial.Serial(com_port, baudrate=baudrate, timeout=timeout)
  12. self.transport = ShellmattaTransport(self.com_port,
  13. transport_layer_mandatory,
  14. None)
  15. def send_command(self, command):
  16. """ Send command and wait for the prompt in the response. """
  17. send_string = command + "\r"
  18. self.transport.write(str.encode(send_string))
  19. data = b''
  20. while True:
  21. data += self.transport.read()
  22. data_string = data.decode()
  23. if data_string.endswith(self.prompt):
  24. # return received string without echo and prompt
  25. if data_string.startswith(send_string):
  26. data_string = data_string[len(send_string):-len(self.prompt)]
  27. return data_string
  28. def send_command_only(self, command):
  29. """ Send command without waiting for response. """
  30. send_string = command + "\r"
  31. self.transport.write(str.encode(send_string))
  32. def send_raw(self, data):
  33. """ Send a passed bytes string without any mercy. """
  34. self.transport.write_manual(data)
  35. def reset_communication(self):
  36. """ Clears all internal buffers, throwing away all data. """
  37. self.transport.reset()
  38. self.com_port.reset_input_buffer()
  39. self.com_port.reset_output_buffer()
  40. # start interactive mode if not used as a module
  41. if __name__ == "__main__":
  42. # setup an argument parser
  43. parser = argparse.ArgumentParser()
  44. parser.add_argument("-c", "--com-port", dest="com_port", help="Com port to use.", required=True)
  45. parser.add_argument("-b", "--baudrate", dest="baudrate", help="Baudrate to use.", type=int, default=115200)
  46. parser.add_argument("-p", "--prompt", dest="prompt", help="Prompt text of the shellmatta instance.", default="->")
  47. parser.add_argument("-t", "--timeout", dest="timeout", help="Timeout on the com port.", type=float, default=0.1)
  48. parser.add_argument("-m", "--mandatory", dest="mandatory", help="Force transport layer usage.", default=True)
  49. args = parser.parse_args()
  50. # get our own shellmatta serial instance
  51. shellmatta = ShellmattaSerial(args.com_port,
  52. args.baudrate,
  53. args.prompt,
  54. args.timeout,
  55. args.mandatory)
  56. shellmatta.reset_communication()
  57. # process user input
  58. print("Shellmatta Transport Serial wrapper - type commands as you like:")
  59. while True:
  60. command = input()
  61. print(shellmatta.send_command(command))