Source code for usb_iss.serial_

from datetime import datetime, timedelta

from .exceptions import UsbIssError
from . import defs


[docs]class Serial(object): """ Use the USB_ISS device to perform Serial UART accesses. Example: :: from usb_iss import UsbIss # Configure Serial mode iss = UsbIss() iss.open("COM3") iss.setup_serial() # Write and read some data iss.serial.transmit([0x48, 0x65, 0x6c, 0x6c, 0x6f]); iss.serial.transmit_string("Hello!") data = iss.serial.receive() print(data) # [72, 105] """ def __init__(self, drv): self._drv = drv self._rx_buffer = []
[docs] def transmit(self, data): """ Transmit data over the Serial UART interface. Args: data (list of int): List of bytes to transmit. """ self._transaction(data)
[docs] def transmit_string(self, string, encoding="utf-8"): """ Transmit a string over the Serial UART interface. Args: string (str): String to transmit. encoding (str): Encoding of the string. """ data = list(bytearray(string.encode(encoding))) self.transmit(data)
[docs] def receive(self, timeout_ms=100): """ Receive data over the Serial UART interface. Returns once no data is received for timeout_ms. Args: timeout_ms (int): Returns once no data is received for this period. Returns: list of int: List of bytes received. """ data = [] last_rx_time = datetime.now() while True: rx_count = self.get_rx_count() if rx_count > 0: data += self._rx_buffer self._rx_buffer = [] last_rx_time = datetime.now() deadline = last_rx_time + timedelta(milliseconds=timeout_ms) if datetime.now() > deadline: return data
[docs] def receive_string(self, timeout_ms=100, encoding="utf-8"): """ Receive a string over the Serial UART interface. Returns once no data is received for timeout_ms. Args: timeout_ms (int): Returns once no data is received for this period. encoding (str): Encoding of the string. Returns: string: String received. """ return bytearray(self.receive(timeout_ms)).decode(encoding)
[docs] def get_rx_count(self): """ Return the number of bytes in the receive buffer. Returns: int: Number of bytes in the receive buffer. """ self._transaction() return len(self._rx_buffer)
[docs] def get_tx_count(self): """ Return the number of bytes in the transmit buffer. Returns: int: Number of bytes in the transmit buffer. """ return self._transaction()
def _transaction(self, data=None): if data is None: data = [] self._drv.write_cmd(defs.Command.SERIAL.value, data) [code, tx_count, rx_count] = self._drv.read(3) if code == defs.ResponseCode.NACK.value: raise UsbIssError("NACK received - transmit buffer overflow") if rx_count > 0: self._rx_buffer += self._drv.read(rx_count) return tx_count