diff --git a/docs/api/pylabrobot.capabilities.rst b/docs/api/pylabrobot.capabilities.rst
index 666ce292e20..3fc66738bac 100644
--- a/docs/api/pylabrobot.capabilities.rst
+++ b/docs/api/pylabrobot.capabilities.rst
@@ -192,6 +192,42 @@ Barcode Scanning
BarcodeScannerBackend
+Rack Reading
+------------
+
+.. currentmodule:: pylabrobot.capabilities.rack_reading.rack_reader
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ RackReader
+
+.. currentmodule:: pylabrobot.capabilities.rack_reading.backend
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ RackReaderBackend
+
+.. currentmodule:: pylabrobot.capabilities.rack_reading.standard
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ RackReaderState
+ RackReaderError
+ RackReaderTimeoutError
+ RackScanEntry
+ RackScanResult
+ LayoutInfo
+
+
Microscopy
----------
diff --git a/docs/api/pylabrobot.micronic.rst b/docs/api/pylabrobot.micronic.rst
new file mode 100644
index 00000000000..744042b4ce1
--- /dev/null
+++ b/docs/api/pylabrobot.micronic.rst
@@ -0,0 +1,46 @@
+.. currentmodule:: pylabrobot.micronic
+
+pylabrobot.micronic package
+===========================
+
+Micronic integrations built on the rack-reading capability.
+
+Device
+------
+
+.. currentmodule:: pylabrobot.micronic.code_reader.code_reader
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ MicronicCodeReader
+
+
+Driver
+------
+
+.. currentmodule:: pylabrobot.micronic.code_reader.driver
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ MicronicDriver
+ MicronicError
+
+
+Capabilities
+------------
+
+.. currentmodule:: pylabrobot.micronic.code_reader.rack_reading_backend
+
+.. autosummary::
+ :toctree: _autosummary
+ :nosignatures:
+ :recursive:
+
+ MicronicRackReadingBackend
+ MicronicRackReaderError
diff --git a/docs/api/pylabrobot.rst b/docs/api/pylabrobot.rst
index 53a04635937..5f46dd6be3f 100644
--- a/docs/api/pylabrobot.rst
+++ b/docs/api/pylabrobot.rst
@@ -41,6 +41,7 @@ Manufacturers
pylabrobot.inheco
pylabrobot.liconic
pylabrobot.mettler_toledo
+ pylabrobot.micronic
pylabrobot.molecular_devices
pylabrobot.opentrons
pylabrobot.qinstruments
diff --git a/docs/user_guide/capabilities/index.md b/docs/user_guide/capabilities/index.md
index 2a091c83f64..fb4e0f7ddad 100644
--- a/docs/user_guide/capabilities/index.md
+++ b/docs/user_guide/capabilities/index.md
@@ -55,6 +55,7 @@ loading-tray
pumping
weighing
barcode-scanning
+rack-reading
microscopy
automated-retrieval
absorbance
diff --git a/docs/user_guide/capabilities/rack-reading.md b/docs/user_guide/capabilities/rack-reading.md
new file mode 100644
index 00000000000..a8e163a7ade
--- /dev/null
+++ b/docs/user_guide/capabilities/rack-reading.md
@@ -0,0 +1,55 @@
+# Rack Reading
+
+The `rack_reading` capability standardizes rack-scale code readers that trigger a rack scan,
+report normalized state while scanning, and return structured per-position scan results.
+
+Unlike one-at-a-time code reads, rack reading is job-oriented and returns the full decoded rack map.
+
+## Public API
+
+```python
+from pylabrobot.capabilities.rack_reading import RackReader
+
+result = await reader.scan_rack(timeout=60.0, poll_interval=1.0)
+```
+
+`scan_rack()` is the main public operation. It triggers the scan, waits internally until the
+reader reaches `dataready`, and then returns a `RackScanResult`.
+
+If the hardware supports reading just the rack barcode without decoding all tube positions,
+`scan_rack_id()` exposes that as a rack-reading operation and returns the rack identifier only.
+
+Lower-level methods are also available:
+
+- `get_state()`
+- `wait_for_data_ready()`
+- `trigger_rack_scan()`
+- `scan_rack_id()`
+- `get_scan_result()`
+- `get_rack_id()`
+- `get_layouts()`
+- `get_current_layout()`
+- `set_current_layout(layout)`
+
+## Example With Micronic
+
+```python
+from pylabrobot.micronic import MicronicCodeReader
+
+reader = MicronicCodeReader(
+ scanner_backend="sane",
+ sane_device="avision:libusb:001:004",
+ serial_port="/dev/ttyUSB0",
+)
+await reader.setup()
+
+try:
+ result = await reader.rack_reading.scan_rack(timeout=90.0, poll_interval=1.0)
+ print(result.rack_id)
+ print(result.entries[0].position, result.entries[0].tube_id)
+
+ rack_id = await reader.rack_reading.scan_rack_id(timeout=60.0, poll_interval=1.0)
+ print(rack_id)
+finally:
+ await reader.stop()
+```
diff --git a/docs/user_guide/index.md b/docs/user_guide/index.md
index bb273f350cf..3f1ad03ea17 100644
--- a/docs/user_guide/index.md
+++ b/docs/user_guide/index.md
@@ -40,6 +40,7 @@ inheco/index
liconic/index
mettler_toledo/index
molecular_devices/index
+micronic/index
opentrons/index
qinstruments/index
thermo_fisher/index
diff --git a/docs/user_guide/machines.md b/docs/user_guide/machines.md
index dc057d79b15..db5a5fbcebb 100644
--- a/docs/user_guide/machines.md
+++ b/docs/user_guide/machines.md
@@ -188,6 +188,12 @@ tr > td:nth-child(5) { width: 15%; }
|--------------|---------|-------------|--------|
| Mettler Toledo | WXS205SDU | Full | [PLR](02_analytical/scales/mettler-toledo-WXS205SDU.ipynb) / [OEM](https://www.mt.com/us/en/home/products/Industrial_Weighing_Solutions/high-precision-weigh-sensors/weigh-module-wxs205sdu-15-11121008.html) |
+### Rack Readers
+
+| Manufacturer | Machine | Features | PLR-Support | Links |
+|--------------|---------|----------|-------------|--------|
+| Micronic | Direct local scanner + serial control | rack reading | Basics | [PLR](https://docs.pylabrobot.org/user_guide/micronic/index.html) / [OEM](https://www.micronic.com/products/code-reader/) |
+
---
## Understanding the Tables
diff --git a/docs/user_guide/micronic/index.md b/docs/user_guide/micronic/index.md
new file mode 100644
index 00000000000..d717de7348b
--- /dev/null
+++ b/docs/user_guide/micronic/index.md
@@ -0,0 +1,85 @@
+# Micronic
+
+PyLabRobot includes `v1b1` Micronic integrations built on the generic
+`rack_reading` capability.
+
+`MicronicCodeReader` controls the local hardware directly. It acquires the rack
+image through a configured scanner command, a Windows TWAIN helper available on
+PATH, or Ubuntu/Linux SANE `scanimage`; reads the side rack barcode through the
+serial reader; decodes tube DataMatrix codes locally; and returns a standard
+`RackScanResult`. It does not call Micronic Code Reader or IO Monitor, and
+PyLabRobot does not package any scanner helper executable.
+
+## Supported operations
+
+Rack reading (large scanner that decodes 96 tubes plus the side rack barcode):
+
+- `rack_reading.scan_rack()` to trigger image acquisition, decode all 96 tube
+ positions, read the side rack barcode, and return a `RackScanResult`
+- `rack_reading.scan_rack_id()` for a rack-barcode-only read on the side reader
+- `rack_reading.get_layouts()`, `get_current_layout()`, and
+ `set_current_layout()` for the fixed 8x12 rack layout
+
+## Hardware example
+
+The operator is responsible for installing any OS-level scanner bridge
+(`twain_scan`, `scanimage`, or a custom command), the PLR serial extra
+(`pylabrobot[serial]`), and the local Python decode dependencies in the runtime
+environment.
+
+```python
+from pylabrobot.micronic import MicronicCodeReader
+
+reader = MicronicCodeReader(
+ scanner_backend="twain",
+ twain_scanner_path=r"C:\Tools\twain_scan.exe",
+ twain_source="AVA6PlusG",
+ image_dir=r"C:\ProgramData\PyLabRobot\micronic-images",
+ serial_port="COM4",
+ keep_images=True,
+)
+await reader.setup()
+
+try:
+ rack_result = await reader.rack_reading.scan_rack(timeout=90.0, poll_interval=1.0)
+ print(rack_result.rack_id)
+ print(len([entry for entry in rack_result.entries if entry.tube_id]))
+
+ rack_id = await reader.rack_reading.scan_rack_id(timeout=5.0, poll_interval=0.5)
+ print(rack_id)
+finally:
+ await reader.stop()
+```
+
+On Ubuntu/Linux, use SANE if the scanner is exposed by a SANE backend:
+
+```python
+reader = MicronicCodeReader(
+ scanner_backend="sane",
+ sane_device="avision:libusb:001:004",
+ serial_port="/dev/ttyUSB0",
+ image_extension="tiff",
+)
+```
+
+For any other acquisition stack, pass `scan_command`. Each command argument is
+formatted with `{output_path}`, `{timeout_ms}`, `{twain_source}`, and
+`{sane_device}` before execution. The command must write the rack image to
+`{output_path}`.
+
+## Notes
+
+- `scan_rack` reads every tube barcode and finishes by reading the rack ID, so
+ it typically takes tens of seconds. `scan_rack_id` only reads the rack
+ barcode and completes in a few seconds.
+- TWAIN is a Windows scanner-driver API. PyLabRobot does not ship a TWAIN
+ bridge binary and does not install one for you; configure
+ `twain_scanner_path`, set `MICRONIC_TWAIN_SCANNER_PATH`, or put a local helper
+ named `twain_scan`/`twain_scan.exe` on PATH when using the `twain` backend.
+- Ubuntu/Linux scanner control should use SANE `scanimage` or a custom
+ `scan_command`. PyLabRobot does not install SANE or vendor scanner drivers.
+ Rack-ID reads use `pylabrobot.io.Serial`, which is installed through the
+ `pylabrobot[serial]` extra.
+- Image decoding imports `pillow`, `opencv-python-headless`, `numpy`, and
+ `zxing-cpp` at runtime. Install them in the environment that runs PyLabRobot.
+- Use `image_input` for offline decode checks without touching scanner hardware.
diff --git a/pylabrobot/capabilities/__init__.py b/pylabrobot/capabilities/__init__.py
index ddf84ebd973..d39489932c5 100644
--- a/pylabrobot/capabilities/__init__.py
+++ b/pylabrobot/capabilities/__init__.py
@@ -1 +1,11 @@
from .capability import Capability, CapabilityBackend, need_capability_ready
+from .rack_reading import (
+ LayoutInfo,
+ RackReader,
+ RackReaderBackend,
+ RackReaderError,
+ RackReaderState,
+ RackReaderTimeoutError,
+ RackScanEntry,
+ RackScanResult,
+)
diff --git a/pylabrobot/capabilities/rack_reading/__init__.py b/pylabrobot/capabilities/rack_reading/__init__.py
new file mode 100644
index 00000000000..f0d7a8d1eb3
--- /dev/null
+++ b/pylabrobot/capabilities/rack_reading/__init__.py
@@ -0,0 +1,11 @@
+from .backend import RackReaderBackend
+from .chatterbox import RackReaderChatterboxBackend
+from .rack_reader import RackReader
+from .standard import (
+ LayoutInfo,
+ RackReaderError,
+ RackReaderState,
+ RackReaderTimeoutError,
+ RackScanEntry,
+ RackScanResult,
+)
diff --git a/pylabrobot/capabilities/rack_reading/backend.py b/pylabrobot/capabilities/rack_reading/backend.py
new file mode 100644
index 00000000000..e65c2725409
--- /dev/null
+++ b/pylabrobot/capabilities/rack_reading/backend.py
@@ -0,0 +1,48 @@
+from __future__ import annotations
+
+from abc import ABCMeta, abstractmethod
+
+from pylabrobot.capabilities.capability import CapabilityBackend
+
+from .standard import LayoutInfo, RackReaderState, RackScanResult
+
+
+class RackReaderBackend(CapabilityBackend, metaclass=ABCMeta):
+ """Abstract backend for rack readers that decode position-indexed rack contents."""
+
+ @abstractmethod
+ async def get_state(self) -> RackReaderState:
+ """Return the current rack reader state."""
+
+ @abstractmethod
+ async def trigger_rack_scan(self) -> None:
+ """Initiate a rack-wide scan."""
+
+ @abstractmethod
+ async def scan_rack_id(self, timeout: float, poll_interval: float) -> str:
+ """Perform a rack-barcode-only scan and return the rack identifier.
+
+ Backends whose hardware exposes a one-shot rack-id read may ignore
+ ``timeout`` and ``poll_interval``; backends that need a trigger/poll cycle
+ should respect them.
+ """
+
+ @abstractmethod
+ async def get_scan_result(self) -> RackScanResult:
+ """Return the most recent rack scan result."""
+
+ @abstractmethod
+ async def get_rack_id(self) -> str:
+ """Return the rack identifier reported by the scanner."""
+
+ @abstractmethod
+ async def get_layouts(self) -> list[LayoutInfo]:
+ """Return supported layouts."""
+
+ @abstractmethod
+ async def get_current_layout(self) -> str:
+ """Return the active layout."""
+
+ @abstractmethod
+ async def set_current_layout(self, layout: str) -> None:
+ """Set the active layout."""
diff --git a/pylabrobot/capabilities/rack_reading/chatterbox.py b/pylabrobot/capabilities/rack_reading/chatterbox.py
new file mode 100644
index 00000000000..ac3276ca6ea
--- /dev/null
+++ b/pylabrobot/capabilities/rack_reading/chatterbox.py
@@ -0,0 +1,44 @@
+from __future__ import annotations
+
+from .backend import RackReaderBackend
+from .standard import LayoutInfo, RackReaderState, RackScanEntry, RackScanResult
+
+
+class RackReaderChatterboxBackend(RackReaderBackend):
+ """Device-free rack-reading backend for tests and examples."""
+
+ def __init__(self):
+ self._state = RackReaderState.IDLE
+ self._layout = "96"
+
+ async def get_state(self) -> RackReaderState:
+ return self._state
+
+ async def trigger_rack_scan(self) -> None:
+ self._state = RackReaderState.DATAREADY
+
+ async def scan_rack_id(self, timeout: float, poll_interval: float) -> str:
+ self._state = RackReaderState.DATAREADY
+ return "CHATTERBOX"
+
+ async def get_scan_result(self) -> RackScanResult:
+ return RackScanResult(
+ rack_id="CHATTERBOX",
+ date="19700101",
+ time="000000",
+ entries=[
+ RackScanEntry(position="A01", tube_id="SIMULATED", status="Code OK"),
+ ],
+ )
+
+ async def get_rack_id(self) -> str:
+ return "CHATTERBOX"
+
+ async def get_layouts(self) -> list[LayoutInfo]:
+ return [LayoutInfo(name="96"), LayoutInfo(name="48")]
+
+ async def get_current_layout(self) -> str:
+ return self._layout
+
+ async def set_current_layout(self, layout: str) -> None:
+ self._layout = layout
diff --git a/pylabrobot/capabilities/rack_reading/rack_reader.py b/pylabrobot/capabilities/rack_reading/rack_reader.py
new file mode 100644
index 00000000000..48a87e55574
--- /dev/null
+++ b/pylabrobot/capabilities/rack_reading/rack_reader.py
@@ -0,0 +1,127 @@
+from __future__ import annotations
+
+import asyncio
+import time
+
+from pylabrobot.capabilities.capability import Capability, need_capability_ready
+
+from .backend import RackReaderBackend
+from .standard import LayoutInfo, RackReaderState, RackReaderTimeoutError, RackScanResult
+
+
+class RackReader(Capability):
+ """Rack-reading capability."""
+
+ def __init__(self, backend: RackReaderBackend):
+ super().__init__(backend=backend)
+ self.backend: RackReaderBackend = backend
+
+ @need_capability_ready
+ async def get_state(self) -> RackReaderState:
+ return await self.backend.get_state()
+
+ @need_capability_ready
+ async def trigger_rack_scan(self) -> None:
+ await self.backend.trigger_rack_scan()
+
+ @need_capability_ready
+ async def get_scan_result(self) -> RackScanResult:
+ return await self.backend.get_scan_result()
+
+ @need_capability_ready
+ async def get_rack_id(self) -> str:
+ return await self.backend.get_rack_id()
+
+ @need_capability_ready
+ async def get_layouts(self) -> list[LayoutInfo]:
+ return await self.backend.get_layouts()
+
+ @need_capability_ready
+ async def get_current_layout(self) -> str:
+ return await self.backend.get_current_layout()
+
+ @need_capability_ready
+ async def set_current_layout(self, layout: str) -> None:
+ await self.backend.set_current_layout(layout)
+
+ async def _wait_for_state(
+ self,
+ target: RackReaderState,
+ timeout: float,
+ poll_interval: float,
+ ) -> RackReaderState:
+ deadline = time.monotonic() + timeout
+ while True:
+ state = await self.backend.get_state()
+ if state == target:
+ return state
+ if time.monotonic() >= deadline:
+ raise RackReaderTimeoutError(f"Timed out waiting for rack reader to reach {target.value}.")
+ await asyncio.sleep(poll_interval)
+
+ async def _wait_for_fresh_data_ready(
+ self,
+ initial_state: RackReaderState,
+ timeout: float,
+ poll_interval: float,
+ ) -> None:
+ require_state_change = initial_state == RackReaderState.DATAREADY
+ deadline = time.monotonic() + timeout
+ while True:
+ state = await self.backend.get_state()
+ if state != RackReaderState.DATAREADY:
+ require_state_change = False
+ elif not require_state_change:
+ return
+
+ if time.monotonic() >= deadline:
+ raise RackReaderTimeoutError(
+ f"Timed out waiting for rack reader to reach {RackReaderState.DATAREADY.value}."
+ )
+ await asyncio.sleep(poll_interval)
+
+ @need_capability_ready
+ async def wait_for_data_ready(
+ self,
+ timeout: float = 60.0,
+ poll_interval: float = 1.0,
+ ) -> RackReaderState:
+ """Wait until the reader reports that scan data is ready."""
+
+ return await self._wait_for_state(
+ target=RackReaderState.DATAREADY,
+ timeout=timeout,
+ poll_interval=poll_interval,
+ )
+
+ @need_capability_ready
+ async def scan_rack(
+ self,
+ timeout: float = 60.0,
+ poll_interval: float = 1.0,
+ ) -> RackScanResult:
+ """Trigger a rack scan and return the completed result."""
+
+ initial_state = await self.backend.get_state()
+ await self.backend.trigger_rack_scan()
+ await self._wait_for_fresh_data_ready(
+ initial_state=initial_state,
+ timeout=timeout,
+ poll_interval=poll_interval,
+ )
+ return await self.backend.get_scan_result()
+
+ @need_capability_ready
+ async def scan_rack_id(
+ self,
+ timeout: float = 60.0,
+ poll_interval: float = 1.0,
+ ) -> str:
+ """Perform a rack-barcode-only scan and return the rack identifier.
+
+ The backend decides whether this is a one-shot read or a trigger/poll cycle;
+ ``timeout`` and ``poll_interval`` are forwarded so backends that poll can
+ honor them.
+ """
+
+ return await self.backend.scan_rack_id(timeout=timeout, poll_interval=poll_interval)
diff --git a/pylabrobot/capabilities/rack_reading/rack_reader_tests.py b/pylabrobot/capabilities/rack_reading/rack_reader_tests.py
new file mode 100644
index 00000000000..9b2dc1167ce
--- /dev/null
+++ b/pylabrobot/capabilities/rack_reading/rack_reader_tests.py
@@ -0,0 +1,142 @@
+import unittest
+
+from pylabrobot.capabilities.rack_reading.backend import RackReaderBackend
+from pylabrobot.capabilities.rack_reading.rack_reader import RackReader
+from pylabrobot.capabilities.rack_reading.standard import (
+ LayoutInfo,
+ RackReaderState,
+ RackReaderTimeoutError,
+ RackScanEntry,
+ RackScanResult,
+)
+
+
+class RecordingRackReaderBackend(RackReaderBackend):
+ def __init__(self):
+ self.state = RackReaderState.IDLE
+ self.calls: list[str] = []
+ self.result = RackScanResult(
+ rack_id="5500135415",
+ date="20260316",
+ time="160626",
+ entries=[
+ RackScanEntry(position="A01", tube_id="7518613629", status="Code OK", free_text=""),
+ ],
+ )
+
+ async def get_state(self) -> RackReaderState:
+ self.calls.append("get_state")
+ if self.state == RackReaderState.SCANNING:
+ self.state = RackReaderState.DATAREADY
+ return self.state
+
+ async def trigger_rack_scan(self) -> None:
+ self.calls.append("trigger_rack_scan")
+ self.state = RackReaderState.SCANNING
+
+ async def scan_rack_id(self, timeout: float, poll_interval: float) -> str:
+ self.calls.append(f"scan_rack_id:{timeout}:{poll_interval}")
+ return self.result.rack_id
+
+ async def get_scan_result(self) -> RackScanResult:
+ self.calls.append("get_scan_result")
+ return self.result
+
+ async def get_rack_id(self) -> str:
+ self.calls.append("get_rack_id")
+ return self.result.rack_id
+
+ async def get_layouts(self) -> list[LayoutInfo]:
+ self.calls.append("get_layouts")
+ return [LayoutInfo(name="96")]
+
+ async def get_current_layout(self) -> str:
+ self.calls.append("get_current_layout")
+ return "96"
+
+ async def set_current_layout(self, layout: str) -> None:
+ self.calls.append(f"set_current_layout:{layout}")
+
+
+class StuckRackReaderBackend(RecordingRackReaderBackend):
+ async def get_state(self) -> RackReaderState:
+ self.calls.append("get_state")
+ return RackReaderState.SCANNING
+
+
+class StaleDataReadyRackReaderBackend(RecordingRackReaderBackend):
+ def __init__(self):
+ super().__init__()
+ self.state = RackReaderState.DATAREADY
+ self._states_after_trigger = [
+ RackReaderState.DATAREADY,
+ RackReaderState.SCANNING,
+ RackReaderState.DATAREADY,
+ ]
+
+ async def trigger_rack_scan(self) -> None:
+ self.calls.append("trigger_rack_scan")
+
+ async def get_state(self) -> RackReaderState:
+ self.calls.append("get_state")
+ if self._states_after_trigger:
+ return self._states_after_trigger.pop(0)
+ return RackReaderState.DATAREADY
+
+
+class TestRackReader(unittest.IsolatedAsyncioTestCase):
+ async def test_scan_rack_triggers_and_returns_result(self):
+ backend = RecordingRackReaderBackend()
+ reader = RackReader(backend=backend)
+ await reader._on_setup()
+
+ result = await reader.scan_rack(timeout=1.0, poll_interval=0.01)
+
+ self.assertEqual(result.rack_id, "5500135415")
+ self.assertEqual(result.entries[0].position, "A01")
+ self.assertEqual(
+ backend.calls[:3],
+ ["get_state", "trigger_rack_scan", "get_state"],
+ )
+
+ async def test_scan_rack_waits_for_new_dataready_cycle(self):
+ backend = StaleDataReadyRackReaderBackend()
+ reader = RackReader(backend=backend)
+ await reader._on_setup()
+
+ result = await reader.scan_rack(timeout=1.0, poll_interval=0.0)
+
+ self.assertEqual(result.rack_id, "5500135415")
+ self.assertEqual(
+ backend.calls,
+ ["get_state", "trigger_rack_scan", "get_state", "get_state", "get_scan_result"],
+ )
+
+ async def test_scan_rack_id_delegates_to_backend(self):
+ backend = RecordingRackReaderBackend()
+ reader = RackReader(backend=backend)
+ await reader._on_setup()
+
+ rack_id = await reader.scan_rack_id(timeout=1.0, poll_interval=0.01)
+
+ self.assertEqual(rack_id, "5500135415")
+ self.assertEqual(backend.calls, ["scan_rack_id:1.0:0.01"])
+
+ async def test_scan_rack_times_out(self):
+ backend = StuckRackReaderBackend()
+ reader = RackReader(backend=backend)
+ await reader._on_setup()
+
+ with self.assertRaises(RackReaderTimeoutError):
+ await reader.scan_rack(timeout=0.01, poll_interval=0.0)
+
+ async def test_requires_setup(self):
+ backend = RecordingRackReaderBackend()
+ reader = RackReader(backend=backend)
+
+ with self.assertRaises(RuntimeError):
+ await reader.get_state()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/pylabrobot/capabilities/rack_reading/standard.py b/pylabrobot/capabilities/rack_reading/standard.py
new file mode 100644
index 00000000000..fc46cf0d3b8
--- /dev/null
+++ b/pylabrobot/capabilities/rack_reading/standard.py
@@ -0,0 +1,53 @@
+from __future__ import annotations
+
+import enum
+from dataclasses import dataclass
+from typing import Optional
+
+
+class RackReaderError(Exception):
+ """Base exception for rack reader operations."""
+
+
+class RackReaderTimeoutError(RackReaderError):
+ """Raised when a rack reader operation times out."""
+
+
+class RackReaderState(enum.Enum):
+ """Normalized rack reader states."""
+
+ IDLE = "idle"
+ SCANNING = "scanning"
+ DATAREADY = "dataready"
+
+
+@dataclass
+class RackScanEntry:
+ """One decoded rack position."""
+
+ position: str
+ tube_id: Optional[str]
+ status: str
+ free_text: str = ""
+
+
+@dataclass
+class RackScanResult:
+ """A decoded rack scan."""
+
+ rack_id: str
+ date: str
+ time: str
+ entries: list[RackScanEntry]
+
+
+@dataclass
+class LayoutInfo:
+ """One rack layout supported by the reader.
+
+ Wraps a single ``name`` field today so backends can grow the metadata they
+ attach to a layout (rows, columns, tube type, vendor-specific attributes)
+ without changing the capability surface.
+ """
+
+ name: str
diff --git a/pylabrobot/micronic/__init__.py b/pylabrobot/micronic/__init__.py
new file mode 100644
index 00000000000..f55676153d0
--- /dev/null
+++ b/pylabrobot/micronic/__init__.py
@@ -0,0 +1,7 @@
+from pylabrobot.micronic.code_reader import (
+ MicronicCodeReader,
+ MicronicError,
+ MicronicDriver,
+ MicronicRackReadingBackend,
+ MicronicRackReaderError,
+)
diff --git a/pylabrobot/micronic/code_reader/__init__.py b/pylabrobot/micronic/code_reader/__init__.py
new file mode 100644
index 00000000000..2498dcd12a3
--- /dev/null
+++ b/pylabrobot/micronic/code_reader/__init__.py
@@ -0,0 +1,9 @@
+from pylabrobot.micronic.code_reader.code_reader import MicronicCodeReader
+from pylabrobot.micronic.code_reader.driver import (
+ MicronicError,
+ MicronicDriver,
+)
+from pylabrobot.micronic.code_reader.rack_reading_backend import (
+ MicronicRackReadingBackend,
+ MicronicRackReaderError,
+)
diff --git a/pylabrobot/micronic/code_reader/code_reader.py b/pylabrobot/micronic/code_reader/code_reader.py
new file mode 100644
index 00000000000..c768f530a54
--- /dev/null
+++ b/pylabrobot/micronic/code_reader/code_reader.py
@@ -0,0 +1,70 @@
+"""Micronic Code Reader device."""
+
+from __future__ import annotations
+
+from typing import Optional, Sequence
+
+from pylabrobot.capabilities.rack_reading import RackReader
+from pylabrobot.device import Device
+
+from .driver import MicronicDriver
+from .rack_reading_backend import MicronicRackReadingBackend
+
+
+class MicronicCodeReader(Device):
+ """Micronic rack reader device.
+
+ The rack-reading capability is driven by ``MicronicDriver``.
+ """
+
+ def __init__(
+ self,
+ twain_scanner_path: Optional[str] = None,
+ twain_source: str = "AVA6PlusG",
+ sane_device: Optional[str] = None,
+ scanner_backend: str = "auto",
+ scan_command: Optional[Sequence[str]] = None,
+ image_extension: Optional[str] = None,
+ image_dir: Optional[str] = None,
+ serial_port: str = "COM4",
+ rack_id_command: Optional[Sequence[str]] = None,
+ timeout: float = 90.0,
+ poll_interval: float = 1.0,
+ serial_timeout_ms: int = 2500,
+ min_wells: int = 96,
+ keep_images: bool = False,
+ image_input: Optional[str] = None,
+ rack_id_override: Optional[str] = None,
+ driver: Optional[MicronicDriver] = None,
+ ):
+ if driver is None:
+ driver = MicronicDriver(
+ twain_scanner_path=twain_scanner_path,
+ twain_source=twain_source,
+ sane_device=sane_device,
+ scanner_backend=scanner_backend,
+ scan_command=scan_command,
+ image_extension=image_extension,
+ image_dir=image_dir,
+ serial_port=serial_port,
+ rack_id_command=rack_id_command,
+ scanner_timeout_ms=int(timeout * 1000),
+ serial_timeout_ms=serial_timeout_ms,
+ min_wells=min_wells,
+ keep_images=keep_images,
+ image_input=image_input,
+ rack_id_override=rack_id_override,
+ )
+ super().__init__(driver=driver)
+ self.driver: MicronicDriver = driver
+ self.default_timeout = timeout
+ self.default_poll_interval = poll_interval
+ self.rack_reading = RackReader(backend=MicronicRackReadingBackend(driver))
+ self._capabilities = [self.rack_reading]
+
+ def serialize(self) -> dict:
+ return {
+ **super().serialize(),
+ "timeout": self.default_timeout,
+ "poll_interval": self.default_poll_interval,
+ }
diff --git a/pylabrobot/micronic/code_reader/driver.py b/pylabrobot/micronic/code_reader/driver.py
new file mode 100644
index 00000000000..666aeeff9f9
--- /dev/null
+++ b/pylabrobot/micronic/code_reader/driver.py
@@ -0,0 +1,843 @@
+"""Hardware driver for the Micronic rack scanner.
+
+This driver does not call Micronic Code Reader or IO Monitor. It owns the local
+scanner path directly:
+
+- acquire a rack image through a user-supplied scan command, Windows TWAIN
+ helper, or SANE ``scanimage`` command,
+- read the rack ID through the side serial barcode reader,
+- decode tube DataMatrix codes locally, and
+- return the standard PLR rack-reading result.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import re
+import shutil
+import subprocess # nosec B404 - local scanner/rack-id helper execution is the interface.
+import tempfile
+import time
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+from typing import Iterable, Optional, Sequence
+
+from pylabrobot.capabilities.capability import BackendParams
+from pylabrobot.capabilities.rack_reading import (
+ LayoutInfo,
+ RackReaderState,
+ RackScanEntry,
+ RackScanResult,
+)
+from pylabrobot.device import Driver
+from pylabrobot.io.serial import Serial
+
+
+ROWS = "ABCDEFGH"
+COLS = 12
+RACK_ROWS = 8
+RACK_COLS = 12
+
+
+class MicronicError(Exception):
+ """Raised when Micronic driver operations fail."""
+
+
+@dataclass(frozen=True)
+class DecodeResult:
+ tube_id: str
+ method: str
+
+
+class MicronicDriver(Driver):
+ """Driver that controls the Micronic scanner without the OEM app."""
+
+ def __init__(
+ self,
+ twain_scanner_path: Optional[str] = None,
+ twain_source: str = "AVA6PlusG",
+ sane_device: Optional[str] = None,
+ scanner_backend: str = "auto",
+ scan_command: Optional[Sequence[str]] = None,
+ image_extension: Optional[str] = None,
+ image_dir: Optional[str] = None,
+ serial_port: str = "COM4",
+ rack_id_command: Optional[Sequence[str]] = None,
+ scanner_timeout_ms: int = 90000,
+ serial_timeout_ms: int = 2500,
+ min_wells: int = 96,
+ keep_images: bool = False,
+ image_input: Optional[str] = None,
+ rack_id_override: Optional[str] = None,
+ ):
+ super().__init__()
+ self.twain_scanner_path = twain_scanner_path
+ self.twain_source = twain_source
+ self.sane_device = sane_device
+ self.scanner_backend = scanner_backend
+ self.scan_command = list(scan_command) if scan_command is not None else None
+ self.image_extension = normalize_image_extension(image_extension) if image_extension else None
+ self.image_dir = (
+ Path(image_dir) if image_dir else Path(tempfile.gettempdir()) / "pylabrobot-micronic"
+ )
+ self.serial_port = serial_port
+ self.rack_id_command = list(rack_id_command) if rack_id_command is not None else None
+ self.scanner_timeout_ms = scanner_timeout_ms
+ self.serial_timeout_ms = serial_timeout_ms
+ self.min_wells = min_wells
+ self.keep_images = keep_images
+ self.image_input = image_input
+ self.rack_id_override = rack_id_override
+ self._state = RackReaderState.IDLE
+ self._last_result: Optional[RackScanResult] = None
+ self._scan_task: Optional[asyncio.Future[RackScanResult]] = None
+ self._scan_error: Optional[Exception] = None
+ self._reported_scanning_since_trigger = False
+ self.last_image_path: Optional[Path] = None
+ self.last_scan_metadata: dict[str, object] = {}
+ self.last_decode_metadata: dict[str, object] = {}
+
+ async def setup(self, backend_params: Optional[BackendParams] = None):
+ del backend_params
+ self.image_dir.mkdir(parents=True, exist_ok=True)
+
+ async def stop(self):
+ scan_task = self._scan_task
+ if scan_task is None:
+ if self._scan_error is not None:
+ raise self._scan_error
+ return
+ if scan_task.done():
+ self._complete_scan_task(scan_task)
+ else:
+ try:
+ self._last_result = await scan_task
+ except asyncio.CancelledError:
+ self._scan_error = MicronicError("Micronic rack scan was cancelled.")
+ self._state = RackReaderState.IDLE
+ except Exception as exc:
+ self._scan_error = exc
+ self._state = RackReaderState.IDLE
+ else:
+ self._scan_error = None
+ self._state = RackReaderState.DATAREADY
+ finally:
+ self._scan_task = None
+ if self._scan_error is not None:
+ raise self._scan_error
+
+ def serialize(self) -> dict:
+ return {
+ **super().serialize(),
+ "twain_scanner_path": self.twain_scanner_path,
+ "twain_source": self.twain_source,
+ "sane_device": self.sane_device,
+ "scanner_backend": self.scanner_backend,
+ "scan_command": self.scan_command,
+ "image_extension": self.image_extension,
+ "image_dir": str(self.image_dir),
+ "serial_port": self.serial_port,
+ "rack_id_command": self.rack_id_command,
+ "scanner_timeout_ms": self.scanner_timeout_ms,
+ "serial_timeout_ms": self.serial_timeout_ms,
+ "min_wells": self.min_wells,
+ "keep_images": self.keep_images,
+ "image_input": self.image_input,
+ "rack_id_override": self.rack_id_override,
+ }
+
+ async def get_rack_reader_state(self) -> RackReaderState:
+ if self._state == RackReaderState.SCANNING and not self._reported_scanning_since_trigger:
+ self._reported_scanning_since_trigger = True
+ return self._state
+ self._complete_finished_scan_task()
+ if self._scan_error is not None:
+ raise self._scan_error
+ return self._state
+
+ async def trigger_rack_scan(self) -> None:
+ self._complete_finished_scan_task()
+ if self._scan_task is not None and not self._scan_task.done():
+ raise MicronicError("Micronic rack scan is already in progress.")
+ self._last_result = None
+ self._state = RackReaderState.SCANNING
+ self._scan_error = None
+ self._reported_scanning_since_trigger = False
+ loop = asyncio.get_running_loop()
+ self._scan_task = loop.run_in_executor(None, self._scan_rack_blocking)
+
+ async def scan_rack_id(self, timeout: float, poll_interval: float) -> str:
+ del timeout, poll_interval
+ if self.rack_id_override:
+ return self.rack_id_override
+ if self.rack_id_command is not None:
+ return await asyncio.to_thread(
+ read_rack_id_command,
+ format_command(
+ self.rack_id_command,
+ serial_port=self.serial_port,
+ timeout_ms=self.serial_timeout_ms,
+ ),
+ self.serial_timeout_ms,
+ )
+ return await read_rack_id_plr_serial(
+ serial_port=self.serial_port,
+ timeout_ms=self.serial_timeout_ms,
+ )
+
+ async def get_scan_result(self) -> RackScanResult:
+ self._complete_finished_scan_task()
+ if self._scan_error is not None:
+ raise self._scan_error
+ if self._state == RackReaderState.SCANNING:
+ raise MicronicError("Micronic rack scan is still in progress.")
+ if self._last_result is None:
+ raise MicronicError("No Micronic rack scan has completed yet.")
+ return self._last_result
+
+ async def get_rack_id(self) -> str:
+ self._complete_finished_scan_task()
+ if self._scan_error is not None:
+ raise self._scan_error
+ if self._state == RackReaderState.SCANNING:
+ raise MicronicError("Micronic rack scan is still in progress.")
+ if self._last_result is not None:
+ return self._last_result.rack_id
+ return await self.scan_rack_id(timeout=0, poll_interval=0)
+
+ def _complete_finished_scan_task(self) -> None:
+ if self._scan_task is not None and self._scan_task.done():
+ self._complete_scan_task(self._scan_task)
+
+ def _complete_scan_task(self, task: asyncio.Future[RackScanResult]) -> None:
+ if task is not self._scan_task:
+ return
+
+ self._scan_task = None
+ try:
+ self._last_result = task.result()
+ except asyncio.CancelledError:
+ self._scan_error = MicronicError("Micronic rack scan was cancelled.")
+ self._state = RackReaderState.IDLE
+ except Exception as exc:
+ self._scan_error = exc
+ self._state = RackReaderState.IDLE
+ else:
+ self._scan_error = None
+ self._state = RackReaderState.DATAREADY
+
+ async def get_layouts(self) -> list[LayoutInfo]:
+ return [LayoutInfo(name="8x12")]
+
+ async def get_current_layout(self) -> str:
+ return "8x12"
+
+ async def set_current_layout(self, layout: str) -> None:
+ normalized = layout.strip().lower().replace(" ", "")
+ if normalized not in {"8x12", "96(8x12)", "96"}:
+ raise MicronicError(f"Unsupported Micronic rack layout: {layout}")
+
+ def _scan_rack_blocking(self) -> RackScanResult:
+ self.image_dir.mkdir(parents=True, exist_ok=True)
+ image_extension = choose_image_extension(
+ image_extension=self.image_extension,
+ image_input=self.image_input,
+ scanner_backend=self.scanner_backend,
+ scan_command=self.scan_command,
+ twain_scanner_path=self.twain_scanner_path,
+ sane_device=self.sane_device,
+ )
+ image_path = (
+ self.image_dir / f"micronic_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.{image_extension}"
+ )
+
+ self.last_scan_metadata = run_scan(
+ twain_scanner_path=self.twain_scanner_path,
+ twain_source=self.twain_source,
+ sane_device=self.sane_device,
+ scanner_backend=self.scanner_backend,
+ scan_command=self.scan_command,
+ output_path=image_path,
+ timeout_ms=self.scanner_timeout_ms,
+ image_input=self.image_input,
+ )
+ self.last_image_path = image_path
+
+ rack_id = read_rack_id(
+ serial_port=self.serial_port,
+ timeout_ms=self.serial_timeout_ms,
+ rack_id_override=self.rack_id_override,
+ rack_id_command=self.rack_id_command,
+ )
+ decoded, self.last_decode_metadata = decode_image(image_path)
+ if len(decoded) < self.min_wells:
+ missing = ", ".join(position for position in iter_positions() if position not in decoded)
+ raise MicronicError(
+ f"Micronic decode found {len(decoded)} wells; expected at least {self.min_wells}. "
+ f"Missing: {missing}"
+ )
+
+ now = datetime.now()
+ date_text = now.strftime("%Y%m%d")
+ time_text = now.strftime("%H%M%S")
+ entries = [
+ RackScanEntry(
+ position=position,
+ tube_id=decoded[position].tube_id if position in decoded else None,
+ status="OK" if position in decoded else "NOREAD",
+ free_text=decoded[position].method if position in decoded else "",
+ )
+ for position in iter_positions()
+ ]
+
+ if not self.keep_images and self.image_input is None:
+ try:
+ image_path.unlink()
+ self.last_image_path = None
+ except OSError:
+ pass
+
+ return RackScanResult(
+ rack_id=rack_id,
+ date=date_text,
+ time=time_text,
+ entries=entries,
+ )
+
+
+def run_scan(
+ output_path: Path,
+ timeout_ms: int,
+ twain_scanner_path: Optional[str] = None,
+ twain_source: str = "AVA6PlusG",
+ sane_device: Optional[str] = None,
+ scanner_backend: str = "auto",
+ scan_command: Optional[Sequence[str]] = None,
+ image_input: Optional[str] = None,
+) -> dict[str, object]:
+ if image_input:
+ source_path = Path(image_input)
+ if not source_path.exists():
+ raise MicronicError(f"Image input does not exist: {source_path}")
+ output_path.write_bytes(source_path.read_bytes())
+ return {"stdout": "", "stderr": "", "source": str(source_path)}
+
+ if scan_command is not None:
+ command = format_command(
+ scan_command,
+ output_path=output_path,
+ timeout_ms=timeout_ms,
+ twain_source=twain_source,
+ sane_device=sane_device or "",
+ )
+ return run_scan_command(command, output_path, timeout_ms, source="command")
+
+ backend = normalize_scanner_backend(scanner_backend)
+ if backend == "command":
+ raise MicronicError("Command scan requested, but scan_command was not configured.")
+
+ if backend in {"auto", "twain"}:
+ resolved_twain_path = resolve_twain_scanner_path(twain_scanner_path)
+ if resolved_twain_path is not None:
+ return run_scan_command(
+ [resolved_twain_path, str(output_path), twain_source, str(timeout_ms)],
+ output_path,
+ timeout_ms,
+ source="twain",
+ )
+ if backend == "twain":
+ raise MicronicError(
+ "TWAIN scan requested, but no TWAIN helper was configured. Set "
+ "twain_scanner_path, MICRONIC_TWAIN_SCANNER_PATH, or put twain_scan on PATH."
+ )
+
+ if backend in {"auto", "sane"}:
+ scanimage_path = shutil.which("scanimage")
+ if scanimage_path is not None:
+ command = [scanimage_path]
+ if sane_device:
+ command.extend(["--device-name", sane_device])
+ command.extend(["--format=tiff", "--output-file", str(output_path)])
+ return run_scan_command(command, output_path, timeout_ms, source="sane")
+ if backend == "sane":
+ raise MicronicError("SANE scan requested, but scanimage was not found on PATH.")
+
+ raise MicronicError(
+ "No scan acquisition method is available. Configure scan_command, "
+ "twain_scanner_path/MICRONIC_TWAIN_SCANNER_PATH, or install SANE scanimage."
+ )
+
+
+def run_scan_command(
+ command: Sequence[str],
+ output_path: Path,
+ timeout_ms: int,
+ source: str,
+) -> dict[str, object]:
+ try:
+ completed = subprocess.run( # nosec B603 - operator-configured command, shell=False.
+ list(command),
+ check=False,
+ capture_output=True,
+ text=True,
+ timeout=(timeout_ms / 1000) + 15,
+ )
+ except FileNotFoundError as exc:
+ raise MicronicError(f"Scan command was not found: {command[0]}") from exc
+
+ if completed.returncode != 0:
+ raise MicronicError(
+ "Scan command failed with exit code "
+ f"{completed.returncode}: {completed.stderr.strip() or completed.stdout.strip()}"
+ )
+ if not output_path.exists():
+ raise MicronicError(f"Scan command did not create image: {output_path}")
+ return {
+ "stdout": completed.stdout.strip(),
+ "stderr": completed.stderr.strip(),
+ "source": source,
+ "command": list(command),
+ }
+
+
+def read_rack_id(
+ serial_port: str = "COM4",
+ timeout_ms: int = 2500,
+ rack_id_override: Optional[str] = None,
+ rack_id_command: Optional[Sequence[str]] = None,
+) -> str:
+ if rack_id_override:
+ return rack_id_override
+
+ if rack_id_command is not None:
+ command = format_command(
+ rack_id_command,
+ serial_port=serial_port,
+ timeout_ms=timeout_ms,
+ )
+ return read_rack_id_command(command, timeout_ms)
+
+ return asyncio.run(read_rack_id_plr_serial(serial_port=serial_port, timeout_ms=timeout_ms))
+
+
+async def read_rack_id_plr_serial(serial_port: str, timeout_ms: int) -> str:
+ deadline = time.monotonic() + timeout_ms / 1000
+ chunks: list[bytes] = []
+ io = Serial(
+ human_readable_device_name="Micronic rack ID reader",
+ port=serial_port,
+ baudrate=9600,
+ bytesize=7,
+ parity="E",
+ stopbits=1,
+ timeout=0.1,
+ write_timeout=1.0,
+ )
+ try:
+ await io.setup()
+ await io.reset_input_buffer()
+ await io.write(b"\r\n")
+ while time.monotonic() < deadline:
+ value = await io.read(1)
+ if value:
+ chunks.append(value)
+ if value in {b"\r", b"\n"}:
+ break
+ except Exception as exc:
+ raise MicronicError(
+ "Rack ID serial read failed. Install the PLR serial extra with "
+ "`pip install pylabrobot[serial]` and verify the serial port: "
+ f"{exc}"
+ ) from exc
+ finally:
+ await io.stop()
+
+ return extract_rack_id(b"".join(chunks).decode("utf-8", errors="ignore"))
+
+
+def read_rack_id_command(command: Sequence[str], timeout_ms: int) -> str:
+ try:
+ completed = subprocess.run( # nosec B603 - operator-configured command, shell=False.
+ list(command),
+ check=False,
+ capture_output=True,
+ text=True,
+ timeout=(timeout_ms / 1000) + 5,
+ )
+ except FileNotFoundError as exc:
+ raise MicronicError(f"Rack ID command was not found: {command[0]}") from exc
+
+ if completed.returncode != 0:
+ raise MicronicError(
+ "Rack ID command failed with exit code "
+ f"{completed.returncode}: {completed.stderr.strip() or completed.stdout.strip()}"
+ )
+ return extract_rack_id(completed.stdout)
+
+
+def extract_rack_id(text: str) -> str:
+ match = re.search(r"\d{6,}", text)
+ return match.group(0) if match else "NOREAD"
+
+
+def normalize_scanner_backend(scanner_backend: str) -> str:
+ backend = scanner_backend.strip().lower()
+ if backend not in {"auto", "twain", "sane", "command"}:
+ raise MicronicError(
+ "Unsupported scanner backend "
+ f"{scanner_backend!r}; expected 'auto', 'twain', 'sane', or 'command'."
+ )
+ return backend
+
+
+def normalize_image_extension(image_extension: str) -> str:
+ normalized = image_extension.strip().lstrip(".")
+ if not normalized:
+ raise MicronicError("image_extension must not be empty.")
+ return normalized
+
+
+def choose_image_extension(
+ image_extension: Optional[str],
+ image_input: Optional[str],
+ scanner_backend: str,
+ scan_command: Optional[Sequence[str]],
+ twain_scanner_path: Optional[str],
+ sane_device: Optional[str],
+) -> str:
+ if image_extension:
+ return normalize_image_extension(image_extension)
+ if image_input and Path(image_input).suffix:
+ return normalize_image_extension(Path(image_input).suffix)
+
+ backend = normalize_scanner_backend(scanner_backend)
+ if backend == "sane" or (
+ backend == "auto"
+ and scan_command is None
+ and twain_scanner_path is None
+ and (sane_device is not None or os.name != "nt")
+ ):
+ return "tiff"
+ return "bmp"
+
+
+def resolve_twain_scanner_path(twain_scanner_path: Optional[str]) -> Optional[str]:
+ if twain_scanner_path:
+ return twain_scanner_path
+
+ env_path = os.environ.get("MICRONIC_TWAIN_SCANNER_PATH")
+ if env_path:
+ return env_path
+
+ return shutil.which("twain_scan.exe") or shutil.which("twain_scan")
+
+
+def format_command(command: Sequence[str], **values: object) -> list[str]:
+ return [part.format(**values) for part in command]
+
+
+def decode_image(image_path: Path) -> tuple[dict[str, DecodeResult], dict[str, object]]:
+ cv2, np, zxingcpp, Image, ImageOps = import_decode_dependencies()
+ with Image.open(image_path) as loaded_image:
+ image = loaded_image.convert("L")
+ full_results = zxingcpp.read_barcodes(
+ image,
+ formats=zxingcpp.BarcodeFormat.DataMatrix,
+ try_rotate=True,
+ try_downscale=True,
+ try_invert=True,
+ )
+
+ detected: list[tuple[float, float, str]] = []
+ for result in full_results:
+ if not is_tube_id(result.text):
+ continue
+ corners = [
+ result.position.top_left,
+ result.position.top_right,
+ result.position.bottom_right,
+ result.position.bottom_left,
+ ]
+ detected.append(
+ (
+ sum(corner.x for corner in corners) / 4,
+ sum(corner.y for corner in corners) / 4,
+ result.text,
+ )
+ )
+
+ if len(detected) < 24:
+ raise MicronicError(f"Only {len(detected)} DataMatrix codes were found in the full image.")
+
+ xs = fitted_axis(cluster_axis([item[0] for item in detected], RACK_ROWS, 90), RACK_ROWS)
+ ys = fitted_axis(cluster_axis([item[1] for item in detected], RACK_COLS, 90), RACK_COLS)
+ x_pitch = abs(xs[-1] - xs[0]) / (RACK_ROWS - 1)
+ y_pitch = abs(ys[-1] - ys[0]) / (RACK_COLS - 1)
+
+ decoded: dict[str, DecodeResult] = {}
+ for x, y, tube_id in detected:
+ scan_col = min(range(RACK_ROWS), key=lambda index: abs(xs[index] - x))
+ scan_row = min(range(RACK_COLS), key=lambda index: abs(ys[index] - y))
+ if abs(xs[scan_col] - x) > x_pitch * 0.45 or abs(ys[scan_row] - y) > y_pitch * 0.45:
+ continue
+ decoded[rack_position(scan_row, scan_col)] = DecodeResult(tube_id=tube_id, method="full-image")
+
+ for scan_row in range(RACK_COLS):
+ for scan_col in range(RACK_ROWS):
+ position = rack_position(scan_row, scan_col)
+ if position in decoded:
+ continue
+ crop_result = decode_well_crop(
+ image,
+ xs[scan_col],
+ ys[scan_row],
+ cv2,
+ np,
+ zxingcpp,
+ Image,
+ ImageOps,
+ )
+ if crop_result:
+ decoded[position] = crop_result
+
+ duplicate_ids = find_duplicate_ids(decoded)
+ if duplicate_ids:
+ raise MicronicError(
+ f"Duplicate tube IDs decoded from more than one well: {', '.join(duplicate_ids)}"
+ )
+
+ metadata = {
+ "imageSize": image.size,
+ "fullImageDecoded": len(detected),
+ "gridX": [round(value, 1) for value in xs],
+ "gridY": [round(value, 1) for value in ys],
+ "decodedWells": len(decoded),
+ "missing": [position for position in iter_positions() if position not in decoded],
+ }
+ return decoded, metadata
+
+
+def import_decode_dependencies():
+ try:
+ import cv2 # type: ignore
+ import numpy as np # type: ignore
+ import zxingcpp # type: ignore
+ from PIL import Image, ImageOps # type: ignore
+ except ImportError as exc:
+ raise MicronicError(
+ "Micronic decode dependencies are missing. Install pillow, "
+ "opencv-python-headless, numpy, and zxing-cpp."
+ ) from exc
+ return cv2, np, zxingcpp, Image, ImageOps
+
+
+def cluster_axis(values: list[float], expected_count: int, tolerance: float) -> list[float]:
+ if not values:
+ raise MicronicError("No decoded barcode positions are available for grid calibration.")
+
+ clusters: list[list[float]] = []
+ for value in sorted(values):
+ if not clusters:
+ clusters.append([value])
+ continue
+ mean = sum(clusters[-1]) / len(clusters[-1])
+ if abs(value - mean) > tolerance:
+ clusters.append([value])
+ else:
+ clusters[-1].append(value)
+
+ means = [sum(cluster) / len(cluster) for cluster in clusters]
+ if len(means) == expected_count:
+ return means
+ if len(means) >= 2:
+ return [
+ means[0] + index * (means[-1] - means[0]) / (expected_count - 1)
+ for index in range(expected_count)
+ ]
+ raise MicronicError(
+ f"Could not fit {expected_count} grid clusters from {len(values)} decoded positions."
+ )
+
+
+def fitted_axis(means: list[float], expected_count: int) -> list[float]:
+ return [
+ means[0] + index * (means[-1] - means[0]) / (expected_count - 1)
+ for index in range(expected_count)
+ ]
+
+
+def rack_position(scan_row: int, scan_col: int) -> str:
+ return f"{ROWS[RACK_ROWS - 1 - scan_col]}{RACK_COLS - scan_row:02d}"
+
+
+def iter_positions() -> Iterable[str]:
+ for row in ROWS:
+ for column in range(1, COLS + 1):
+ yield f"{row}{column:02d}"
+
+
+def is_tube_id(value: object) -> bool:
+ return isinstance(value, str) and value.isdigit() and len(value) == 10
+
+
+def decode_well_crop(
+ image, center_x, center_y, cv2, np, zxingcpp, Image, ImageOps
+) -> Optional[DecodeResult]:
+ for size in [150, 160, 180, 200, 220, 240]:
+ crop = centered_crop(image, center_x, center_y, size)
+ decoded = decode_pil_variants(crop, zxingcpp, ImageOps)
+ if decoded:
+ return DecodeResult(tube_id=decoded, method=f"crop-{size}")
+
+ for size in [100, 120, 140, 160]:
+ crop = centered_crop(image, center_x, center_y, size)
+ decoded = decode_perspective_crop(crop, cv2, np, zxingcpp, Image, ImageOps)
+ if decoded:
+ return DecodeResult(tube_id=decoded, method=f"perspective-{size}")
+
+ return None
+
+
+def centered_crop(image, center_x: float, center_y: float, size: int):
+ half = size / 2
+ return image.crop(
+ (
+ int(round(center_x - half)),
+ int(round(center_y - half)),
+ int(round(center_x + half)),
+ int(round(center_y + half)),
+ )
+ )
+
+
+def decode_pil_variants(crop, zxingcpp, ImageOps) -> Optional[str]:
+ for variant in [crop, ImageOps.autocontrast(crop), ImageOps.equalize(crop)]:
+ decoded = decode_with_zxing(variant, zxingcpp, ImageOps)
+ if decoded:
+ return decoded
+ return None
+
+
+def decode_with_zxing(image, zxingcpp, ImageOps) -> Optional[str]:
+ binarizers = [
+ zxingcpp.Binarizer.LocalAverage,
+ zxingcpp.Binarizer.GlobalHistogram,
+ zxingcpp.Binarizer.FixedThreshold,
+ ]
+ for scale in [1, 2, 3, 4]:
+ scaled = image if scale == 1 else image.resize((image.width * scale, image.height * scale))
+ for invert in [False, True]:
+ candidate = ImageOps.invert(scaled) if invert else scaled
+ for border in [0, 20, 50]:
+ padded = ImageOps.expand(candidate, border=border, fill=255) if border else candidate
+ for binarizer in binarizers:
+ for pure in [False, True]:
+ results = zxingcpp.read_barcodes(
+ padded,
+ formats=zxingcpp.BarcodeFormat.DataMatrix,
+ try_rotate=True,
+ try_downscale=False,
+ try_invert=True,
+ binarizer=binarizer,
+ is_pure=pure,
+ )
+ for result in results:
+ if is_tube_id(result.text):
+ return str(result.text)
+ return None
+
+
+def order_box(points, np):
+ points = np.array(points, dtype=np.float32)
+ sums = points.sum(axis=1)
+ diffs = np.diff(points, axis=1).ravel()
+ return np.array(
+ [
+ points[np.argmin(sums)],
+ points[np.argmin(diffs)],
+ points[np.argmax(sums)],
+ points[np.argmax(diffs)],
+ ],
+ dtype=np.float32,
+ )
+
+
+def decode_perspective_crop(crop, cv2, np, zxingcpp, Image, ImageOps) -> Optional[str]:
+ crop_array = np.array(crop)
+ for threshold in [30, 40, 50, 60, 70, 80, 90, 100, 120, 140]:
+ mask = (crop_array < threshold).astype(np.uint8) * 255
+ for candidate_mask in candidate_masks(mask, cv2, np):
+ if not candidate_mask.any():
+ continue
+ points = np.column_stack(np.where(candidate_mask > 0))[:, ::-1].astype(np.float32)
+ if len(points) < 40:
+ continue
+ rect = cv2.minAreaRect(points)
+ (rect_x, rect_y), (rect_w, rect_h), _angle = rect
+ if rect_w < 25 or rect_h < 25 or rect_w > crop.width * 0.9 or rect_h > crop.height * 0.9:
+ continue
+ if max(rect_w, rect_h) / max(1, min(rect_w, rect_h)) > 2:
+ continue
+
+ box = cv2.boxPoints(rect)
+ center = np.array([rect_x, rect_y], dtype=np.float32)
+ for margin in [0.9, 1.0, 1.1, 1.2, 1.35]:
+ source = order_box((box - center) * margin + center, np)
+ for output_size in [60, 80, 100, 120, 160]:
+ destination = np.array(
+ [
+ [0, 0],
+ [output_size - 1, 0],
+ [output_size - 1, output_size - 1],
+ [0, output_size - 1],
+ ],
+ dtype=np.float32,
+ )
+ matrix = cv2.getPerspectiveTransform(source, destination)
+ warped = cv2.warpPerspective(
+ crop_array, matrix, (output_size, output_size), borderValue=255
+ )
+ for mode_array in perspective_variants(warped, threshold, cv2, Image, ImageOps):
+ decoded = decode_with_zxing(mode_array, zxingcpp, ImageOps)
+ if decoded:
+ return decoded
+ return None
+
+
+def candidate_masks(mask, cv2, np):
+ yield mask
+ number, labels, stats, centroids = cv2.connectedComponentsWithStats(mask, 8)
+ combined = np.zeros_like(mask)
+ size = mask.shape[0]
+ for index in range(1, number):
+ _x, _y, width, height, area = stats[index]
+ center_x, center_y = centroids[index]
+ if area < 15 or width < 8 or height < 8:
+ continue
+ if abs(center_x - size / 2) > size * 0.33 or abs(center_y - size / 2) > size * 0.33:
+ continue
+ if width > size * 0.85 or height > size * 0.85:
+ continue
+ combined[labels == index] = 255
+ yield combined
+
+
+def perspective_variants(warped, threshold: int, cv2, Image, ImageOps):
+ yield Image.fromarray(warped)
+ yield ImageOps.autocontrast(Image.fromarray(warped))
+ _, binary = cv2.threshold(warped, min(220, threshold + 70), 255, cv2.THRESH_BINARY)
+ yield Image.fromarray(binary)
+ yield Image.fromarray(255 - binary)
+
+
+def find_duplicate_ids(decoded: dict[str, DecodeResult]) -> list[str]:
+ seen: dict[str, str] = {}
+ duplicates: list[str] = []
+ for position, result in decoded.items():
+ previous = seen.get(result.tube_id)
+ if previous and previous != position:
+ duplicates.append(result.tube_id)
+ seen[result.tube_id] = position
+ return sorted(set(duplicates))
diff --git a/pylabrobot/micronic/code_reader/micronic_tests.py b/pylabrobot/micronic/code_reader/micronic_tests.py
new file mode 100644
index 00000000000..2b63a3f3eb4
--- /dev/null
+++ b/pylabrobot/micronic/code_reader/micronic_tests.py
@@ -0,0 +1,346 @@
+import asyncio
+import sys
+import tempfile
+import unittest
+from pathlib import Path
+from typing import cast
+from unittest.mock import MagicMock, patch
+
+from pylabrobot.capabilities.rack_reading import RackReaderState
+from pylabrobot.micronic import MicronicCodeReader
+from pylabrobot.micronic.code_reader.driver import (
+ DecodeResult,
+ MicronicDriver,
+ MicronicError,
+ choose_image_extension,
+ read_rack_id,
+ read_rack_id_plr_serial,
+ run_scan,
+)
+from pylabrobot.micronic.code_reader.rack_reading_backend import MicronicRackReadingBackend
+
+
+async def wait_for_dataready(driver: MicronicDriver) -> None:
+ for _ in range(100):
+ if await driver.get_rack_reader_state() == RackReaderState.DATAREADY:
+ return
+ await asyncio.sleep(0.01)
+ raise AssertionError("Micronic test scan did not reach dataready.")
+
+
+class TestMicronicDriver(unittest.IsolatedAsyncioTestCase):
+ def test_driver_does_not_default_to_packaged_twain_helper(self):
+ driver = MicronicDriver()
+ self.assertIsNone(driver.twain_scanner_path)
+ self.assertIsNone(driver.scan_command)
+
+ async def test_driver_scan_populates_standard_rack_result(self):
+ with tempfile.TemporaryDirectory() as image_dir:
+ driver = MicronicDriver(
+ image_dir=image_dir,
+ min_wells=2,
+ keep_images=True,
+ )
+ decoded = {
+ "A01": DecodeResult(tube_id="1111111111", method="test"),
+ "A02": DecodeResult(tube_id="2222222222", method="test"),
+ }
+ with (
+ patch(
+ "pylabrobot.micronic.code_reader.driver.run_scan",
+ return_value={"source": "test"},
+ ) as run_scan_mock,
+ patch(
+ "pylabrobot.micronic.code_reader.driver.read_rack_id",
+ return_value="9500017722",
+ ) as read_rack_id_mock,
+ patch(
+ "pylabrobot.micronic.code_reader.driver.decode_image",
+ return_value=(decoded, {"decodedWells": 2}),
+ ) as decode_image_mock,
+ ):
+ await driver.setup()
+ await driver.trigger_rack_scan()
+ await wait_for_dataready(driver)
+ result = await driver.get_scan_result()
+
+ self.assertEqual(await driver.get_rack_reader_state(), RackReaderState.DATAREADY)
+ self.assertEqual(result.rack_id, "9500017722")
+ self.assertEqual(result.entries[0].position, "A01")
+ self.assertEqual(result.entries[0].tube_id, "1111111111")
+ self.assertEqual(result.entries[1].tube_id, "2222222222")
+ self.assertEqual(driver.last_scan_metadata, {"source": "test"})
+ self.assertEqual(driver.last_decode_metadata, {"decodedWells": 2})
+ run_scan_mock.assert_called_once()
+ read_rack_id_mock.assert_called_once()
+ decode_image_mock.assert_called_once()
+
+ async def test_reader_can_scan_twice_after_dataready(self):
+ with tempfile.TemporaryDirectory() as image_dir:
+ reader = MicronicCodeReader(
+ driver=MicronicDriver(
+ image_dir=image_dir,
+ min_wells=1,
+ keep_images=True,
+ )
+ )
+ decoded = {"A01": DecodeResult(tube_id="1111111111", method="test")}
+ with (
+ patch(
+ "pylabrobot.micronic.code_reader.driver.run_scan",
+ return_value={"source": "test"},
+ ) as run_scan_mock,
+ patch(
+ "pylabrobot.micronic.code_reader.driver.read_rack_id",
+ return_value="9500017722",
+ ),
+ patch(
+ "pylabrobot.micronic.code_reader.driver.decode_image",
+ return_value=(decoded, {"decodedWells": 1}),
+ ),
+ ):
+ await reader.setup()
+ first = await reader.rack_reading.scan_rack(timeout=1.0, poll_interval=0.01)
+ second = await reader.rack_reading.scan_rack(timeout=1.0, poll_interval=0.01)
+
+ self.assertEqual(first.rack_id, "9500017722")
+ self.assertEqual(second.rack_id, "9500017722")
+ self.assertEqual(run_scan_mock.call_count, 2)
+
+ async def test_driver_get_rack_id_does_not_return_stale_result_while_scanning(self):
+ with tempfile.TemporaryDirectory() as image_dir:
+ driver = MicronicDriver(
+ image_dir=image_dir,
+ min_wells=1,
+ keep_images=True,
+ )
+ decoded = {"A01": DecodeResult(tube_id="1111111111", method="test")}
+
+ def slow_scan(*args, **kwargs):
+ del args, kwargs
+ import time
+
+ time.sleep(0.05)
+ return {"source": "test"}
+
+ with (
+ patch(
+ "pylabrobot.micronic.code_reader.driver.run_scan",
+ return_value={"source": "test"},
+ ),
+ patch(
+ "pylabrobot.micronic.code_reader.driver.read_rack_id",
+ return_value="9500017722",
+ ),
+ patch(
+ "pylabrobot.micronic.code_reader.driver.decode_image",
+ return_value=(decoded, {"decodedWells": 1}),
+ ),
+ ):
+ await driver.setup()
+ await driver.trigger_rack_scan()
+ await wait_for_dataready(driver)
+ self.assertEqual(await driver.get_rack_id(), "9500017722")
+
+ with (
+ patch(
+ "pylabrobot.micronic.code_reader.driver.run_scan",
+ side_effect=slow_scan,
+ ),
+ patch(
+ "pylabrobot.micronic.code_reader.driver.read_rack_id",
+ return_value="9500017723",
+ ),
+ patch(
+ "pylabrobot.micronic.code_reader.driver.decode_image",
+ return_value=(decoded, {"decodedWells": 1}),
+ ),
+ ):
+ await driver.trigger_rack_scan()
+ with self.assertRaises(MicronicError):
+ await driver.get_rack_id()
+ await wait_for_dataready(driver)
+ self.assertEqual(await driver.get_rack_id(), "9500017723")
+
+ async def test_run_scan_uses_explicit_command(self):
+ with tempfile.TemporaryDirectory() as image_dir:
+ output_path = Path(image_dir) / "rack.bmp"
+ metadata = run_scan(
+ output_path=output_path,
+ timeout_ms=1000,
+ scan_command=[
+ sys.executable,
+ "-c",
+ "from pathlib import Path; Path(r'{output_path}').write_bytes(b'image')",
+ ],
+ )
+
+ self.assertEqual(metadata["source"], "command")
+ self.assertTrue(output_path.exists())
+
+ async def test_run_scan_uses_sane_scanimage_when_requested(self):
+ with tempfile.TemporaryDirectory() as image_dir:
+ output_path = Path(image_dir) / "micronic-test.tiff"
+ with (
+ patch(
+ "pylabrobot.micronic.code_reader.driver.shutil.which",
+ return_value="/usr/bin/scanimage",
+ ),
+ patch(
+ "pylabrobot.micronic.code_reader.driver.run_scan_command",
+ return_value={"source": "sane"},
+ ) as run_scan_command,
+ ):
+ metadata = run_scan(
+ output_path=output_path,
+ timeout_ms=1000,
+ scanner_backend="sane",
+ sane_device="avision:libusb:001:004",
+ )
+
+ self.assertEqual(metadata["source"], "sane")
+ run_scan_command.assert_called_once_with(
+ [
+ "/usr/bin/scanimage",
+ "--device-name",
+ "avision:libusb:001:004",
+ "--format=tiff",
+ "--output-file",
+ str(output_path),
+ ],
+ output_path,
+ 1000,
+ source="sane",
+ )
+
+ async def test_run_scan_requires_configured_acquisition(self):
+ with tempfile.TemporaryDirectory() as image_dir:
+ with (
+ patch("pylabrobot.micronic.code_reader.driver.shutil.which", return_value=None),
+ self.assertRaises(MicronicError),
+ ):
+ run_scan(
+ output_path=Path(image_dir) / "micronic-test.bmp",
+ timeout_ms=1000,
+ scanner_backend="twain",
+ )
+
+ async def test_choose_image_extension_prefers_sane_tiff_on_non_windows_auto(self):
+ extension = choose_image_extension(
+ image_extension=None,
+ image_input=None,
+ scanner_backend="auto",
+ scan_command=None,
+ twain_scanner_path=None,
+ sane_device="avision:libusb:001:004",
+ )
+ self.assertEqual(extension, "tiff")
+
+ async def test_read_rack_id_uses_configured_command(self):
+ rack_id = read_rack_id(
+ timeout_ms=1000,
+ rack_id_command=[sys.executable, "-c", "print('rack 9500017722')"],
+ )
+ self.assertEqual(rack_id, "9500017722")
+
+ async def test_read_rack_id_uses_plr_serial(self):
+ instances: list[object] = []
+
+ class FakeSerial:
+ def __init__(self, **kwargs):
+ self.kwargs = kwargs
+ self.reads = iter([b"9", b"5", b"0", b"0", b"0", b"1", b"7", b"7", b"2", b"2", b"\r"])
+ self.calls: list[str] = []
+ instances.append(self)
+
+ async def setup(self):
+ self.calls.append("setup")
+
+ async def reset_input_buffer(self):
+ self.calls.append("reset_input_buffer")
+
+ async def write(self, data: bytes):
+ self.calls.append(f"write:{data!r}")
+
+ async def read(self, num_bytes: int = 1) -> bytes:
+ self.calls.append(f"read:{num_bytes}")
+ return next(self.reads)
+
+ async def stop(self):
+ self.calls.append("stop")
+
+ with patch("pylabrobot.micronic.code_reader.driver.Serial", FakeSerial):
+ rack_id = await read_rack_id_plr_serial(serial_port="COM4", timeout_ms=1000)
+
+ self.assertEqual(len(instances), 1)
+ fake_serial = cast(FakeSerial, instances[0])
+ self.assertEqual(rack_id, "9500017722")
+ self.assertEqual(fake_serial.kwargs["port"], "COM4")
+ self.assertEqual(fake_serial.kwargs["bytesize"], 7)
+ self.assertEqual(fake_serial.kwargs["parity"], "E")
+ self.assertIn("reset_input_buffer", fake_serial.calls)
+ self.assertIn("write:b'\\r\\n'", fake_serial.calls)
+ self.assertEqual(fake_serial.calls[-1], "stop")
+
+ async def test_driver_scan_rack_id_uses_configured_command(self):
+ driver = MicronicDriver(rack_id_command=[sys.executable, "-c", "print('rack 9500017722')"])
+ self.assertEqual(await driver.scan_rack_id(timeout=1.0, poll_interval=0.1), "9500017722")
+
+ async def test_driver_raises_when_scan_result_is_not_ready(self):
+ driver = MicronicDriver()
+ with self.assertRaises(MicronicError):
+ await driver.get_scan_result()
+
+ async def test_driver_rejects_unknown_layout(self):
+ driver = MicronicDriver()
+ with self.assertRaises(MicronicError):
+ await driver.set_current_layout("384")
+
+ async def test_backend_delegates_to_driver(self):
+ driver = MicronicDriver(rack_id_override="9500017722")
+ backend = MicronicRackReadingBackend(driver=driver)
+ with patch.object(driver, "scan_rack_id", return_value="9500017722") as scan_rack_id:
+ rack_id = await backend.scan_rack_id(timeout=5.0, poll_interval=0.5)
+ self.assertEqual(rack_id, "9500017722")
+ scan_rack_id.assert_called_once_with(timeout=5.0, poll_interval=0.5)
+
+
+class TestMicronicCodeReader(unittest.IsolatedAsyncioTestCase):
+ async def test_device_exposes_rack_reading_only(self):
+ reader = MicronicCodeReader(
+ timeout=12.0,
+ poll_interval=0.25,
+ driver=MicronicDriver(rack_id_override="9500017722"),
+ )
+ with patch.object(
+ reader.driver,
+ "get_rack_reader_state",
+ return_value=RackReaderState.IDLE,
+ ):
+ await reader.setup()
+ try:
+ self.assertIn(reader.rack_reading, reader._capabilities)
+ self.assertFalse(hasattr(reader, "barcode_scanning"))
+ with patch.object(
+ reader.rack_reading,
+ "scan_rack",
+ return_value=MagicMock(rack_id="9500017722"),
+ ) as scan_rack:
+ result = await reader.rack_reading.scan_rack(
+ timeout=reader.default_timeout,
+ poll_interval=reader.default_poll_interval,
+ )
+ finally:
+ await reader.stop()
+
+ self.assertEqual(result.rack_id, "9500017722")
+ scan_rack.assert_called_once_with(timeout=12.0, poll_interval=0.25)
+
+ async def test_frontend_uses_driver(self):
+ reader = MicronicCodeReader(rack_id_override="9500017722")
+ self.assertIsInstance(reader.driver, MicronicDriver)
+ self.assertFalse(hasattr(reader, "barcode_scanning"))
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/pylabrobot/micronic/code_reader/rack_reading_backend.py b/pylabrobot/micronic/code_reader/rack_reading_backend.py
new file mode 100644
index 00000000000..b376fe83fb6
--- /dev/null
+++ b/pylabrobot/micronic/code_reader/rack_reading_backend.py
@@ -0,0 +1,79 @@
+"""Rack-reading backend for the Micronic driver."""
+
+from __future__ import annotations
+
+from typing import Optional
+
+from pylabrobot.capabilities.capability import BackendParams
+from pylabrobot.capabilities.rack_reading import (
+ LayoutInfo,
+ RackReaderBackend,
+ RackReaderError,
+ RackReaderState,
+ RackScanResult,
+)
+
+from .driver import MicronicDriver, MicronicError
+
+
+class MicronicRackReaderError(MicronicError, RackReaderError):
+ """Raised when Micronic rack-reading operations fail."""
+
+
+class MicronicRackReadingBackend(RackReaderBackend):
+ """Rack-reading backend that delegates to the Micronic driver."""
+
+ def __init__(self, driver: MicronicDriver):
+ super().__init__()
+ self.driver = driver
+
+ async def _on_setup(self, backend_params: Optional[BackendParams] = None):
+ await self.get_state()
+
+ async def get_state(self) -> RackReaderState:
+ try:
+ return await self.driver.get_rack_reader_state()
+ except MicronicError as exc:
+ raise MicronicRackReaderError(str(exc)) from exc
+
+ async def trigger_rack_scan(self) -> None:
+ try:
+ await self.driver.trigger_rack_scan()
+ except MicronicError as exc:
+ raise MicronicRackReaderError(str(exc)) from exc
+
+ async def scan_rack_id(self, timeout: float, poll_interval: float) -> str:
+ try:
+ return await self.driver.scan_rack_id(timeout=timeout, poll_interval=poll_interval)
+ except MicronicError as exc:
+ raise MicronicRackReaderError(str(exc)) from exc
+
+ async def get_scan_result(self) -> RackScanResult:
+ try:
+ return await self.driver.get_scan_result()
+ except MicronicError as exc:
+ raise MicronicRackReaderError(str(exc)) from exc
+
+ async def get_rack_id(self) -> str:
+ try:
+ return await self.driver.get_rack_id()
+ except MicronicError as exc:
+ raise MicronicRackReaderError(str(exc)) from exc
+
+ async def get_layouts(self) -> list[LayoutInfo]:
+ try:
+ return await self.driver.get_layouts()
+ except MicronicError as exc:
+ raise MicronicRackReaderError(str(exc)) from exc
+
+ async def get_current_layout(self) -> str:
+ try:
+ return await self.driver.get_current_layout()
+ except MicronicError as exc:
+ raise MicronicRackReaderError(str(exc)) from exc
+
+ async def set_current_layout(self, layout: str) -> None:
+ try:
+ await self.driver.set_current_layout(layout)
+ except MicronicError as exc:
+ raise MicronicRackReaderError(str(exc)) from exc