shellmatta_transport_serial.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. """ Wrapper around a shellmatta with transport layer to send commands and receive the response """
  2. import time
  3. import serial
  4. import argparse
  5. from shellmatta_transport import ShellmattaTransport
  6. class ShellmattaSerial():
  7. """ Helper class to communicate with a shellmatta enabled device """
  8. def __init__(self, com_port, baudrate=115200, prompt="->", timeout=1, transport_layer_mandatory=False):
  9. """ create the transport layer instance using the passed com port """
  10. self.prompt = prompt
  11. self.com = com_port
  12. self.com_port = serial.Serial(com_port, baudrate=baudrate, timeout=timeout)
  13. self.transport = ShellmattaTransport(self.com_port, transport_layer_mandatory)
  14. def send_command(self, command, enter=True):
  15. """ Send command and wait for the prompt in the response. """
  16. if isinstance(command, str):
  17. send_data = str.encode(command)
  18. else:
  19. send_data = command
  20. retries = 3
  21. while retries:
  22. if enter:
  23. send_data += b"\r"
  24. self.transport.write(send_data)
  25. data = b''
  26. while True:
  27. data += self.transport.read()
  28. if data.endswith(str.encode(self.prompt)):
  29. # return received string without echo and prompt
  30. data = data[:-len(str.encode(self.prompt))]
  31. if data.startswith(send_data):
  32. data = data[len(send_data):]
  33. break
  34. # todo implement proper retries / sequence checking
  35. if b"crc error\r\n" not in data:
  36. break
  37. retries -= 1
  38. return data
  39. def send_command_only(self, command):
  40. """ Send command without waiting for response. """
  41. send_string = command + "\r"
  42. self.transport.write(str.encode(send_string))
  43. def send_raw(self, data):
  44. """ Send a passed bytes string without any mercy. """
  45. self.transport.write_manual(data)
  46. def reset_communication(self):
  47. """ Clears all internal buffers, throwing away all data. """
  48. self.transport.reset()
  49. self.com_port.flush()
  50. self.com_port.reset_input_buffer()
  51. self.com_port.reset_output_buffer()
  52. # send cancel - and read response from input buffer
  53. _result = self.send_command("\x03", enter=False)
  54. def close_serial(self):
  55. """ Closes the serial port """
  56. self.com_port.close()
  57. def set_baudrate(self, baud):
  58. """ Set the baudrate of the serial port """
  59. self.com_port.baudrate = baud
  60. self.com_port.close()
  61. time.sleep(0.5)
  62. self.com_port = serial.Serial(self.com, baudrate=baud, timeout=self.com_port.timeout)
  63. self.transport.close()
  64. self.transport = ShellmattaTransport(self.com_port,
  65. False,
  66. None)
  67. # start interactive mode if not used as a module
  68. if __name__ == "__main__":
  69. # setup an argument parser
  70. parser = argparse.ArgumentParser()
  71. parser.add_argument("-c", "--com-port", dest="com_port", help="Com port to use.", required=True)
  72. parser.add_argument("-b", "--baudrate", dest="baudrate", help="Baudrate to use.", type=int, default=115200)
  73. parser.add_argument("-p", "--prompt", dest="prompt", help="Prompt text of the shellmatta instance.", default="->")
  74. parser.add_argument("-t", "--timeout", dest="timeout", help="Timeout on the com port.", type=float, default=0.1)
  75. parser.add_argument("-m", "--mandatory", dest="mandatory", help="Force transport layer usage.", default=True)
  76. args = parser.parse_args()
  77. # get our own shellmatta serial instance
  78. shellmatta = ShellmattaSerial(args.com_port,
  79. args.baudrate,
  80. args.prompt,
  81. args.timeout,
  82. args.mandatory)
  83. shellmatta.reset_communication()
  84. # process user input
  85. print("Shellmatta Transport Serial wrapper - type commands as you like:")
  86. while True:
  87. command = input()
  88. print(shellmatta.send_command(command))