diff --git a/docs/api/pylabrobot.capabilities.rst b/docs/api/pylabrobot.capabilities.rst index 666ce292e20..3fc66738bac 100644 --- a/docs/api/pylabrobot.capabilities.rst +++ b/docs/api/pylabrobot.capabilities.rst @@ -192,6 +192,42 @@ Barcode Scanning BarcodeScannerBackend +Rack Reading +------------ + +.. currentmodule:: pylabrobot.capabilities.rack_reading.rack_reader + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + RackReader + +.. currentmodule:: pylabrobot.capabilities.rack_reading.backend + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + RackReaderBackend + +.. currentmodule:: pylabrobot.capabilities.rack_reading.standard + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + RackReaderState + RackReaderError + RackReaderTimeoutError + RackScanEntry + RackScanResult + LayoutInfo + + Microscopy ---------- diff --git a/docs/api/pylabrobot.micronic.rst b/docs/api/pylabrobot.micronic.rst new file mode 100644 index 00000000000..744042b4ce1 --- /dev/null +++ b/docs/api/pylabrobot.micronic.rst @@ -0,0 +1,46 @@ +.. currentmodule:: pylabrobot.micronic + +pylabrobot.micronic package +=========================== + +Micronic integrations built on the rack-reading capability. + +Device +------ + +.. currentmodule:: pylabrobot.micronic.code_reader.code_reader + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + MicronicCodeReader + + +Driver +------ + +.. currentmodule:: pylabrobot.micronic.code_reader.driver + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + MicronicDriver + MicronicError + + +Capabilities +------------ + +.. currentmodule:: pylabrobot.micronic.code_reader.rack_reading_backend + +.. autosummary:: + :toctree: _autosummary + :nosignatures: + :recursive: + + MicronicRackReadingBackend + MicronicRackReaderError diff --git a/docs/api/pylabrobot.rst b/docs/api/pylabrobot.rst index 53a04635937..5f46dd6be3f 100644 --- a/docs/api/pylabrobot.rst +++ b/docs/api/pylabrobot.rst @@ -41,6 +41,7 @@ Manufacturers pylabrobot.inheco pylabrobot.liconic pylabrobot.mettler_toledo + pylabrobot.micronic pylabrobot.molecular_devices pylabrobot.opentrons pylabrobot.qinstruments diff --git a/docs/user_guide/capabilities/index.md b/docs/user_guide/capabilities/index.md index 2a091c83f64..fb4e0f7ddad 100644 --- a/docs/user_guide/capabilities/index.md +++ b/docs/user_guide/capabilities/index.md @@ -55,6 +55,7 @@ loading-tray pumping weighing barcode-scanning +rack-reading microscopy automated-retrieval absorbance diff --git a/docs/user_guide/capabilities/rack-reading.md b/docs/user_guide/capabilities/rack-reading.md new file mode 100644 index 00000000000..a8e163a7ade --- /dev/null +++ b/docs/user_guide/capabilities/rack-reading.md @@ -0,0 +1,55 @@ +# Rack Reading + +The `rack_reading` capability standardizes rack-scale code readers that trigger a rack scan, +report normalized state while scanning, and return structured per-position scan results. + +Unlike one-at-a-time code reads, rack reading is job-oriented and returns the full decoded rack map. + +## Public API + +```python +from pylabrobot.capabilities.rack_reading import RackReader + +result = await reader.scan_rack(timeout=60.0, poll_interval=1.0) +``` + +`scan_rack()` is the main public operation. It triggers the scan, waits internally until the +reader reaches `dataready`, and then returns a `RackScanResult`. + +If the hardware supports reading just the rack barcode without decoding all tube positions, +`scan_rack_id()` exposes that as a rack-reading operation and returns the rack identifier only. + +Lower-level methods are also available: + +- `get_state()` +- `wait_for_data_ready()` +- `trigger_rack_scan()` +- `scan_rack_id()` +- `get_scan_result()` +- `get_rack_id()` +- `get_layouts()` +- `get_current_layout()` +- `set_current_layout(layout)` + +## Example With Micronic + +```python +from pylabrobot.micronic import MicronicCodeReader + +reader = MicronicCodeReader( + scanner_backend="sane", + sane_device="avision:libusb:001:004", + serial_port="/dev/ttyUSB0", +) +await reader.setup() + +try: + result = await reader.rack_reading.scan_rack(timeout=90.0, poll_interval=1.0) + print(result.rack_id) + print(result.entries[0].position, result.entries[0].tube_id) + + rack_id = await reader.rack_reading.scan_rack_id(timeout=60.0, poll_interval=1.0) + print(rack_id) +finally: + await reader.stop() +``` diff --git a/docs/user_guide/index.md b/docs/user_guide/index.md index bb273f350cf..3f1ad03ea17 100644 --- a/docs/user_guide/index.md +++ b/docs/user_guide/index.md @@ -40,6 +40,7 @@ inheco/index liconic/index mettler_toledo/index molecular_devices/index +micronic/index opentrons/index qinstruments/index thermo_fisher/index diff --git a/docs/user_guide/machines.md b/docs/user_guide/machines.md index dc057d79b15..db5a5fbcebb 100644 --- a/docs/user_guide/machines.md +++ b/docs/user_guide/machines.md @@ -188,6 +188,12 @@ tr > td:nth-child(5) { width: 15%; } |--------------|---------|-------------|--------| | Mettler Toledo | WXS205SDU | Full | [PLR](02_analytical/scales/mettler-toledo-WXS205SDU.ipynb) / [OEM](https://www.mt.com/us/en/home/products/Industrial_Weighing_Solutions/high-precision-weigh-sensors/weigh-module-wxs205sdu-15-11121008.html) | +### Rack Readers + +| Manufacturer | Machine | Features | PLR-Support | Links | +|--------------|---------|----------|-------------|--------| +| Micronic | Direct local scanner + serial control | rack reading | Basics | [PLR](https://docs.pylabrobot.org/user_guide/micronic/index.html) / [OEM](https://www.micronic.com/products/code-reader/) | + --- ## Understanding the Tables diff --git a/docs/user_guide/micronic/index.md b/docs/user_guide/micronic/index.md new file mode 100644 index 00000000000..d717de7348b --- /dev/null +++ b/docs/user_guide/micronic/index.md @@ -0,0 +1,85 @@ +# Micronic + +PyLabRobot includes `v1b1` Micronic integrations built on the generic +`rack_reading` capability. + +`MicronicCodeReader` controls the local hardware directly. It acquires the rack +image through a configured scanner command, a Windows TWAIN helper available on +PATH, or Ubuntu/Linux SANE `scanimage`; reads the side rack barcode through the +serial reader; decodes tube DataMatrix codes locally; and returns a standard +`RackScanResult`. It does not call Micronic Code Reader or IO Monitor, and +PyLabRobot does not package any scanner helper executable. + +## Supported operations + +Rack reading (large scanner that decodes 96 tubes plus the side rack barcode): + +- `rack_reading.scan_rack()` to trigger image acquisition, decode all 96 tube + positions, read the side rack barcode, and return a `RackScanResult` +- `rack_reading.scan_rack_id()` for a rack-barcode-only read on the side reader +- `rack_reading.get_layouts()`, `get_current_layout()`, and + `set_current_layout()` for the fixed 8x12 rack layout + +## Hardware example + +The operator is responsible for installing any OS-level scanner bridge +(`twain_scan`, `scanimage`, or a custom command), the PLR serial extra +(`pylabrobot[serial]`), and the local Python decode dependencies in the runtime +environment. + +```python +from pylabrobot.micronic import MicronicCodeReader + +reader = MicronicCodeReader( + scanner_backend="twain", + twain_scanner_path=r"C:\Tools\twain_scan.exe", + twain_source="AVA6PlusG", + image_dir=r"C:\ProgramData\PyLabRobot\micronic-images", + serial_port="COM4", + keep_images=True, +) +await reader.setup() + +try: + rack_result = await reader.rack_reading.scan_rack(timeout=90.0, poll_interval=1.0) + print(rack_result.rack_id) + print(len([entry for entry in rack_result.entries if entry.tube_id])) + + rack_id = await reader.rack_reading.scan_rack_id(timeout=5.0, poll_interval=0.5) + print(rack_id) +finally: + await reader.stop() +``` + +On Ubuntu/Linux, use SANE if the scanner is exposed by a SANE backend: + +```python +reader = MicronicCodeReader( + scanner_backend="sane", + sane_device="avision:libusb:001:004", + serial_port="/dev/ttyUSB0", + image_extension="tiff", +) +``` + +For any other acquisition stack, pass `scan_command`. Each command argument is +formatted with `{output_path}`, `{timeout_ms}`, `{twain_source}`, and +`{sane_device}` before execution. The command must write the rack image to +`{output_path}`. + +## Notes + +- `scan_rack` reads every tube barcode and finishes by reading the rack ID, so + it typically takes tens of seconds. `scan_rack_id` only reads the rack + barcode and completes in a few seconds. +- TWAIN is a Windows scanner-driver API. PyLabRobot does not ship a TWAIN + bridge binary and does not install one for you; configure + `twain_scanner_path`, set `MICRONIC_TWAIN_SCANNER_PATH`, or put a local helper + named `twain_scan`/`twain_scan.exe` on PATH when using the `twain` backend. +- Ubuntu/Linux scanner control should use SANE `scanimage` or a custom + `scan_command`. PyLabRobot does not install SANE or vendor scanner drivers. + Rack-ID reads use `pylabrobot.io.Serial`, which is installed through the + `pylabrobot[serial]` extra. +- Image decoding imports `pillow`, `opencv-python-headless`, `numpy`, and + `zxing-cpp` at runtime. Install them in the environment that runs PyLabRobot. +- Use `image_input` for offline decode checks without touching scanner hardware. diff --git a/pylabrobot/capabilities/__init__.py b/pylabrobot/capabilities/__init__.py index ddf84ebd973..d39489932c5 100644 --- a/pylabrobot/capabilities/__init__.py +++ b/pylabrobot/capabilities/__init__.py @@ -1 +1,11 @@ from .capability import Capability, CapabilityBackend, need_capability_ready +from .rack_reading import ( + LayoutInfo, + RackReader, + RackReaderBackend, + RackReaderError, + RackReaderState, + RackReaderTimeoutError, + RackScanEntry, + RackScanResult, +) diff --git a/pylabrobot/capabilities/rack_reading/__init__.py b/pylabrobot/capabilities/rack_reading/__init__.py new file mode 100644 index 00000000000..f0d7a8d1eb3 --- /dev/null +++ b/pylabrobot/capabilities/rack_reading/__init__.py @@ -0,0 +1,11 @@ +from .backend import RackReaderBackend +from .chatterbox import RackReaderChatterboxBackend +from .rack_reader import RackReader +from .standard import ( + LayoutInfo, + RackReaderError, + RackReaderState, + RackReaderTimeoutError, + RackScanEntry, + RackScanResult, +) diff --git a/pylabrobot/capabilities/rack_reading/backend.py b/pylabrobot/capabilities/rack_reading/backend.py new file mode 100644 index 00000000000..e65c2725409 --- /dev/null +++ b/pylabrobot/capabilities/rack_reading/backend.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from abc import ABCMeta, abstractmethod + +from pylabrobot.capabilities.capability import CapabilityBackend + +from .standard import LayoutInfo, RackReaderState, RackScanResult + + +class RackReaderBackend(CapabilityBackend, metaclass=ABCMeta): + """Abstract backend for rack readers that decode position-indexed rack contents.""" + + @abstractmethod + async def get_state(self) -> RackReaderState: + """Return the current rack reader state.""" + + @abstractmethod + async def trigger_rack_scan(self) -> None: + """Initiate a rack-wide scan.""" + + @abstractmethod + async def scan_rack_id(self, timeout: float, poll_interval: float) -> str: + """Perform a rack-barcode-only scan and return the rack identifier. + + Backends whose hardware exposes a one-shot rack-id read may ignore + ``timeout`` and ``poll_interval``; backends that need a trigger/poll cycle + should respect them. + """ + + @abstractmethod + async def get_scan_result(self) -> RackScanResult: + """Return the most recent rack scan result.""" + + @abstractmethod + async def get_rack_id(self) -> str: + """Return the rack identifier reported by the scanner.""" + + @abstractmethod + async def get_layouts(self) -> list[LayoutInfo]: + """Return supported layouts.""" + + @abstractmethod + async def get_current_layout(self) -> str: + """Return the active layout.""" + + @abstractmethod + async def set_current_layout(self, layout: str) -> None: + """Set the active layout.""" diff --git a/pylabrobot/capabilities/rack_reading/chatterbox.py b/pylabrobot/capabilities/rack_reading/chatterbox.py new file mode 100644 index 00000000000..ac3276ca6ea --- /dev/null +++ b/pylabrobot/capabilities/rack_reading/chatterbox.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from .backend import RackReaderBackend +from .standard import LayoutInfo, RackReaderState, RackScanEntry, RackScanResult + + +class RackReaderChatterboxBackend(RackReaderBackend): + """Device-free rack-reading backend for tests and examples.""" + + def __init__(self): + self._state = RackReaderState.IDLE + self._layout = "96" + + async def get_state(self) -> RackReaderState: + return self._state + + async def trigger_rack_scan(self) -> None: + self._state = RackReaderState.DATAREADY + + async def scan_rack_id(self, timeout: float, poll_interval: float) -> str: + self._state = RackReaderState.DATAREADY + return "CHATTERBOX" + + async def get_scan_result(self) -> RackScanResult: + return RackScanResult( + rack_id="CHATTERBOX", + date="19700101", + time="000000", + entries=[ + RackScanEntry(position="A01", tube_id="SIMULATED", status="Code OK"), + ], + ) + + async def get_rack_id(self) -> str: + return "CHATTERBOX" + + async def get_layouts(self) -> list[LayoutInfo]: + return [LayoutInfo(name="96"), LayoutInfo(name="48")] + + async def get_current_layout(self) -> str: + return self._layout + + async def set_current_layout(self, layout: str) -> None: + self._layout = layout diff --git a/pylabrobot/capabilities/rack_reading/rack_reader.py b/pylabrobot/capabilities/rack_reading/rack_reader.py new file mode 100644 index 00000000000..48a87e55574 --- /dev/null +++ b/pylabrobot/capabilities/rack_reading/rack_reader.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +import asyncio +import time + +from pylabrobot.capabilities.capability import Capability, need_capability_ready + +from .backend import RackReaderBackend +from .standard import LayoutInfo, RackReaderState, RackReaderTimeoutError, RackScanResult + + +class RackReader(Capability): + """Rack-reading capability.""" + + def __init__(self, backend: RackReaderBackend): + super().__init__(backend=backend) + self.backend: RackReaderBackend = backend + + @need_capability_ready + async def get_state(self) -> RackReaderState: + return await self.backend.get_state() + + @need_capability_ready + async def trigger_rack_scan(self) -> None: + await self.backend.trigger_rack_scan() + + @need_capability_ready + async def get_scan_result(self) -> RackScanResult: + return await self.backend.get_scan_result() + + @need_capability_ready + async def get_rack_id(self) -> str: + return await self.backend.get_rack_id() + + @need_capability_ready + async def get_layouts(self) -> list[LayoutInfo]: + return await self.backend.get_layouts() + + @need_capability_ready + async def get_current_layout(self) -> str: + return await self.backend.get_current_layout() + + @need_capability_ready + async def set_current_layout(self, layout: str) -> None: + await self.backend.set_current_layout(layout) + + async def _wait_for_state( + self, + target: RackReaderState, + timeout: float, + poll_interval: float, + ) -> RackReaderState: + deadline = time.monotonic() + timeout + while True: + state = await self.backend.get_state() + if state == target: + return state + if time.monotonic() >= deadline: + raise RackReaderTimeoutError(f"Timed out waiting for rack reader to reach {target.value}.") + await asyncio.sleep(poll_interval) + + async def _wait_for_fresh_data_ready( + self, + initial_state: RackReaderState, + timeout: float, + poll_interval: float, + ) -> None: + require_state_change = initial_state == RackReaderState.DATAREADY + deadline = time.monotonic() + timeout + while True: + state = await self.backend.get_state() + if state != RackReaderState.DATAREADY: + require_state_change = False + elif not require_state_change: + return + + if time.monotonic() >= deadline: + raise RackReaderTimeoutError( + f"Timed out waiting for rack reader to reach {RackReaderState.DATAREADY.value}." + ) + await asyncio.sleep(poll_interval) + + @need_capability_ready + async def wait_for_data_ready( + self, + timeout: float = 60.0, + poll_interval: float = 1.0, + ) -> RackReaderState: + """Wait until the reader reports that scan data is ready.""" + + return await self._wait_for_state( + target=RackReaderState.DATAREADY, + timeout=timeout, + poll_interval=poll_interval, + ) + + @need_capability_ready + async def scan_rack( + self, + timeout: float = 60.0, + poll_interval: float = 1.0, + ) -> RackScanResult: + """Trigger a rack scan and return the completed result.""" + + initial_state = await self.backend.get_state() + await self.backend.trigger_rack_scan() + await self._wait_for_fresh_data_ready( + initial_state=initial_state, + timeout=timeout, + poll_interval=poll_interval, + ) + return await self.backend.get_scan_result() + + @need_capability_ready + async def scan_rack_id( + self, + timeout: float = 60.0, + poll_interval: float = 1.0, + ) -> str: + """Perform a rack-barcode-only scan and return the rack identifier. + + The backend decides whether this is a one-shot read or a trigger/poll cycle; + ``timeout`` and ``poll_interval`` are forwarded so backends that poll can + honor them. + """ + + return await self.backend.scan_rack_id(timeout=timeout, poll_interval=poll_interval) diff --git a/pylabrobot/capabilities/rack_reading/rack_reader_tests.py b/pylabrobot/capabilities/rack_reading/rack_reader_tests.py new file mode 100644 index 00000000000..9b2dc1167ce --- /dev/null +++ b/pylabrobot/capabilities/rack_reading/rack_reader_tests.py @@ -0,0 +1,142 @@ +import unittest + +from pylabrobot.capabilities.rack_reading.backend import RackReaderBackend +from pylabrobot.capabilities.rack_reading.rack_reader import RackReader +from pylabrobot.capabilities.rack_reading.standard import ( + LayoutInfo, + RackReaderState, + RackReaderTimeoutError, + RackScanEntry, + RackScanResult, +) + + +class RecordingRackReaderBackend(RackReaderBackend): + def __init__(self): + self.state = RackReaderState.IDLE + self.calls: list[str] = [] + self.result = RackScanResult( + rack_id="5500135415", + date="20260316", + time="160626", + entries=[ + RackScanEntry(position="A01", tube_id="7518613629", status="Code OK", free_text=""), + ], + ) + + async def get_state(self) -> RackReaderState: + self.calls.append("get_state") + if self.state == RackReaderState.SCANNING: + self.state = RackReaderState.DATAREADY + return self.state + + async def trigger_rack_scan(self) -> None: + self.calls.append("trigger_rack_scan") + self.state = RackReaderState.SCANNING + + async def scan_rack_id(self, timeout: float, poll_interval: float) -> str: + self.calls.append(f"scan_rack_id:{timeout}:{poll_interval}") + return self.result.rack_id + + async def get_scan_result(self) -> RackScanResult: + self.calls.append("get_scan_result") + return self.result + + async def get_rack_id(self) -> str: + self.calls.append("get_rack_id") + return self.result.rack_id + + async def get_layouts(self) -> list[LayoutInfo]: + self.calls.append("get_layouts") + return [LayoutInfo(name="96")] + + async def get_current_layout(self) -> str: + self.calls.append("get_current_layout") + return "96" + + async def set_current_layout(self, layout: str) -> None: + self.calls.append(f"set_current_layout:{layout}") + + +class StuckRackReaderBackend(RecordingRackReaderBackend): + async def get_state(self) -> RackReaderState: + self.calls.append("get_state") + return RackReaderState.SCANNING + + +class StaleDataReadyRackReaderBackend(RecordingRackReaderBackend): + def __init__(self): + super().__init__() + self.state = RackReaderState.DATAREADY + self._states_after_trigger = [ + RackReaderState.DATAREADY, + RackReaderState.SCANNING, + RackReaderState.DATAREADY, + ] + + async def trigger_rack_scan(self) -> None: + self.calls.append("trigger_rack_scan") + + async def get_state(self) -> RackReaderState: + self.calls.append("get_state") + if self._states_after_trigger: + return self._states_after_trigger.pop(0) + return RackReaderState.DATAREADY + + +class TestRackReader(unittest.IsolatedAsyncioTestCase): + async def test_scan_rack_triggers_and_returns_result(self): + backend = RecordingRackReaderBackend() + reader = RackReader(backend=backend) + await reader._on_setup() + + result = await reader.scan_rack(timeout=1.0, poll_interval=0.01) + + self.assertEqual(result.rack_id, "5500135415") + self.assertEqual(result.entries[0].position, "A01") + self.assertEqual( + backend.calls[:3], + ["get_state", "trigger_rack_scan", "get_state"], + ) + + async def test_scan_rack_waits_for_new_dataready_cycle(self): + backend = StaleDataReadyRackReaderBackend() + reader = RackReader(backend=backend) + await reader._on_setup() + + result = await reader.scan_rack(timeout=1.0, poll_interval=0.0) + + self.assertEqual(result.rack_id, "5500135415") + self.assertEqual( + backend.calls, + ["get_state", "trigger_rack_scan", "get_state", "get_state", "get_scan_result"], + ) + + async def test_scan_rack_id_delegates_to_backend(self): + backend = RecordingRackReaderBackend() + reader = RackReader(backend=backend) + await reader._on_setup() + + rack_id = await reader.scan_rack_id(timeout=1.0, poll_interval=0.01) + + self.assertEqual(rack_id, "5500135415") + self.assertEqual(backend.calls, ["scan_rack_id:1.0:0.01"]) + + async def test_scan_rack_times_out(self): + backend = StuckRackReaderBackend() + reader = RackReader(backend=backend) + await reader._on_setup() + + with self.assertRaises(RackReaderTimeoutError): + await reader.scan_rack(timeout=0.01, poll_interval=0.0) + + async def test_requires_setup(self): + backend = RecordingRackReaderBackend() + reader = RackReader(backend=backend) + + with self.assertRaises(RuntimeError): + await reader.get_state() + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/capabilities/rack_reading/standard.py b/pylabrobot/capabilities/rack_reading/standard.py new file mode 100644 index 00000000000..fc46cf0d3b8 --- /dev/null +++ b/pylabrobot/capabilities/rack_reading/standard.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +import enum +from dataclasses import dataclass +from typing import Optional + + +class RackReaderError(Exception): + """Base exception for rack reader operations.""" + + +class RackReaderTimeoutError(RackReaderError): + """Raised when a rack reader operation times out.""" + + +class RackReaderState(enum.Enum): + """Normalized rack reader states.""" + + IDLE = "idle" + SCANNING = "scanning" + DATAREADY = "dataready" + + +@dataclass +class RackScanEntry: + """One decoded rack position.""" + + position: str + tube_id: Optional[str] + status: str + free_text: str = "" + + +@dataclass +class RackScanResult: + """A decoded rack scan.""" + + rack_id: str + date: str + time: str + entries: list[RackScanEntry] + + +@dataclass +class LayoutInfo: + """One rack layout supported by the reader. + + Wraps a single ``name`` field today so backends can grow the metadata they + attach to a layout (rows, columns, tube type, vendor-specific attributes) + without changing the capability surface. + """ + + name: str diff --git a/pylabrobot/micronic/__init__.py b/pylabrobot/micronic/__init__.py new file mode 100644 index 00000000000..f55676153d0 --- /dev/null +++ b/pylabrobot/micronic/__init__.py @@ -0,0 +1,7 @@ +from pylabrobot.micronic.code_reader import ( + MicronicCodeReader, + MicronicError, + MicronicDriver, + MicronicRackReadingBackend, + MicronicRackReaderError, +) diff --git a/pylabrobot/micronic/code_reader/__init__.py b/pylabrobot/micronic/code_reader/__init__.py new file mode 100644 index 00000000000..2498dcd12a3 --- /dev/null +++ b/pylabrobot/micronic/code_reader/__init__.py @@ -0,0 +1,9 @@ +from pylabrobot.micronic.code_reader.code_reader import MicronicCodeReader +from pylabrobot.micronic.code_reader.driver import ( + MicronicError, + MicronicDriver, +) +from pylabrobot.micronic.code_reader.rack_reading_backend import ( + MicronicRackReadingBackend, + MicronicRackReaderError, +) diff --git a/pylabrobot/micronic/code_reader/code_reader.py b/pylabrobot/micronic/code_reader/code_reader.py new file mode 100644 index 00000000000..c768f530a54 --- /dev/null +++ b/pylabrobot/micronic/code_reader/code_reader.py @@ -0,0 +1,70 @@ +"""Micronic Code Reader device.""" + +from __future__ import annotations + +from typing import Optional, Sequence + +from pylabrobot.capabilities.rack_reading import RackReader +from pylabrobot.device import Device + +from .driver import MicronicDriver +from .rack_reading_backend import MicronicRackReadingBackend + + +class MicronicCodeReader(Device): + """Micronic rack reader device. + + The rack-reading capability is driven by ``MicronicDriver``. + """ + + def __init__( + self, + twain_scanner_path: Optional[str] = None, + twain_source: str = "AVA6PlusG", + sane_device: Optional[str] = None, + scanner_backend: str = "auto", + scan_command: Optional[Sequence[str]] = None, + image_extension: Optional[str] = None, + image_dir: Optional[str] = None, + serial_port: str = "COM4", + rack_id_command: Optional[Sequence[str]] = None, + timeout: float = 90.0, + poll_interval: float = 1.0, + serial_timeout_ms: int = 2500, + min_wells: int = 96, + keep_images: bool = False, + image_input: Optional[str] = None, + rack_id_override: Optional[str] = None, + driver: Optional[MicronicDriver] = None, + ): + if driver is None: + driver = MicronicDriver( + twain_scanner_path=twain_scanner_path, + twain_source=twain_source, + sane_device=sane_device, + scanner_backend=scanner_backend, + scan_command=scan_command, + image_extension=image_extension, + image_dir=image_dir, + serial_port=serial_port, + rack_id_command=rack_id_command, + scanner_timeout_ms=int(timeout * 1000), + serial_timeout_ms=serial_timeout_ms, + min_wells=min_wells, + keep_images=keep_images, + image_input=image_input, + rack_id_override=rack_id_override, + ) + super().__init__(driver=driver) + self.driver: MicronicDriver = driver + self.default_timeout = timeout + self.default_poll_interval = poll_interval + self.rack_reading = RackReader(backend=MicronicRackReadingBackend(driver)) + self._capabilities = [self.rack_reading] + + def serialize(self) -> dict: + return { + **super().serialize(), + "timeout": self.default_timeout, + "poll_interval": self.default_poll_interval, + } diff --git a/pylabrobot/micronic/code_reader/driver.py b/pylabrobot/micronic/code_reader/driver.py new file mode 100644 index 00000000000..666aeeff9f9 --- /dev/null +++ b/pylabrobot/micronic/code_reader/driver.py @@ -0,0 +1,843 @@ +"""Hardware driver for the Micronic rack scanner. + +This driver does not call Micronic Code Reader or IO Monitor. It owns the local +scanner path directly: + +- acquire a rack image through a user-supplied scan command, Windows TWAIN + helper, or SANE ``scanimage`` command, +- read the rack ID through the side serial barcode reader, +- decode tube DataMatrix codes locally, and +- return the standard PLR rack-reading result. +""" + +from __future__ import annotations + +import asyncio +import os +import re +import shutil +import subprocess # nosec B404 - local scanner/rack-id helper execution is the interface. +import tempfile +import time +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Iterable, Optional, Sequence + +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.capabilities.rack_reading import ( + LayoutInfo, + RackReaderState, + RackScanEntry, + RackScanResult, +) +from pylabrobot.device import Driver +from pylabrobot.io.serial import Serial + + +ROWS = "ABCDEFGH" +COLS = 12 +RACK_ROWS = 8 +RACK_COLS = 12 + + +class MicronicError(Exception): + """Raised when Micronic driver operations fail.""" + + +@dataclass(frozen=True) +class DecodeResult: + tube_id: str + method: str + + +class MicronicDriver(Driver): + """Driver that controls the Micronic scanner without the OEM app.""" + + def __init__( + self, + twain_scanner_path: Optional[str] = None, + twain_source: str = "AVA6PlusG", + sane_device: Optional[str] = None, + scanner_backend: str = "auto", + scan_command: Optional[Sequence[str]] = None, + image_extension: Optional[str] = None, + image_dir: Optional[str] = None, + serial_port: str = "COM4", + rack_id_command: Optional[Sequence[str]] = None, + scanner_timeout_ms: int = 90000, + serial_timeout_ms: int = 2500, + min_wells: int = 96, + keep_images: bool = False, + image_input: Optional[str] = None, + rack_id_override: Optional[str] = None, + ): + super().__init__() + self.twain_scanner_path = twain_scanner_path + self.twain_source = twain_source + self.sane_device = sane_device + self.scanner_backend = scanner_backend + self.scan_command = list(scan_command) if scan_command is not None else None + self.image_extension = normalize_image_extension(image_extension) if image_extension else None + self.image_dir = ( + Path(image_dir) if image_dir else Path(tempfile.gettempdir()) / "pylabrobot-micronic" + ) + self.serial_port = serial_port + self.rack_id_command = list(rack_id_command) if rack_id_command is not None else None + self.scanner_timeout_ms = scanner_timeout_ms + self.serial_timeout_ms = serial_timeout_ms + self.min_wells = min_wells + self.keep_images = keep_images + self.image_input = image_input + self.rack_id_override = rack_id_override + self._state = RackReaderState.IDLE + self._last_result: Optional[RackScanResult] = None + self._scan_task: Optional[asyncio.Future[RackScanResult]] = None + self._scan_error: Optional[Exception] = None + self._reported_scanning_since_trigger = False + self.last_image_path: Optional[Path] = None + self.last_scan_metadata: dict[str, object] = {} + self.last_decode_metadata: dict[str, object] = {} + + async def setup(self, backend_params: Optional[BackendParams] = None): + del backend_params + self.image_dir.mkdir(parents=True, exist_ok=True) + + async def stop(self): + scan_task = self._scan_task + if scan_task is None: + if self._scan_error is not None: + raise self._scan_error + return + if scan_task.done(): + self._complete_scan_task(scan_task) + else: + try: + self._last_result = await scan_task + except asyncio.CancelledError: + self._scan_error = MicronicError("Micronic rack scan was cancelled.") + self._state = RackReaderState.IDLE + except Exception as exc: + self._scan_error = exc + self._state = RackReaderState.IDLE + else: + self._scan_error = None + self._state = RackReaderState.DATAREADY + finally: + self._scan_task = None + if self._scan_error is not None: + raise self._scan_error + + def serialize(self) -> dict: + return { + **super().serialize(), + "twain_scanner_path": self.twain_scanner_path, + "twain_source": self.twain_source, + "sane_device": self.sane_device, + "scanner_backend": self.scanner_backend, + "scan_command": self.scan_command, + "image_extension": self.image_extension, + "image_dir": str(self.image_dir), + "serial_port": self.serial_port, + "rack_id_command": self.rack_id_command, + "scanner_timeout_ms": self.scanner_timeout_ms, + "serial_timeout_ms": self.serial_timeout_ms, + "min_wells": self.min_wells, + "keep_images": self.keep_images, + "image_input": self.image_input, + "rack_id_override": self.rack_id_override, + } + + async def get_rack_reader_state(self) -> RackReaderState: + if self._state == RackReaderState.SCANNING and not self._reported_scanning_since_trigger: + self._reported_scanning_since_trigger = True + return self._state + self._complete_finished_scan_task() + if self._scan_error is not None: + raise self._scan_error + return self._state + + async def trigger_rack_scan(self) -> None: + self._complete_finished_scan_task() + if self._scan_task is not None and not self._scan_task.done(): + raise MicronicError("Micronic rack scan is already in progress.") + self._last_result = None + self._state = RackReaderState.SCANNING + self._scan_error = None + self._reported_scanning_since_trigger = False + loop = asyncio.get_running_loop() + self._scan_task = loop.run_in_executor(None, self._scan_rack_blocking) + + async def scan_rack_id(self, timeout: float, poll_interval: float) -> str: + del timeout, poll_interval + if self.rack_id_override: + return self.rack_id_override + if self.rack_id_command is not None: + return await asyncio.to_thread( + read_rack_id_command, + format_command( + self.rack_id_command, + serial_port=self.serial_port, + timeout_ms=self.serial_timeout_ms, + ), + self.serial_timeout_ms, + ) + return await read_rack_id_plr_serial( + serial_port=self.serial_port, + timeout_ms=self.serial_timeout_ms, + ) + + async def get_scan_result(self) -> RackScanResult: + self._complete_finished_scan_task() + if self._scan_error is not None: + raise self._scan_error + if self._state == RackReaderState.SCANNING: + raise MicronicError("Micronic rack scan is still in progress.") + if self._last_result is None: + raise MicronicError("No Micronic rack scan has completed yet.") + return self._last_result + + async def get_rack_id(self) -> str: + self._complete_finished_scan_task() + if self._scan_error is not None: + raise self._scan_error + if self._state == RackReaderState.SCANNING: + raise MicronicError("Micronic rack scan is still in progress.") + if self._last_result is not None: + return self._last_result.rack_id + return await self.scan_rack_id(timeout=0, poll_interval=0) + + def _complete_finished_scan_task(self) -> None: + if self._scan_task is not None and self._scan_task.done(): + self._complete_scan_task(self._scan_task) + + def _complete_scan_task(self, task: asyncio.Future[RackScanResult]) -> None: + if task is not self._scan_task: + return + + self._scan_task = None + try: + self._last_result = task.result() + except asyncio.CancelledError: + self._scan_error = MicronicError("Micronic rack scan was cancelled.") + self._state = RackReaderState.IDLE + except Exception as exc: + self._scan_error = exc + self._state = RackReaderState.IDLE + else: + self._scan_error = None + self._state = RackReaderState.DATAREADY + + async def get_layouts(self) -> list[LayoutInfo]: + return [LayoutInfo(name="8x12")] + + async def get_current_layout(self) -> str: + return "8x12" + + async def set_current_layout(self, layout: str) -> None: + normalized = layout.strip().lower().replace(" ", "") + if normalized not in {"8x12", "96(8x12)", "96"}: + raise MicronicError(f"Unsupported Micronic rack layout: {layout}") + + def _scan_rack_blocking(self) -> RackScanResult: + self.image_dir.mkdir(parents=True, exist_ok=True) + image_extension = choose_image_extension( + image_extension=self.image_extension, + image_input=self.image_input, + scanner_backend=self.scanner_backend, + scan_command=self.scan_command, + twain_scanner_path=self.twain_scanner_path, + sane_device=self.sane_device, + ) + image_path = ( + self.image_dir / f"micronic_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.{image_extension}" + ) + + self.last_scan_metadata = run_scan( + twain_scanner_path=self.twain_scanner_path, + twain_source=self.twain_source, + sane_device=self.sane_device, + scanner_backend=self.scanner_backend, + scan_command=self.scan_command, + output_path=image_path, + timeout_ms=self.scanner_timeout_ms, + image_input=self.image_input, + ) + self.last_image_path = image_path + + rack_id = read_rack_id( + serial_port=self.serial_port, + timeout_ms=self.serial_timeout_ms, + rack_id_override=self.rack_id_override, + rack_id_command=self.rack_id_command, + ) + decoded, self.last_decode_metadata = decode_image(image_path) + if len(decoded) < self.min_wells: + missing = ", ".join(position for position in iter_positions() if position not in decoded) + raise MicronicError( + f"Micronic decode found {len(decoded)} wells; expected at least {self.min_wells}. " + f"Missing: {missing}" + ) + + now = datetime.now() + date_text = now.strftime("%Y%m%d") + time_text = now.strftime("%H%M%S") + entries = [ + RackScanEntry( + position=position, + tube_id=decoded[position].tube_id if position in decoded else None, + status="OK" if position in decoded else "NOREAD", + free_text=decoded[position].method if position in decoded else "", + ) + for position in iter_positions() + ] + + if not self.keep_images and self.image_input is None: + try: + image_path.unlink() + self.last_image_path = None + except OSError: + pass + + return RackScanResult( + rack_id=rack_id, + date=date_text, + time=time_text, + entries=entries, + ) + + +def run_scan( + output_path: Path, + timeout_ms: int, + twain_scanner_path: Optional[str] = None, + twain_source: str = "AVA6PlusG", + sane_device: Optional[str] = None, + scanner_backend: str = "auto", + scan_command: Optional[Sequence[str]] = None, + image_input: Optional[str] = None, +) -> dict[str, object]: + if image_input: + source_path = Path(image_input) + if not source_path.exists(): + raise MicronicError(f"Image input does not exist: {source_path}") + output_path.write_bytes(source_path.read_bytes()) + return {"stdout": "", "stderr": "", "source": str(source_path)} + + if scan_command is not None: + command = format_command( + scan_command, + output_path=output_path, + timeout_ms=timeout_ms, + twain_source=twain_source, + sane_device=sane_device or "", + ) + return run_scan_command(command, output_path, timeout_ms, source="command") + + backend = normalize_scanner_backend(scanner_backend) + if backend == "command": + raise MicronicError("Command scan requested, but scan_command was not configured.") + + if backend in {"auto", "twain"}: + resolved_twain_path = resolve_twain_scanner_path(twain_scanner_path) + if resolved_twain_path is not None: + return run_scan_command( + [resolved_twain_path, str(output_path), twain_source, str(timeout_ms)], + output_path, + timeout_ms, + source="twain", + ) + if backend == "twain": + raise MicronicError( + "TWAIN scan requested, but no TWAIN helper was configured. Set " + "twain_scanner_path, MICRONIC_TWAIN_SCANNER_PATH, or put twain_scan on PATH." + ) + + if backend in {"auto", "sane"}: + scanimage_path = shutil.which("scanimage") + if scanimage_path is not None: + command = [scanimage_path] + if sane_device: + command.extend(["--device-name", sane_device]) + command.extend(["--format=tiff", "--output-file", str(output_path)]) + return run_scan_command(command, output_path, timeout_ms, source="sane") + if backend == "sane": + raise MicronicError("SANE scan requested, but scanimage was not found on PATH.") + + raise MicronicError( + "No scan acquisition method is available. Configure scan_command, " + "twain_scanner_path/MICRONIC_TWAIN_SCANNER_PATH, or install SANE scanimage." + ) + + +def run_scan_command( + command: Sequence[str], + output_path: Path, + timeout_ms: int, + source: str, +) -> dict[str, object]: + try: + completed = subprocess.run( # nosec B603 - operator-configured command, shell=False. + list(command), + check=False, + capture_output=True, + text=True, + timeout=(timeout_ms / 1000) + 15, + ) + except FileNotFoundError as exc: + raise MicronicError(f"Scan command was not found: {command[0]}") from exc + + if completed.returncode != 0: + raise MicronicError( + "Scan command failed with exit code " + f"{completed.returncode}: {completed.stderr.strip() or completed.stdout.strip()}" + ) + if not output_path.exists(): + raise MicronicError(f"Scan command did not create image: {output_path}") + return { + "stdout": completed.stdout.strip(), + "stderr": completed.stderr.strip(), + "source": source, + "command": list(command), + } + + +def read_rack_id( + serial_port: str = "COM4", + timeout_ms: int = 2500, + rack_id_override: Optional[str] = None, + rack_id_command: Optional[Sequence[str]] = None, +) -> str: + if rack_id_override: + return rack_id_override + + if rack_id_command is not None: + command = format_command( + rack_id_command, + serial_port=serial_port, + timeout_ms=timeout_ms, + ) + return read_rack_id_command(command, timeout_ms) + + return asyncio.run(read_rack_id_plr_serial(serial_port=serial_port, timeout_ms=timeout_ms)) + + +async def read_rack_id_plr_serial(serial_port: str, timeout_ms: int) -> str: + deadline = time.monotonic() + timeout_ms / 1000 + chunks: list[bytes] = [] + io = Serial( + human_readable_device_name="Micronic rack ID reader", + port=serial_port, + baudrate=9600, + bytesize=7, + parity="E", + stopbits=1, + timeout=0.1, + write_timeout=1.0, + ) + try: + await io.setup() + await io.reset_input_buffer() + await io.write(b"\r\n") + while time.monotonic() < deadline: + value = await io.read(1) + if value: + chunks.append(value) + if value in {b"\r", b"\n"}: + break + except Exception as exc: + raise MicronicError( + "Rack ID serial read failed. Install the PLR serial extra with " + "`pip install pylabrobot[serial]` and verify the serial port: " + f"{exc}" + ) from exc + finally: + await io.stop() + + return extract_rack_id(b"".join(chunks).decode("utf-8", errors="ignore")) + + +def read_rack_id_command(command: Sequence[str], timeout_ms: int) -> str: + try: + completed = subprocess.run( # nosec B603 - operator-configured command, shell=False. + list(command), + check=False, + capture_output=True, + text=True, + timeout=(timeout_ms / 1000) + 5, + ) + except FileNotFoundError as exc: + raise MicronicError(f"Rack ID command was not found: {command[0]}") from exc + + if completed.returncode != 0: + raise MicronicError( + "Rack ID command failed with exit code " + f"{completed.returncode}: {completed.stderr.strip() or completed.stdout.strip()}" + ) + return extract_rack_id(completed.stdout) + + +def extract_rack_id(text: str) -> str: + match = re.search(r"\d{6,}", text) + return match.group(0) if match else "NOREAD" + + +def normalize_scanner_backend(scanner_backend: str) -> str: + backend = scanner_backend.strip().lower() + if backend not in {"auto", "twain", "sane", "command"}: + raise MicronicError( + "Unsupported scanner backend " + f"{scanner_backend!r}; expected 'auto', 'twain', 'sane', or 'command'." + ) + return backend + + +def normalize_image_extension(image_extension: str) -> str: + normalized = image_extension.strip().lstrip(".") + if not normalized: + raise MicronicError("image_extension must not be empty.") + return normalized + + +def choose_image_extension( + image_extension: Optional[str], + image_input: Optional[str], + scanner_backend: str, + scan_command: Optional[Sequence[str]], + twain_scanner_path: Optional[str], + sane_device: Optional[str], +) -> str: + if image_extension: + return normalize_image_extension(image_extension) + if image_input and Path(image_input).suffix: + return normalize_image_extension(Path(image_input).suffix) + + backend = normalize_scanner_backend(scanner_backend) + if backend == "sane" or ( + backend == "auto" + and scan_command is None + and twain_scanner_path is None + and (sane_device is not None or os.name != "nt") + ): + return "tiff" + return "bmp" + + +def resolve_twain_scanner_path(twain_scanner_path: Optional[str]) -> Optional[str]: + if twain_scanner_path: + return twain_scanner_path + + env_path = os.environ.get("MICRONIC_TWAIN_SCANNER_PATH") + if env_path: + return env_path + + return shutil.which("twain_scan.exe") or shutil.which("twain_scan") + + +def format_command(command: Sequence[str], **values: object) -> list[str]: + return [part.format(**values) for part in command] + + +def decode_image(image_path: Path) -> tuple[dict[str, DecodeResult], dict[str, object]]: + cv2, np, zxingcpp, Image, ImageOps = import_decode_dependencies() + with Image.open(image_path) as loaded_image: + image = loaded_image.convert("L") + full_results = zxingcpp.read_barcodes( + image, + formats=zxingcpp.BarcodeFormat.DataMatrix, + try_rotate=True, + try_downscale=True, + try_invert=True, + ) + + detected: list[tuple[float, float, str]] = [] + for result in full_results: + if not is_tube_id(result.text): + continue + corners = [ + result.position.top_left, + result.position.top_right, + result.position.bottom_right, + result.position.bottom_left, + ] + detected.append( + ( + sum(corner.x for corner in corners) / 4, + sum(corner.y for corner in corners) / 4, + result.text, + ) + ) + + if len(detected) < 24: + raise MicronicError(f"Only {len(detected)} DataMatrix codes were found in the full image.") + + xs = fitted_axis(cluster_axis([item[0] for item in detected], RACK_ROWS, 90), RACK_ROWS) + ys = fitted_axis(cluster_axis([item[1] for item in detected], RACK_COLS, 90), RACK_COLS) + x_pitch = abs(xs[-1] - xs[0]) / (RACK_ROWS - 1) + y_pitch = abs(ys[-1] - ys[0]) / (RACK_COLS - 1) + + decoded: dict[str, DecodeResult] = {} + for x, y, tube_id in detected: + scan_col = min(range(RACK_ROWS), key=lambda index: abs(xs[index] - x)) + scan_row = min(range(RACK_COLS), key=lambda index: abs(ys[index] - y)) + if abs(xs[scan_col] - x) > x_pitch * 0.45 or abs(ys[scan_row] - y) > y_pitch * 0.45: + continue + decoded[rack_position(scan_row, scan_col)] = DecodeResult(tube_id=tube_id, method="full-image") + + for scan_row in range(RACK_COLS): + for scan_col in range(RACK_ROWS): + position = rack_position(scan_row, scan_col) + if position in decoded: + continue + crop_result = decode_well_crop( + image, + xs[scan_col], + ys[scan_row], + cv2, + np, + zxingcpp, + Image, + ImageOps, + ) + if crop_result: + decoded[position] = crop_result + + duplicate_ids = find_duplicate_ids(decoded) + if duplicate_ids: + raise MicronicError( + f"Duplicate tube IDs decoded from more than one well: {', '.join(duplicate_ids)}" + ) + + metadata = { + "imageSize": image.size, + "fullImageDecoded": len(detected), + "gridX": [round(value, 1) for value in xs], + "gridY": [round(value, 1) for value in ys], + "decodedWells": len(decoded), + "missing": [position for position in iter_positions() if position not in decoded], + } + return decoded, metadata + + +def import_decode_dependencies(): + try: + import cv2 # type: ignore + import numpy as np # type: ignore + import zxingcpp # type: ignore + from PIL import Image, ImageOps # type: ignore + except ImportError as exc: + raise MicronicError( + "Micronic decode dependencies are missing. Install pillow, " + "opencv-python-headless, numpy, and zxing-cpp." + ) from exc + return cv2, np, zxingcpp, Image, ImageOps + + +def cluster_axis(values: list[float], expected_count: int, tolerance: float) -> list[float]: + if not values: + raise MicronicError("No decoded barcode positions are available for grid calibration.") + + clusters: list[list[float]] = [] + for value in sorted(values): + if not clusters: + clusters.append([value]) + continue + mean = sum(clusters[-1]) / len(clusters[-1]) + if abs(value - mean) > tolerance: + clusters.append([value]) + else: + clusters[-1].append(value) + + means = [sum(cluster) / len(cluster) for cluster in clusters] + if len(means) == expected_count: + return means + if len(means) >= 2: + return [ + means[0] + index * (means[-1] - means[0]) / (expected_count - 1) + for index in range(expected_count) + ] + raise MicronicError( + f"Could not fit {expected_count} grid clusters from {len(values)} decoded positions." + ) + + +def fitted_axis(means: list[float], expected_count: int) -> list[float]: + return [ + means[0] + index * (means[-1] - means[0]) / (expected_count - 1) + for index in range(expected_count) + ] + + +def rack_position(scan_row: int, scan_col: int) -> str: + return f"{ROWS[RACK_ROWS - 1 - scan_col]}{RACK_COLS - scan_row:02d}" + + +def iter_positions() -> Iterable[str]: + for row in ROWS: + for column in range(1, COLS + 1): + yield f"{row}{column:02d}" + + +def is_tube_id(value: object) -> bool: + return isinstance(value, str) and value.isdigit() and len(value) == 10 + + +def decode_well_crop( + image, center_x, center_y, cv2, np, zxingcpp, Image, ImageOps +) -> Optional[DecodeResult]: + for size in [150, 160, 180, 200, 220, 240]: + crop = centered_crop(image, center_x, center_y, size) + decoded = decode_pil_variants(crop, zxingcpp, ImageOps) + if decoded: + return DecodeResult(tube_id=decoded, method=f"crop-{size}") + + for size in [100, 120, 140, 160]: + crop = centered_crop(image, center_x, center_y, size) + decoded = decode_perspective_crop(crop, cv2, np, zxingcpp, Image, ImageOps) + if decoded: + return DecodeResult(tube_id=decoded, method=f"perspective-{size}") + + return None + + +def centered_crop(image, center_x: float, center_y: float, size: int): + half = size / 2 + return image.crop( + ( + int(round(center_x - half)), + int(round(center_y - half)), + int(round(center_x + half)), + int(round(center_y + half)), + ) + ) + + +def decode_pil_variants(crop, zxingcpp, ImageOps) -> Optional[str]: + for variant in [crop, ImageOps.autocontrast(crop), ImageOps.equalize(crop)]: + decoded = decode_with_zxing(variant, zxingcpp, ImageOps) + if decoded: + return decoded + return None + + +def decode_with_zxing(image, zxingcpp, ImageOps) -> Optional[str]: + binarizers = [ + zxingcpp.Binarizer.LocalAverage, + zxingcpp.Binarizer.GlobalHistogram, + zxingcpp.Binarizer.FixedThreshold, + ] + for scale in [1, 2, 3, 4]: + scaled = image if scale == 1 else image.resize((image.width * scale, image.height * scale)) + for invert in [False, True]: + candidate = ImageOps.invert(scaled) if invert else scaled + for border in [0, 20, 50]: + padded = ImageOps.expand(candidate, border=border, fill=255) if border else candidate + for binarizer in binarizers: + for pure in [False, True]: + results = zxingcpp.read_barcodes( + padded, + formats=zxingcpp.BarcodeFormat.DataMatrix, + try_rotate=True, + try_downscale=False, + try_invert=True, + binarizer=binarizer, + is_pure=pure, + ) + for result in results: + if is_tube_id(result.text): + return str(result.text) + return None + + +def order_box(points, np): + points = np.array(points, dtype=np.float32) + sums = points.sum(axis=1) + diffs = np.diff(points, axis=1).ravel() + return np.array( + [ + points[np.argmin(sums)], + points[np.argmin(diffs)], + points[np.argmax(sums)], + points[np.argmax(diffs)], + ], + dtype=np.float32, + ) + + +def decode_perspective_crop(crop, cv2, np, zxingcpp, Image, ImageOps) -> Optional[str]: + crop_array = np.array(crop) + for threshold in [30, 40, 50, 60, 70, 80, 90, 100, 120, 140]: + mask = (crop_array < threshold).astype(np.uint8) * 255 + for candidate_mask in candidate_masks(mask, cv2, np): + if not candidate_mask.any(): + continue + points = np.column_stack(np.where(candidate_mask > 0))[:, ::-1].astype(np.float32) + if len(points) < 40: + continue + rect = cv2.minAreaRect(points) + (rect_x, rect_y), (rect_w, rect_h), _angle = rect + if rect_w < 25 or rect_h < 25 or rect_w > crop.width * 0.9 or rect_h > crop.height * 0.9: + continue + if max(rect_w, rect_h) / max(1, min(rect_w, rect_h)) > 2: + continue + + box = cv2.boxPoints(rect) + center = np.array([rect_x, rect_y], dtype=np.float32) + for margin in [0.9, 1.0, 1.1, 1.2, 1.35]: + source = order_box((box - center) * margin + center, np) + for output_size in [60, 80, 100, 120, 160]: + destination = np.array( + [ + [0, 0], + [output_size - 1, 0], + [output_size - 1, output_size - 1], + [0, output_size - 1], + ], + dtype=np.float32, + ) + matrix = cv2.getPerspectiveTransform(source, destination) + warped = cv2.warpPerspective( + crop_array, matrix, (output_size, output_size), borderValue=255 + ) + for mode_array in perspective_variants(warped, threshold, cv2, Image, ImageOps): + decoded = decode_with_zxing(mode_array, zxingcpp, ImageOps) + if decoded: + return decoded + return None + + +def candidate_masks(mask, cv2, np): + yield mask + number, labels, stats, centroids = cv2.connectedComponentsWithStats(mask, 8) + combined = np.zeros_like(mask) + size = mask.shape[0] + for index in range(1, number): + _x, _y, width, height, area = stats[index] + center_x, center_y = centroids[index] + if area < 15 or width < 8 or height < 8: + continue + if abs(center_x - size / 2) > size * 0.33 or abs(center_y - size / 2) > size * 0.33: + continue + if width > size * 0.85 or height > size * 0.85: + continue + combined[labels == index] = 255 + yield combined + + +def perspective_variants(warped, threshold: int, cv2, Image, ImageOps): + yield Image.fromarray(warped) + yield ImageOps.autocontrast(Image.fromarray(warped)) + _, binary = cv2.threshold(warped, min(220, threshold + 70), 255, cv2.THRESH_BINARY) + yield Image.fromarray(binary) + yield Image.fromarray(255 - binary) + + +def find_duplicate_ids(decoded: dict[str, DecodeResult]) -> list[str]: + seen: dict[str, str] = {} + duplicates: list[str] = [] + for position, result in decoded.items(): + previous = seen.get(result.tube_id) + if previous and previous != position: + duplicates.append(result.tube_id) + seen[result.tube_id] = position + return sorted(set(duplicates)) diff --git a/pylabrobot/micronic/code_reader/micronic_tests.py b/pylabrobot/micronic/code_reader/micronic_tests.py new file mode 100644 index 00000000000..2b63a3f3eb4 --- /dev/null +++ b/pylabrobot/micronic/code_reader/micronic_tests.py @@ -0,0 +1,346 @@ +import asyncio +import sys +import tempfile +import unittest +from pathlib import Path +from typing import cast +from unittest.mock import MagicMock, patch + +from pylabrobot.capabilities.rack_reading import RackReaderState +from pylabrobot.micronic import MicronicCodeReader +from pylabrobot.micronic.code_reader.driver import ( + DecodeResult, + MicronicDriver, + MicronicError, + choose_image_extension, + read_rack_id, + read_rack_id_plr_serial, + run_scan, +) +from pylabrobot.micronic.code_reader.rack_reading_backend import MicronicRackReadingBackend + + +async def wait_for_dataready(driver: MicronicDriver) -> None: + for _ in range(100): + if await driver.get_rack_reader_state() == RackReaderState.DATAREADY: + return + await asyncio.sleep(0.01) + raise AssertionError("Micronic test scan did not reach dataready.") + + +class TestMicronicDriver(unittest.IsolatedAsyncioTestCase): + def test_driver_does_not_default_to_packaged_twain_helper(self): + driver = MicronicDriver() + self.assertIsNone(driver.twain_scanner_path) + self.assertIsNone(driver.scan_command) + + async def test_driver_scan_populates_standard_rack_result(self): + with tempfile.TemporaryDirectory() as image_dir: + driver = MicronicDriver( + image_dir=image_dir, + min_wells=2, + keep_images=True, + ) + decoded = { + "A01": DecodeResult(tube_id="1111111111", method="test"), + "A02": DecodeResult(tube_id="2222222222", method="test"), + } + with ( + patch( + "pylabrobot.micronic.code_reader.driver.run_scan", + return_value={"source": "test"}, + ) as run_scan_mock, + patch( + "pylabrobot.micronic.code_reader.driver.read_rack_id", + return_value="9500017722", + ) as read_rack_id_mock, + patch( + "pylabrobot.micronic.code_reader.driver.decode_image", + return_value=(decoded, {"decodedWells": 2}), + ) as decode_image_mock, + ): + await driver.setup() + await driver.trigger_rack_scan() + await wait_for_dataready(driver) + result = await driver.get_scan_result() + + self.assertEqual(await driver.get_rack_reader_state(), RackReaderState.DATAREADY) + self.assertEqual(result.rack_id, "9500017722") + self.assertEqual(result.entries[0].position, "A01") + self.assertEqual(result.entries[0].tube_id, "1111111111") + self.assertEqual(result.entries[1].tube_id, "2222222222") + self.assertEqual(driver.last_scan_metadata, {"source": "test"}) + self.assertEqual(driver.last_decode_metadata, {"decodedWells": 2}) + run_scan_mock.assert_called_once() + read_rack_id_mock.assert_called_once() + decode_image_mock.assert_called_once() + + async def test_reader_can_scan_twice_after_dataready(self): + with tempfile.TemporaryDirectory() as image_dir: + reader = MicronicCodeReader( + driver=MicronicDriver( + image_dir=image_dir, + min_wells=1, + keep_images=True, + ) + ) + decoded = {"A01": DecodeResult(tube_id="1111111111", method="test")} + with ( + patch( + "pylabrobot.micronic.code_reader.driver.run_scan", + return_value={"source": "test"}, + ) as run_scan_mock, + patch( + "pylabrobot.micronic.code_reader.driver.read_rack_id", + return_value="9500017722", + ), + patch( + "pylabrobot.micronic.code_reader.driver.decode_image", + return_value=(decoded, {"decodedWells": 1}), + ), + ): + await reader.setup() + first = await reader.rack_reading.scan_rack(timeout=1.0, poll_interval=0.01) + second = await reader.rack_reading.scan_rack(timeout=1.0, poll_interval=0.01) + + self.assertEqual(first.rack_id, "9500017722") + self.assertEqual(second.rack_id, "9500017722") + self.assertEqual(run_scan_mock.call_count, 2) + + async def test_driver_get_rack_id_does_not_return_stale_result_while_scanning(self): + with tempfile.TemporaryDirectory() as image_dir: + driver = MicronicDriver( + image_dir=image_dir, + min_wells=1, + keep_images=True, + ) + decoded = {"A01": DecodeResult(tube_id="1111111111", method="test")} + + def slow_scan(*args, **kwargs): + del args, kwargs + import time + + time.sleep(0.05) + return {"source": "test"} + + with ( + patch( + "pylabrobot.micronic.code_reader.driver.run_scan", + return_value={"source": "test"}, + ), + patch( + "pylabrobot.micronic.code_reader.driver.read_rack_id", + return_value="9500017722", + ), + patch( + "pylabrobot.micronic.code_reader.driver.decode_image", + return_value=(decoded, {"decodedWells": 1}), + ), + ): + await driver.setup() + await driver.trigger_rack_scan() + await wait_for_dataready(driver) + self.assertEqual(await driver.get_rack_id(), "9500017722") + + with ( + patch( + "pylabrobot.micronic.code_reader.driver.run_scan", + side_effect=slow_scan, + ), + patch( + "pylabrobot.micronic.code_reader.driver.read_rack_id", + return_value="9500017723", + ), + patch( + "pylabrobot.micronic.code_reader.driver.decode_image", + return_value=(decoded, {"decodedWells": 1}), + ), + ): + await driver.trigger_rack_scan() + with self.assertRaises(MicronicError): + await driver.get_rack_id() + await wait_for_dataready(driver) + self.assertEqual(await driver.get_rack_id(), "9500017723") + + async def test_run_scan_uses_explicit_command(self): + with tempfile.TemporaryDirectory() as image_dir: + output_path = Path(image_dir) / "rack.bmp" + metadata = run_scan( + output_path=output_path, + timeout_ms=1000, + scan_command=[ + sys.executable, + "-c", + "from pathlib import Path; Path(r'{output_path}').write_bytes(b'image')", + ], + ) + + self.assertEqual(metadata["source"], "command") + self.assertTrue(output_path.exists()) + + async def test_run_scan_uses_sane_scanimage_when_requested(self): + with tempfile.TemporaryDirectory() as image_dir: + output_path = Path(image_dir) / "micronic-test.tiff" + with ( + patch( + "pylabrobot.micronic.code_reader.driver.shutil.which", + return_value="/usr/bin/scanimage", + ), + patch( + "pylabrobot.micronic.code_reader.driver.run_scan_command", + return_value={"source": "sane"}, + ) as run_scan_command, + ): + metadata = run_scan( + output_path=output_path, + timeout_ms=1000, + scanner_backend="sane", + sane_device="avision:libusb:001:004", + ) + + self.assertEqual(metadata["source"], "sane") + run_scan_command.assert_called_once_with( + [ + "/usr/bin/scanimage", + "--device-name", + "avision:libusb:001:004", + "--format=tiff", + "--output-file", + str(output_path), + ], + output_path, + 1000, + source="sane", + ) + + async def test_run_scan_requires_configured_acquisition(self): + with tempfile.TemporaryDirectory() as image_dir: + with ( + patch("pylabrobot.micronic.code_reader.driver.shutil.which", return_value=None), + self.assertRaises(MicronicError), + ): + run_scan( + output_path=Path(image_dir) / "micronic-test.bmp", + timeout_ms=1000, + scanner_backend="twain", + ) + + async def test_choose_image_extension_prefers_sane_tiff_on_non_windows_auto(self): + extension = choose_image_extension( + image_extension=None, + image_input=None, + scanner_backend="auto", + scan_command=None, + twain_scanner_path=None, + sane_device="avision:libusb:001:004", + ) + self.assertEqual(extension, "tiff") + + async def test_read_rack_id_uses_configured_command(self): + rack_id = read_rack_id( + timeout_ms=1000, + rack_id_command=[sys.executable, "-c", "print('rack 9500017722')"], + ) + self.assertEqual(rack_id, "9500017722") + + async def test_read_rack_id_uses_plr_serial(self): + instances: list[object] = [] + + class FakeSerial: + def __init__(self, **kwargs): + self.kwargs = kwargs + self.reads = iter([b"9", b"5", b"0", b"0", b"0", b"1", b"7", b"7", b"2", b"2", b"\r"]) + self.calls: list[str] = [] + instances.append(self) + + async def setup(self): + self.calls.append("setup") + + async def reset_input_buffer(self): + self.calls.append("reset_input_buffer") + + async def write(self, data: bytes): + self.calls.append(f"write:{data!r}") + + async def read(self, num_bytes: int = 1) -> bytes: + self.calls.append(f"read:{num_bytes}") + return next(self.reads) + + async def stop(self): + self.calls.append("stop") + + with patch("pylabrobot.micronic.code_reader.driver.Serial", FakeSerial): + rack_id = await read_rack_id_plr_serial(serial_port="COM4", timeout_ms=1000) + + self.assertEqual(len(instances), 1) + fake_serial = cast(FakeSerial, instances[0]) + self.assertEqual(rack_id, "9500017722") + self.assertEqual(fake_serial.kwargs["port"], "COM4") + self.assertEqual(fake_serial.kwargs["bytesize"], 7) + self.assertEqual(fake_serial.kwargs["parity"], "E") + self.assertIn("reset_input_buffer", fake_serial.calls) + self.assertIn("write:b'\\r\\n'", fake_serial.calls) + self.assertEqual(fake_serial.calls[-1], "stop") + + async def test_driver_scan_rack_id_uses_configured_command(self): + driver = MicronicDriver(rack_id_command=[sys.executable, "-c", "print('rack 9500017722')"]) + self.assertEqual(await driver.scan_rack_id(timeout=1.0, poll_interval=0.1), "9500017722") + + async def test_driver_raises_when_scan_result_is_not_ready(self): + driver = MicronicDriver() + with self.assertRaises(MicronicError): + await driver.get_scan_result() + + async def test_driver_rejects_unknown_layout(self): + driver = MicronicDriver() + with self.assertRaises(MicronicError): + await driver.set_current_layout("384") + + async def test_backend_delegates_to_driver(self): + driver = MicronicDriver(rack_id_override="9500017722") + backend = MicronicRackReadingBackend(driver=driver) + with patch.object(driver, "scan_rack_id", return_value="9500017722") as scan_rack_id: + rack_id = await backend.scan_rack_id(timeout=5.0, poll_interval=0.5) + self.assertEqual(rack_id, "9500017722") + scan_rack_id.assert_called_once_with(timeout=5.0, poll_interval=0.5) + + +class TestMicronicCodeReader(unittest.IsolatedAsyncioTestCase): + async def test_device_exposes_rack_reading_only(self): + reader = MicronicCodeReader( + timeout=12.0, + poll_interval=0.25, + driver=MicronicDriver(rack_id_override="9500017722"), + ) + with patch.object( + reader.driver, + "get_rack_reader_state", + return_value=RackReaderState.IDLE, + ): + await reader.setup() + try: + self.assertIn(reader.rack_reading, reader._capabilities) + self.assertFalse(hasattr(reader, "barcode_scanning")) + with patch.object( + reader.rack_reading, + "scan_rack", + return_value=MagicMock(rack_id="9500017722"), + ) as scan_rack: + result = await reader.rack_reading.scan_rack( + timeout=reader.default_timeout, + poll_interval=reader.default_poll_interval, + ) + finally: + await reader.stop() + + self.assertEqual(result.rack_id, "9500017722") + scan_rack.assert_called_once_with(timeout=12.0, poll_interval=0.25) + + async def test_frontend_uses_driver(self): + reader = MicronicCodeReader(rack_id_override="9500017722") + self.assertIsInstance(reader.driver, MicronicDriver) + self.assertFalse(hasattr(reader, "barcode_scanning")) + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/micronic/code_reader/rack_reading_backend.py b/pylabrobot/micronic/code_reader/rack_reading_backend.py new file mode 100644 index 00000000000..b376fe83fb6 --- /dev/null +++ b/pylabrobot/micronic/code_reader/rack_reading_backend.py @@ -0,0 +1,79 @@ +"""Rack-reading backend for the Micronic driver.""" + +from __future__ import annotations + +from typing import Optional + +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.capabilities.rack_reading import ( + LayoutInfo, + RackReaderBackend, + RackReaderError, + RackReaderState, + RackScanResult, +) + +from .driver import MicronicDriver, MicronicError + + +class MicronicRackReaderError(MicronicError, RackReaderError): + """Raised when Micronic rack-reading operations fail.""" + + +class MicronicRackReadingBackend(RackReaderBackend): + """Rack-reading backend that delegates to the Micronic driver.""" + + def __init__(self, driver: MicronicDriver): + super().__init__() + self.driver = driver + + async def _on_setup(self, backend_params: Optional[BackendParams] = None): + await self.get_state() + + async def get_state(self) -> RackReaderState: + try: + return await self.driver.get_rack_reader_state() + except MicronicError as exc: + raise MicronicRackReaderError(str(exc)) from exc + + async def trigger_rack_scan(self) -> None: + try: + await self.driver.trigger_rack_scan() + except MicronicError as exc: + raise MicronicRackReaderError(str(exc)) from exc + + async def scan_rack_id(self, timeout: float, poll_interval: float) -> str: + try: + return await self.driver.scan_rack_id(timeout=timeout, poll_interval=poll_interval) + except MicronicError as exc: + raise MicronicRackReaderError(str(exc)) from exc + + async def get_scan_result(self) -> RackScanResult: + try: + return await self.driver.get_scan_result() + except MicronicError as exc: + raise MicronicRackReaderError(str(exc)) from exc + + async def get_rack_id(self) -> str: + try: + return await self.driver.get_rack_id() + except MicronicError as exc: + raise MicronicRackReaderError(str(exc)) from exc + + async def get_layouts(self) -> list[LayoutInfo]: + try: + return await self.driver.get_layouts() + except MicronicError as exc: + raise MicronicRackReaderError(str(exc)) from exc + + async def get_current_layout(self) -> str: + try: + return await self.driver.get_current_layout() + except MicronicError as exc: + raise MicronicRackReaderError(str(exc)) from exc + + async def set_current_layout(self, layout: str) -> None: + try: + await self.driver.set_current_layout(layout) + except MicronicError as exc: + raise MicronicRackReaderError(str(exc)) from exc