diff --git a/docs/api/pylabrobot.capabilities.rst b/docs/api/pylabrobot.capabilities.rst index 666ce292e20..91b0ea9ef65 100644 --- a/docs/api/pylabrobot.capabilities.rst +++ b/docs/api/pylabrobot.capabilities.rst @@ -191,6 +191,17 @@ Barcode Scanning BarcodeScanner BarcodeScannerBackend +.. currentmodule:: pylabrobot.capabilities.barcode_scanning.serial + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + SerialBarcodeScanner + SerialBarcodeScannerDriver + SerialBarcodeScannerBackend + Microscopy ---------- diff --git a/docs/user_guide/capabilities/barcode-scanning.ipynb b/docs/user_guide/capabilities/barcode-scanning.ipynb index cdba713d33a..792c974f1af 100644 --- a/docs/user_guide/capabilities/barcode-scanning.ipynb +++ b/docs/user_guide/capabilities/barcode-scanning.ipynb @@ -38,6 +38,29 @@ "execution_count": null, "outputs": [] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Serial scanners\n", + "\n", + "Scanners configured as RS-232 or USB virtual COM devices can use {class}`~pylabrobot.capabilities.barcode_scanning.serial.SerialBarcodeScanner`. Configure the scanner to emit a line ending such as carriage return or newline after each barcode. If the scanner documents a serial trigger command, pass it as `trigger_command`; otherwise `scan()` passively waits for the next barcode line." + ] + }, + { + "cell_type": "code", + "metadata": {}, + "source": [ + "from pylabrobot.capabilities.barcode_scanning import SerialBarcodeScanner\n", + "\n", + "serial_scanner = SerialBarcodeScanner(port=\"COM5\", baudrate=115200)\n", + "await serial_scanner.setup()\n", + "barcode = await serial_scanner.barcode_scanning.scan(read_time=5)\n", + "await serial_scanner.stop()" + ], + "execution_count": null, + "outputs": [] + }, { "cell_type": "markdown", "metadata": {}, @@ -66,4 +89,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +} diff --git a/pylabrobot/capabilities/barcode_scanning/__init__.py b/pylabrobot/capabilities/barcode_scanning/__init__.py index 498af9e43da..036d67a8b7c 100644 --- a/pylabrobot/capabilities/barcode_scanning/__init__.py +++ b/pylabrobot/capabilities/barcode_scanning/__init__.py @@ -1,2 +1,3 @@ from .backend import BarcodeScannerBackend, BarcodeScannerError from .barcode_scanning import BarcodeScanner +from .serial import SerialBarcodeScanner, SerialBarcodeScannerBackend, SerialBarcodeScannerDriver diff --git a/pylabrobot/capabilities/barcode_scanning/serial.py b/pylabrobot/capabilities/barcode_scanning/serial.py new file mode 100644 index 00000000000..fe2db56a11e --- /dev/null +++ b/pylabrobot/capabilities/barcode_scanning/serial.py @@ -0,0 +1,221 @@ +import asyncio +import logging +from typing import Optional, Sequence + +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.device import Device, Driver +from pylabrobot.io.serial import Serial +from pylabrobot.resources.barcode import Barcode, BarcodePosition + +from .backend import BarcodeScannerBackend +from .barcode_scanning import BarcodeScanner + +logger = logging.getLogger(__name__) + + +class SerialBarcodeScannerDriver(Driver): + """Line-oriented serial driver for barcode scanners. + + This driver is intended for scanners configured as RS-232 or USB virtual COM + devices. It reads bytes from :class:`pylabrobot.io.Serial` until a configured + line terminator is seen. + """ + + def __init__( + self, + port: Optional[str] = None, + vid: Optional[int] = None, + pid: Optional[int] = None, + baudrate: int = 9600, + bytesize: int = 8, + parity: str = "N", + stopbits: int = 1, + write_timeout: float = 1, + read_timeout: float = 1, + rtscts: bool = False, + dsrdtr: bool = False, + xonxoff: bool = False, + encoding: str = "utf-8", + terminators: Sequence[bytes] = (b"\r", b"\n"), + max_line_length: int = 4096, + ): + super().__init__() + if len(terminators) == 0: + raise ValueError("At least one line terminator must be configured.") + if any(len(t) != 1 for t in terminators): + raise ValueError("SerialBarcodeScannerDriver only supports one-byte terminators.") + if max_line_length <= 0: + raise ValueError("max_line_length must be positive.") + + self.io = Serial( + human_readable_device_name="Serial Barcode Scanner", + port=port, + vid=vid, + pid=pid, + baudrate=baudrate, + bytesize=bytesize, + parity=parity, + stopbits=stopbits, + write_timeout=write_timeout, + timeout=read_timeout, + rtscts=rtscts, + dsrdtr=dsrdtr, + xonxoff=xonxoff, + ) + self.encoding = encoding + self.terminators = tuple(terminators) + self.max_line_length = max_line_length + + async def setup(self, backend_params: Optional[BackendParams] = None): + del backend_params + await self.io.setup() + logger.info("[Serial barcode scanner %s] connected", self.io.port) + + async def stop(self): + await self.io.stop() + logger.info("[Serial barcode scanner %s] disconnected", self.io.port) + + async def read_line(self, timeout: Optional[float] = None) -> str: + """Read one barcode line from the serial stream. + + Args: + timeout: Optional total read timeout in seconds. If omitted, the + underlying :class:`pylabrobot.io.Serial` timeout is used. + + Returns: + The decoded line without the trailing line terminator. Returns an empty + string if the timeout elapses before any byte is read. + """ + raw = await self._read_until_terminator(timeout=timeout) + while any(raw.endswith(terminator) for terminator in self.terminators): + raw = raw[:-1] + return raw.decode(self.encoding, errors="replace") + + async def write(self, data: bytes): + """Write raw bytes to the scanner.""" + await self.io.write(data) + + async def _read_until_terminator(self, timeout: Optional[float]) -> bytes: + loop = asyncio.get_running_loop() + deadline = None if timeout is None else loop.time() + timeout + buf = bytearray() + + while len(buf) < self.max_line_length: + if deadline is None: + chunk = await self.io.read(1) + else: + remaining = deadline - loop.time() + if remaining <= 0: + break + with self.io.temporary_timeout(remaining): + chunk = await self.io.read(1) + + if len(chunk) == 0: + break + buf.extend(chunk) + if bytes(chunk) in self.terminators: + break + + return bytes(buf) + + async def reset_input_buffer(self): + """Clear unread bytes buffered by the serial transport.""" + await self.io.reset_input_buffer() + + +class SerialBarcodeScannerBackend(BarcodeScannerBackend): + """Barcode-scanning backend for line-oriented serial scanners.""" + + def __init__( + self, + driver: SerialBarcodeScannerDriver, + symbology: str = "unknown", + position_on_resource: BarcodePosition = "front", + trigger_command: Optional[bytes] = None, + untrigger_command: Optional[bytes] = None, + ): + super().__init__() + self.driver = driver + self.symbology = symbology + self.position_on_resource = position_on_resource + self.trigger_command = trigger_command + self.untrigger_command = untrigger_command + + async def scan_barcode(self, read_time: Optional[float] = None) -> Optional[Barcode]: + if read_time is not None and read_time < 0: + raise ValueError("read_time must be non-negative.") + + if self.trigger_command is not None: + await self.driver.write(self.trigger_command) + + try: + data = await self.driver.read_line(timeout=read_time) + finally: + if self.untrigger_command is not None: + await self.driver.write(self.untrigger_command) + + if data == "": + return None + + logger.info("[Serial barcode scanner %s] scanned barcode: %s", self.driver.io.port, data) + return Barcode( + data=data, + symbology=self.symbology, + position_on_resource=self.position_on_resource, + ) + + +class SerialBarcodeScanner(Device): + """Barcode scanner connected through RS-232 or USB virtual COM.""" + + def __init__( + self, + port: Optional[str] = None, + vid: Optional[int] = None, + pid: Optional[int] = None, + baudrate: int = 9600, + bytesize: int = 8, + parity: str = "N", + stopbits: int = 1, + write_timeout: float = 1, + read_timeout: float = 1, + rtscts: bool = False, + dsrdtr: bool = False, + xonxoff: bool = False, + encoding: str = "utf-8", + terminators: Sequence[bytes] = (b"\r", b"\n"), + max_line_length: int = 4096, + symbology: str = "unknown", + position_on_resource: BarcodePosition = "front", + trigger_command: Optional[bytes] = None, + untrigger_command: Optional[bytes] = None, + ): + driver = SerialBarcodeScannerDriver( + port=port, + vid=vid, + pid=pid, + baudrate=baudrate, + bytesize=bytesize, + parity=parity, + stopbits=stopbits, + write_timeout=write_timeout, + read_timeout=read_timeout, + rtscts=rtscts, + dsrdtr=dsrdtr, + xonxoff=xonxoff, + encoding=encoding, + terminators=terminators, + max_line_length=max_line_length, + ) + super().__init__(driver=driver) + self.driver: SerialBarcodeScannerDriver = driver + self.barcode_scanning = BarcodeScanner( + backend=SerialBarcodeScannerBackend( + driver=driver, + symbology=symbology, + position_on_resource=position_on_resource, + trigger_command=trigger_command, + untrigger_command=untrigger_command, + ) + ) + self._capabilities = [self.barcode_scanning] diff --git a/pylabrobot/capabilities/barcode_scanning/serial_tests.py b/pylabrobot/capabilities/barcode_scanning/serial_tests.py new file mode 100644 index 00000000000..529c045e08c --- /dev/null +++ b/pylabrobot/capabilities/barcode_scanning/serial_tests.py @@ -0,0 +1,160 @@ +import unittest +from typing import List + +from pylabrobot.capabilities.barcode_scanning.serial import ( + SerialBarcodeScanner, + SerialBarcodeScannerBackend, + SerialBarcodeScannerDriver, +) + + +class FakeSerialIO: + def __init__(self, chunks: List[bytes]): + self.chunks = chunks + self.writes: List[bytes] = [] + self.port = "COM_TEST" + self.timeout: float = 1 + self.setup_called = False + self.stop_called = False + self.reset_input_buffer_called = False + + async def setup(self): + self.setup_called = True + + async def stop(self): + self.stop_called = True + + async def read(self, num_bytes: int = 1) -> bytes: + del num_bytes + if len(self.chunks) == 0: + return b"" + return self.chunks.pop(0) + + async def write(self, data: bytes): + self.writes.append(data) + + async def reset_input_buffer(self): + self.reset_input_buffer_called = True + + def get_read_timeout(self) -> float: + return self.timeout + + def set_read_timeout(self, timeout: float) -> None: + self.timeout = timeout + + def temporary_timeout(self, timeout: float): + fake = self + + class TemporaryTimeout: + def __enter__(self): + self.original_timeout = fake.timeout + fake.timeout = timeout + + def __exit__(self, exc_type, exc_value, traceback): + fake.timeout = self.original_timeout + + return TemporaryTimeout() + + +def make_driver(chunks: List[bytes]) -> SerialBarcodeScannerDriver: + driver = SerialBarcodeScannerDriver(port="COM_TEST") + driver.io = FakeSerialIO(chunks) # type: ignore[assignment] + return driver + + +class TestSerialBarcodeScannerDriver(unittest.IsolatedAsyncioTestCase): + async def test_read_line_carriage_return(self): + driver = make_driver([b"1", b"2", b"3", b"\r"]) + + self.assertEqual(await driver.read_line(timeout=1), "123") + + async def test_read_line_newline(self): + driver = make_driver([b"A", b"B", b"C", b"\n"]) + + self.assertEqual(await driver.read_line(timeout=1), "ABC") + + async def test_read_line_timeout_before_data(self): + driver = make_driver([]) + + self.assertEqual(await driver.read_line(timeout=0), "") + + async def test_reset_input_buffer(self): + driver = make_driver([]) + + await driver.reset_input_buffer() + + fake_io = driver.io + assert isinstance(fake_io, FakeSerialIO) + self.assertTrue(fake_io.reset_input_buffer_called) + + def test_rejects_empty_terminators(self): + with self.assertRaises(ValueError): + SerialBarcodeScannerDriver(port="COM_TEST", terminators=[]) + + def test_rejects_multi_byte_terminators(self): + with self.assertRaises(ValueError): + SerialBarcodeScannerDriver(port="COM_TEST", terminators=[b"\r\n"]) + + +class TestSerialBarcodeScannerBackend(unittest.IsolatedAsyncioTestCase): + async def test_scan_barcode(self): + driver = make_driver([b"2", b"2", b"6", b"\r"]) + backend = SerialBarcodeScannerBackend( + driver=driver, symbology="Code 128 (Subset B and C)", position_on_resource="right" + ) + + barcode = await backend.scan_barcode(read_time=1) + + assert barcode is not None + self.assertEqual(barcode.data, "226") + self.assertEqual(barcode.symbology, "Code 128 (Subset B and C)") + self.assertEqual(barcode.position_on_resource, "right") + + async def test_scan_barcode_returns_none_on_timeout(self): + driver = make_driver([]) + backend = SerialBarcodeScannerBackend(driver=driver) + + self.assertIsNone(await backend.scan_barcode(read_time=0)) + + async def test_scan_barcode_with_trigger_command(self): + driver = make_driver([b"1", b"2", b"3", b"\r"]) + backend = SerialBarcodeScannerBackend( + driver=driver, + trigger_command=b"TRIGGER\r", + untrigger_command=b"UNTRIGGER\r", + ) + + barcode = await backend.scan_barcode(read_time=1) + + assert barcode is not None + self.assertEqual(barcode.data, "123") + fake_io = driver.io + assert isinstance(fake_io, FakeSerialIO) + self.assertEqual(fake_io.writes, [b"TRIGGER\r", b"UNTRIGGER\r"]) + + async def test_scan_barcode_rejects_negative_read_time(self): + driver = make_driver([]) + backend = SerialBarcodeScannerBackend(driver=driver) + + with self.assertRaises(ValueError): + await backend.scan_barcode(read_time=-1) + + +class TestSerialBarcodeScannerDevice(unittest.IsolatedAsyncioTestCase): + async def test_device_setup_scan_stop(self): + scanner = SerialBarcodeScanner(port="COM_TEST") + fake_io = FakeSerialIO([b"X", b"Y", b"Z", b"\r"]) + scanner.driver.io = fake_io # type: ignore[assignment] + + await scanner.setup() + barcode = await scanner.barcode_scanning.scan(read_time=1) + await scanner.stop() + + assert barcode is not None + self.assertEqual(barcode.data, "XYZ") + self.assertTrue(fake_io.setup_called) + self.assertTrue(fake_io.stop_called) + + +if __name__ == "__main__": + unittest.main()