diff --git a/.gitignore b/.gitignore index 10d2b64ea58..bc615036d54 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,10 @@ pyhamilton/LAY-BACKUP .ipynb_checkpoints *.egg-info *.log +keyser-testing/Tecan Manuals/ +keyser-testing/evoware-usb-capture-*/ +keyser-testing/evoware-pktmon-capture-*/ +keyser-testing/*.pdf build/lib myenv diff --git a/_typos.toml b/_typos.toml index 742efae9454..3cf64e87e10 100644 --- a/_typos.toml +++ b/_typos.toml @@ -28,8 +28,21 @@ mis = "mis" RHE = "RHE" "ASEND" = "ASEND" caf = "caf" +# Tecan firmware command abbreviations +ALO = "ALO" +SOM = "SOM" +SHS = "SHS" +SHW = "SHW" +AZMA = "AZMA" +MCH = "MCH" +AAS = "AAS" +AER = "AER" [files] extend-exclude = [ - "*.ipynb" + "*.ipynb", + "keyser-testing/Tecan Manuals/", + "keyser-testing/evoware-usb-capture-aspirate-dispense/", + "keyser-testing/evoware-usb-capture-multidispense/", + "keyser-testing/evoware-pktmon-capture-20260327/", ] diff --git a/claude.md b/claude.md index dea76713f79..60c7fe0d49f 100644 --- a/claude.md +++ b/claude.md @@ -1 +1,104 @@ -pretend you are Jeff Dean +# PyLabRobot - Lab Automation Integration + +## Project Overview +PyLabRobot is a hardware-agnostic Python library for lab automation. We are integrating several instruments for automated liquid handling, bulk dispensing, and robotic plate movement. + +## Our Lab Equipment + +### Tecan EVO 150 (Liquid Handler) +- **Backend**: `EVOBackend` from `pylabrobot.liquid_handling.backends.tecan` +- **Frontend**: `LiquidHandler` from `pylabrobot.liquid_handling` +- **Deck**: `EVO150Deck` (45 rails, 1315 x 780 x 765 mm) +- **Connection**: USB (VID=0x0C47, PID=0x4000) +- **Install**: `pip install -e ".[usb]"` + +#### Default Deck Components +- **Plate carrier**: `MP_3Pos` (Tecan part no. 10612604) - 3-position microplate carrier + - Import: `from pylabrobot.resources.tecan.plate_carriers import MP_3Pos` +- **Tip carrier**: `DiTi_3Pos` (Tecan part no. 10613022) - 3-position DiTi carrier + - Import: `from pylabrobot.resources.tecan.tip_carriers import DiTi_3Pos` +- **Tips**: `DiTi_50ul_SBS_LiHa` - 50uL disposable tips for LiHa + - Import: `from pylabrobot.resources.tecan.tip_racks import DiTi_50ul_SBS_LiHa` +- **Plates**: `Eppendorf_96_wellplate_250ul_Vb` - Eppendorf twin.tec 96-well (250uL, V-bottom) + - Import: `from pylabrobot.resources.eppendorf.plates import Eppendorf_96_wellplate_250ul_Vb` + +### Thermo Scientific Multidrop Combi (Bulk Dispenser) +- **Backend**: `MultidropCombiBackend` from `pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi` +- **Frontend**: `BulkDispenser` from `pylabrobot.bulk_dispensers` +- **Connection**: RS232 via USB adapter (specify COM port explicitly) +- **Serial config**: 9600 baud, 8N1, XON/XOFF +- **Install**: `pip install -e ".[serial]"` +- **Plate helpers**: `plate_to_type_index()`, `plate_to_pla_params()` for PLR plate → Multidrop mapping +- **Protocol docs**: `C:\Users\keyser\source\repos\keyser-sila-testing\documentation\Multidrop Combi Remote Control Command Sets (1).pdf` + +### UFACTORY xArm 6 (Robotic Arm) +- **Backend**: `XArm6Backend` from `pylabrobot.arms.xarm6.xarm6_backend` +- **Frontend**: `SixAxisArm` from `pylabrobot.arms.six_axis` +- **Connection**: Ethernet (IP address) +- **Install**: `pip install xarm-python-sdk` + +## Architecture Patterns + +### Legacy Architecture (main branch) +Each device category follows this pattern: +- **Frontend class** (`Machine` subclass) - thin delegation layer with `@need_setup_finished` guards +- **Abstract backend** (`MachineBackend` subclass with `ABCMeta`) - defines the device-type interface +- **Concrete backend** - implements the abstract backend for a specific instrument +- **Chatterbox backend** - prints operations for testing without hardware + +Key base classes: +- `pylabrobot/machines/backend.py` - `MachineBackend(SerializableMixin, ABC)` +- `pylabrobot/machines/machine.py` - `Machine(SerializableMixin, ABC)` + `need_setup_finished` decorator + +### v1b1 Architecture (v1b1 branch) +New capability-based architecture replacing the monolithic backend model: +- **Driver** (`pylabrobot/device.py`) - owns I/O, connection lifecycle (`setup()/stop()`) +- **CapabilityBackend** (`pylabrobot/capabilities/capability.py`) - protocol translation for one concern +- **Capability** - user-facing API with validation, tip tracking, etc. +- **Device** - owns Driver + list of Capabilities, orchestrates lifecycle +- **BackendParams** - typed dataclasses replacing `**kwargs` + +Key interfaces for Tecan EVO migration: +- `PIPBackend` (`pylabrobot/capabilities/liquid_handling/pip_backend.py`) - independent channel pipetting +- `GripperArmBackend` (`pylabrobot/arms/backend.py`) - plate handling arms +- Reference implementation: Hamilton STAR at `pylabrobot/hamilton/liquid_handlers/star/` + +## Branch Strategy + +| Branch | Base | Purpose | +|--------|------|---------| +| `air-liha-backend` | `main` | Legacy Air LiHa backend (WIP, may PR to main) | +| `v1b1-tecan-evo` | `origin/v1b1` | Native v1b1 EVO backend (syringe + Air LiHa + RoMa) | +| `keyser-combined` | `main` | Combined xArm + Multidrop for testing | +| `keyser-multidrop-testing` | `main` | Multidrop Combi backend | +| `keyser-xarm-testing` | `main` | xArm 6 backend | + +### v1b1 Tecan EVO File Structure +``` +pylabrobot/tecan/evo/ + driver.py # TecanEVODriver(Driver) — USB I/O + command protocol + pip_backend.py # EVOPIPBackend(PIPBackend) — syringe LiHa + air_pip_backend.py # AirEVOPIPBackend(EVOPIPBackend) — Air LiHa + roma_backend.py # EVORoMaBackend(GripperArmBackend) — RoMa plate handling + evo.py # TecanEVO(Resource, Device) — composite device + params.py # BackendParams dataclasses + errors.py # TecanError + firmware/ # Extracted firmware command wrappers (LiHa, RoMa, EVOArm) +``` + +Legacy EVO stays at: `pylabrobot/legacy/liquid_handling/backends/tecan/` + +## Air LiHa (ZaapMotion) Key Facts +- ZaapMotion controllers boot into bootloader mode after power cycle +- Must send `T2{0-7}X` (exit boot) + 33 motor config commands per tip before PIA +- Plunger conversion: 106.4 steps/uL (vs 3 for syringe), 213 speed factor (vs 6) +- Force mode: `SFR133120`+`SFP1` before plunger ops, `SFR3752`+`SDP1400` after +- Investigation details: `keyser-testing/AirLiHa_Investigation.md` + +## Development +- Venv: `.venv/` in project root +- Install all deps: `pip install -e ".[serial,usb]"` +- Tests: `python -m pytest pylabrobot/bulk_dispensers/ -v` +- Lint: `ruff check`, Format: `ruff format`, Types: `mypy pylabrobot --check-untyped-defs` +- Abstract interfaces use microliters (float); backends convert to instrument-specific units +- Tecan Z coordinates: 0 = deck surface, z_range (~2100) = top/home diff --git a/pylabrobot/legacy/liquid_handling/liquid_classes/tecan.py b/pylabrobot/legacy/liquid_handling/liquid_classes/tecan.py index 3cdc0751b2a..f817d8d8d4f 100644 --- a/pylabrobot/legacy/liquid_handling/liquid_classes/tecan.py +++ b/pylabrobot/legacy/liquid_handling/liquid_classes/tecan.py @@ -2792,3 +2792,418 @@ def get_liquid_class( dispense_retract_speed=50, dispense_retract_offset=0, ) +mapping[(1.0, 10.01, Liquid.DMSO, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=4, + lld_conductivity=1, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=2.14, + pmp_character=0, + density=1.1, + calibration_factor=1.09, + calibration_offset=-0.07, + aspirate_speed=20.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=15.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=0.1, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=0.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=5.0, + aspirate_retract_offset=-5.0, + dispense_speed=600.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=False, + dispense_lld_position=0, + dispense_lld_offset=0.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # DMSO free dispense DiTi 50 [1.0-10.01 uL] + +mapping[(10.01, 50.01, Liquid.DMSO, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=4, + lld_conductivity=1, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=2.14, + pmp_character=0, + density=1.1, + calibration_factor=1.04, + calibration_offset=0.36, + aspirate_speed=50.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=10.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=1.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=0.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=5.0, + aspirate_retract_offset=-5.0, + dispense_speed=600.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=False, + dispense_lld_position=0, + dispense_lld_offset=0.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # DMSO free dispense DiTi 50 [10.01-50.01 uL] + +mapping[(0.5, 1.0, Liquid.DMSO, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=7, + lld_conductivity=2, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=2.14, + pmp_character=0, + density=1.1, + calibration_factor=0.5, + calibration_offset=-0.25, + aspirate_speed=20.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=10.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=3.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=2.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=5.0, + aspirate_retract_offset=-5.0, + dispense_speed=20.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=True, + dispense_lld_position=3, + dispense_lld_offset=1.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # DMSO wet contact DiTi 50 [0.5-1.0 uL] + +mapping[(1.0, 5.01, Liquid.DMSO, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=7, + lld_conductivity=2, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=2.14, + pmp_character=0, + density=1.1, + calibration_factor=1.141, + calibration_offset=-0.345, + aspirate_speed=50.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=8.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=2.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=2.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=5.0, + aspirate_retract_offset=-5.0, + dispense_speed=20.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=True, + dispense_lld_position=3, + dispense_lld_offset=1.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # DMSO wet contact DiTi 50 [1.0-5.01 uL] + +mapping[(5.01, 10.01, Liquid.DMSO, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=7, + lld_conductivity=2, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=2.14, + pmp_character=0, + density=1.1, + calibration_factor=1.043, + calibration_offset=0.214, + aspirate_speed=20.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=10.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=2.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=2.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=5.0, + aspirate_retract_offset=-5.0, + dispense_speed=20.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=True, + dispense_lld_position=3, + dispense_lld_offset=1.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # DMSO wet contact DiTi 50 [5.01-10.01 uL] + +mapping[(0.5, 5.01, Liquid.WATER, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=7, + lld_conductivity=1, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=1.0, + pmp_character=0, + density=1.0, + calibration_factor=1.1, + calibration_offset=0.44, + aspirate_speed=20.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=10.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=2.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=2.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=20.0, + aspirate_retract_offset=-5.0, + dispense_speed=600.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=False, + dispense_lld_position=0, + dispense_lld_offset=0.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # Water free dispense DiTi 50 [0.5-5.01 uL] + +mapping[(5.01, 10.01, Liquid.WATER, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=7, + lld_conductivity=1, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=1.0, + pmp_character=0, + density=1.0, + calibration_factor=1.085, + calibration_offset=0.65, + aspirate_speed=20.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=10.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=2.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=2.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=20.0, + aspirate_retract_offset=-5.0, + dispense_speed=600.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=False, + dispense_lld_position=0, + dispense_lld_offset=0.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # Water free dispense DiTi 50 [5.01-10.01 uL] + +mapping[(10.01, 50.01, Liquid.WATER, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=7, + lld_conductivity=1, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=1.0, + pmp_character=0, + density=1.0, + calibration_factor=1.052, + calibration_offset=0.387, + aspirate_speed=70.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=8.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=2.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=2.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=20.0, + aspirate_retract_offset=-5.0, + dispense_speed=600.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=False, + dispense_lld_position=0, + dispense_lld_offset=0.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # Water free dispense DiTi 50 [10.01-50.01 uL] diff --git a/pylabrobot/tecan/__init__.py b/pylabrobot/tecan/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/pylabrobot/tecan/evo/__init__.py b/pylabrobot/tecan/evo/__init__.py new file mode 100644 index 00000000000..ef4ff36925d --- /dev/null +++ b/pylabrobot/tecan/evo/__init__.py @@ -0,0 +1,5 @@ +from pylabrobot.tecan.evo.air_pip_backend import AirEVOPIPBackend +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.evo import TecanEVO +from pylabrobot.tecan.evo.pip_backend import EVOPIPBackend +from pylabrobot.tecan.evo.roma_backend import EVORoMaBackend diff --git a/pylabrobot/tecan/evo/air_pip_backend.py b/pylabrobot/tecan/evo/air_pip_backend.py new file mode 100644 index 00000000000..b124ec51abd --- /dev/null +++ b/pylabrobot/tecan/evo/air_pip_backend.py @@ -0,0 +1,501 @@ +"""PIPBackend for the Tecan EVO with Air LiHa (ZaapMotion controllers). + +Overrides conversion factors and adds ZaapMotion boot exit, motor +configuration, and force mode wrapping around plunger operations. + +See keyser-testing/AirLiHa_Investigation.md for reverse-engineering details. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import List, Optional + +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.capabilities.liquid_handling.standard import ( + Aspiration, + Dispense, + Mix, + Pickup, + TipDrop, +) +from pylabrobot.resources import Resource, TecanTipRack + +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.errors import TecanError +from pylabrobot.tecan.evo.firmware.arm_base import EVOArm +from pylabrobot.tecan.evo.firmware.zaapmotion import ZaapMotion +from pylabrobot.tecan.evo.pip_backend import EVOPIPBackend + +logger = logging.getLogger(__name__) + +# ZaapMotion motor configuration sequence, sent via transparent pipeline T2x. +# Captured from EVOware USB traffic (zaapmotiondriver.dll scan phase). +ZAAPMOTION_CONFIG = [ + "CFE 255,500", + "CAD ADCA,0,12.5", + "CAD ADCB,1,12.5", + "EDF1", + "EDF4", + "CDO 11", + "EDF5", + "SIC 10,5", + "SEA ADD,H,4,STOP,1,0,0", + "CMTBLDC,1", + "CETQEP2,256,R", + "CECPOS,QEP2", + "CECCUR,QEP2", + "CEE OFF", + "STL80", + "SVL12,8,16", + "SVL24,20,28", + "SCL1,900,3.5", + "SCE HOLD,500", + "SCE MOVE,500", + "CIR0", + "PIDHOLD,D,1.2,1,-1,0.003,0,0,OFF", + "PIDMOVE,D,0.8,1,-1,0.004,0,0,OFF", + "PIDHOLD,Q,1.2,1,-1,0.003,0,0,OFF", + "PIDMOVE,Q,0.8,1,-1,0.004,0,0,OFF", + "PIDHOLD,POS,0.2,1,-1,0.02,4,0,OFF", + "PIDMOVE,POS,0.35,1,-1,0.1,3,0,OFF", + "PIDSPDELAY,0", + "SFF 0.045,0.4,0.041", + "SES 0", + "SPO0", + "SIA 0.01, 0.28, 0.0", + "WRP", +] + + +class AirEVOPIPBackend(EVOPIPBackend): + """PIPBackend for the Tecan EVO with Air LiHa (ZaapMotion controllers). + + Air displacement uses BLDC motor controllers instead of syringe dilutors. + After power cycle, the ZaapMotion controllers boot into bootloader mode + and require firmware exit + motor configuration before PIA can succeed. + + Conversion factors for ZaapMotion (air displacement): + - Volume: 106.4 full plunger steps per uL + - Speed: 213 half-steps/sec per uL/s + """ + + STEPS_PER_UL = 106.4 + SPEED_FACTOR = 213.0 + + # ZaapMotion force ramp values + SFR_ACTIVE = 133120 + SFR_IDLE = 3752 + SDP_DEFAULT = 1400 + + def __init__( + self, + driver: TecanEVODriver, + deck: Resource, + diti_count: int = 0, + ): + super().__init__(driver=driver, deck=deck, diti_count=diti_count) + self.zaap: Optional[ZaapMotion] = None + self._agt_z_start: Optional[int] = None # set during pickup, used to validate drop + + def _apply_calibration_offsets( + self, + x: int, + y: int, + ops: list, + ) -> tuple: + """Apply per-labware X/Y calibration offsets if defined.""" + par = ops[0].resource.parent + # Walk up to find the labware with offsets (plate or tip rack) + while par is not None: + if hasattr(par, "x_offset"): + x += getattr(par, "x_offset", 0) + y += getattr(par, "y_offset", 0) + break + par = getattr(par, "parent", None) + return x, y + + async def _on_setup(self) -> None: + """Configure ZaapMotion controllers, then run standard LiHa init.""" + + # Check if already initialized (skip ZaapMotion config + PIA) + if await self._is_initialized(): + logger.info("Axes already initialized — skipping ZaapMotion config + PIA.") + await self._setup_quick() + return + + logger.info("Running full Air LiHa setup...") + await self._configure_zaapmotion() + await self._setup_safety_module() + + # ZaapMotion SDO config (from EVOware: sent right before PIA) + assert self.zaap is not None + try: + await self.zaap.set_sdo("11,1") + except TecanError: + pass + + # Standard LiHa init (PIA, plunger init, etc.) + await super()._on_setup() + + async def _is_initialized(self) -> bool: + """Check if LiHa axes are already initialized.""" + try: + arm = EVOArm(self._driver, "C5") # type: ignore[arg-type] + err = await arm.read_error_register(0) + err = str(err) # may be int if all digits + # A = init failed (1), G = not initialized (7) + if err and not any(c in ("A", "G") for c in err): + return True + except (TecanError, TimeoutError): + pass + return False + + async def _setup_quick(self) -> None: + """Fast setup when axes are already initialized.""" + from pylabrobot.tecan.evo.firmware import LiHa + + self.liha = LiHa(self._driver, "C5") # type: ignore[arg-type] + self.zaap = ZaapMotion(self._driver) # type: ignore[arg-type] + self._num_channels = await self.liha.report_number_tips() + self._x_range = await self.liha.report_x_param(5) + self._y_range = (await self.liha.report_y_param(5))[0] + self._z_range = (await self.liha.report_z_param(5))[0] + logger.info("Quick setup complete: %d channels, z_range=%d", self._num_channels, self._z_range) + + async def _configure_zaapmotion(self) -> None: + """Exit boot mode and configure all 8 ZaapMotion motor controllers.""" + zaap = ZaapMotion(self._driver) # type: ignore[arg-type] + all_failed_tips = [] + for tip in range(8): + # Check current mode + try: + firmware = await zaap.read_firmware_version(tip) + except TecanError: + firmware = "" + + if "BOOT" in str(firmware): + logger.info("ZaapMotion tip %d in boot mode, sending exit command", tip + 1) + await zaap.exit_boot_mode(tip) + await asyncio.sleep(1) + + # Verify transition + try: + firmware = await zaap.read_firmware_version(tip) + except TecanError: + firmware = "" + + if "BOOT" in str(firmware): + raise TecanError(f"ZaapMotion tip {tip + 1} failed to exit boot mode", "C5", 1) + + # Check if already configured + try: + await zaap.read_config_status(tip) + logger.info("ZaapMotion tip %d already configured, skipping", tip + 1) + continue + except TecanError: + pass + + # Send motor configuration + logger.info("Configuring ZaapMotion tip %d (%d commands)", tip + 1, len(ZAAPMOTION_CONFIG)) + failures = 0 + for cmd in ZAAPMOTION_CONFIG: + try: + await zaap.configure_motor(tip, cmd) + except TecanError as e: + failures += 1 + logger.warning("ZaapMotion tip %d config '%s' failed: %s", tip + 1, cmd, e) + + if failures == len(ZAAPMOTION_CONFIG): + all_failed_tips.append(tip + 1) + + if all_failed_tips: + raise TecanError( + f"ZaapMotion controllers not responding (tips {all_failed_tips}). " + "Power cycle the EVO and try again.", + "C5", + 5, + ) + + self.zaap = zaap + + async def _setup_safety_module(self) -> None: + """Send safety module commands to enable motor power.""" + try: + await self._driver.send_command("O1", command="SPN") + await self._driver.send_command("O1", command="SPS3") + except TecanError as e: + logger.warning("Safety module command failed: %s", e) + + # ============== ZaapMotion force mode ============== + + async def _zaapmotion_force_on(self) -> None: + """Enable ZaapMotion force mode before plunger operations.""" + assert self.zaap is not None + for tip in range(8): + await self.zaap.set_force_ramp(tip, self.SFR_ACTIVE) + for tip in range(8): + await self.zaap.set_force_mode(tip) + + async def _zaapmotion_force_off(self) -> None: + """Restore ZaapMotion to idle after plunger operations.""" + assert self.zaap is not None + for tip in range(8): + await self.zaap.set_force_ramp(tip, self.SFR_IDLE) + for tip in range(8): + await self.zaap.set_default_position(tip, self.SDP_DEFAULT) + + # ============== Force-mode overrides for mixing and blow-out ============== + + async def _perform_mix(self, mix: Mix, use_channels: List[int]) -> None: + await self._zaapmotion_force_on() + await super()._perform_mix(mix, use_channels) + await self._zaapmotion_force_off() + + async def _perform_blow_out(self, ops: List[Dispense], use_channels: List[int]) -> None: + await self._zaapmotion_force_on() + await super()._perform_blow_out(ops, use_channels) + await self._zaapmotion_force_off() + + # ============== Override operations with force mode ============== + + async def pick_up_tips( + self, + ops: List[Pickup], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + assert min(use_channels) >= self.num_channels - self.diti_count, ( + f"DiTis can only be configured for the last {self.diti_count} channels" + ) + + # Use _liha_positions for X/Y only; Z comes from tip rack directly + x_positions, y_positions, _ = self._liha_positions(ops, use_channels) + ys = self._get_ys(ops) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + x, y_adj = self._apply_calibration_offsets(x, y - yi * ys, ops) + logger.info("pick_up_tips: X=%d Y=%d ys=%d (taught tip top: X=3893 Y=146)", x, y_adj, ys) + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis(x, y_adj, ys, [self._z_range] * self.num_channels) + + # Aspirate small air gap with force mode + pvl: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + ppr: List[Optional[int]] = [None] * self.num_channels + for channel in use_channels: + pvl[channel] = 0 + sep[channel] = int(70 * self.SPEED_FACTOR) + ppr[channel] = int(10 * self.STEPS_PER_UL) + await self._zaapmotion_force_on() + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + await self._zaapmotion_force_off() + + # AGT using tip rack z_start directly + par = ops[0].resource.parent + assert isinstance(par, TecanTipRack), f"Expected TecanTipRack, got {type(par)}" + agt_z_start = int(par.z_start) + agt_z_search = abs(int(par.z_max - par.z_start)) + logger.info("pick_up_tips AGT: z_start=%d z_search=%d", agt_z_start, agt_z_search) + await self.liha.get_disposable_tip( + self._bin_use_channels(use_channels), agt_z_start, agt_z_search + ) + self._agt_z_start = agt_z_start + + async def drop_tips( + self, + ops: List[TipDrop], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + assert min(use_channels) >= self.num_channels - self.diti_count, ( + f"DiTis can only be configured for the last {self.diti_count} channels" + ) + + x_positions, y_positions, _ = self._liha_positions(ops, use_channels) + ys = self._get_ys(ops) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + x, y_adj = self._apply_calibration_offsets(x, y - yi * ys, ops) + logger.info("drop_tips: X=%d Y=%d ys=%d", x, y_adj, ys) + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis(x, y_adj, ys, [self._z_range] * self.num_channels) + + # Empty plunger before discard + await self.liha.position_valve_logical([0] * self.num_channels) + sep_vals: List[Optional[int]] = [int(600 * self.SPEED_FACTOR)] * self.num_channels + await self._zaapmotion_force_on() + await self.liha.set_end_speed_plunger(sep_vals) + await self.liha.position_plunger_absolute([0] * self.num_channels) + await self._zaapmotion_force_off() + + # Position at tip rack z_start and eject using mode=0 (above rack). + # Mode=1 (in rack) uses z_discard to push further down, which crashes + # on taller tip racks. Mode=0 ejects at the current Z reliably. + par = ops[0].resource.parent + assert isinstance(par, TecanTipRack), f"Expected TecanTipRack, got {type(par)}" + z_start = int(par.z_start) + await self.liha.move_absolute_z([z_start] * self.num_channels) + await self._driver.send_command("C5", command="SDT0,50,200") + await self.liha.drop_disposable_tip(self._bin_use_channels(use_channels), discard_height=0) + + async def aspirate( + self, + ops: List[Aspiration], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + x_positions, y_positions, z_positions = self._liha_positions(ops, use_channels) + tecan_liquid_classes = self._get_liquid_classes(ops) + + from pylabrobot.resources import TecanPlate + + ys = self._get_ys(ops) + zadd: List[Optional[int]] = [0] * self.num_channels + for i, channel in enumerate(use_channels): + par = ops[i].resource.parent + if par is None: + continue + if not isinstance(par, TecanPlate): + raise ValueError(f"Operation is not supported by resource {par}.") + zadd[channel] = round(ops[i].volume / par.area * 10) + + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + # Use plate z_start/z_max directly (absolute Tecan Z coordinates). + # Adjust for mounted tip: tip extends reach, so Z target moves UP + # by (tip_length - nesting_depth). + first_par = ops[0].resource.parent + if isinstance(first_par, TecanPlate): + z_asp: List[Optional[int]] = [None] * self.num_channels + z_asp_max: List[Optional[int]] = [None] * self.num_channels + for i, channel in enumerate(use_channels): + tip_ext = int(ops[i].tip.total_tip_length * 10) - int(ops[i].tip.fitting_depth * 10) + z_asp[channel] = int(first_par.z_start) + tip_ext + z_asp_max[channel] = int(first_par.z_max) + tip_ext + else: + z_asp = z_positions.get("start", [self._z_range] * self.num_channels) + z_asp_max = z_positions.get("max", [self._z_range] * self.num_channels) + + x, y_adj = self._apply_calibration_offsets(x, y - yi * ys, ops) + z_asp_first = z_asp[next(i for i, v in enumerate(z_asp) if v is not None)] + logger.info("aspirate: X=%d Y=%d ys=%d Z=%d", x, y_adj, ys, z_asp_first) + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, + y_adj, + ys, + [self._z_range] * self.num_channels, + ) + + # Leading airgap with force mode + pvl, sep, ppr = self._aspirate_airgap(use_channels, tecan_liquid_classes, "lag") + if any(ppr): + await self._zaapmotion_force_on() + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + await self._zaapmotion_force_off() + + # Liquid level detection + if any(tlc.aspirate_lld if tlc is not None else None for tlc in tecan_liquid_classes): + tlc, _ = self._first_valid(tecan_liquid_classes) + assert tlc is not None + await self.liha.set_detection_mode(tlc.lld_mode, tlc.lld_conductivity) + ssl, sdl, sbl = self._liquid_detection(use_channels, tecan_liquid_classes) + await self.liha.set_search_speed(ssl) + await self.liha.set_search_retract_distance(sdl) + await self.liha.set_search_z_start([z if z is not None else self._z_range for z in z_asp]) + await self.liha.set_search_z_max([z if z is not None else self._z_range for z in z_asp_max]) + await self.liha.set_search_submerge(sbl) + shz = [self._z_range] * self.num_channels + await self.liha.set_z_travel_height(shz) + await self.liha.move_detect_liquid(self._bin_use_channels(use_channels), zadd) + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + + # Aspirate + retract with force mode + zadd = [min(z, 32) if z else None for z in zadd] + ssz, sep, stz, mtr, ssz_r = self._aspirate_action(ops, use_channels, tecan_liquid_classes, zadd) + await self.liha.set_slow_speed_z(ssz) + await self._zaapmotion_force_on() + await self.liha.set_end_speed_plunger(sep) + await self.liha.set_tracking_distance_z(stz) + await self.liha.move_tracking_relative(mtr) + await self._zaapmotion_force_off() + await self.liha.set_slow_speed_z(ssz_r) + await self.liha.move_absolute_z(z_asp) # retract to aspirate start height + + # Trailing airgap with force mode + pvl, sep, ppr = self._aspirate_airgap(use_channels, tecan_liquid_classes, "tag") + await self._zaapmotion_force_on() + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + await self._zaapmotion_force_off() + + async def dispense( + self, + ops: List[Dispense], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + x_positions, y_positions, z_positions = self._liha_positions(ops, use_channels) + tecan_liquid_classes = self._get_liquid_classes(ops) + + from pylabrobot.resources import TecanPlate + + ys = self._get_ys(ops) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + # Use plate z_dispense directly (absolute Tecan Z coordinate). + # Adjust for mounted tip. + first_par = ops[0].resource.parent + if isinstance(first_par, TecanPlate): + z_disp: List[Optional[int]] = [None] * self.num_channels + for i, channel in enumerate(use_channels): + tip_ext = int(ops[i].tip.total_tip_length * 10) - int(ops[i].tip.fitting_depth * 10) + z_disp[channel] = int(first_par.z_dispense) + tip_ext + else: + z_disp = [z if z else self._z_range for z in z_positions["dispense"]] + + x, y_adj = self._apply_calibration_offsets(x, y - yi * ys, ops) + z_disp_first = z_disp[next(i for i, v in enumerate(z_disp) if v is not None)] + logger.info("dispense: X=%d Y=%d ys=%d Z=%d", x, y_adj, ys, z_disp_first) + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, + y_adj, + ys, + z_disp, # type: ignore[arg-type] + ) + + sep, spp, stz, mtr = self._dispense_action(ops, use_channels, tecan_liquid_classes) + await self._zaapmotion_force_on() + await self.liha.set_end_speed_plunger(sep) + await self.liha.set_stop_speed_plunger(spp) + await self.liha.set_tracking_distance_z(stz) + await self.liha.move_tracking_relative(mtr) + await self._zaapmotion_force_off() diff --git a/pylabrobot/tecan/evo/arm.py b/pylabrobot/tecan/evo/arm.py new file mode 100644 index 00000000000..35dc8239969 --- /dev/null +++ b/pylabrobot/tecan/evo/arm.py @@ -0,0 +1,84 @@ +"""Tecan EVO RoMa arm frontend. + +Overrides the generic GripperArm to route pick/drop through the +carrier-based backend methods, since the RoMa Z trajectory requires +carrier-level attributes (roma_z_safe, roma_z_end) that cannot be +derived from a raw Coordinate alone. +""" + +from __future__ import annotations + +import logging +from typing import Optional, Union + +from pylabrobot.arms.arm import GripperArm, _PickedUpState +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.resources import ( + Coordinate, + Resource, + ResourceHolder, + ResourceStack, +) + +from pylabrobot.tecan.evo.roma_backend import EVORoMaBackend + +logger = logging.getLogger(__name__) + + +class TecanGripperArm(GripperArm): + """GripperArm for the Tecan EVO RoMa. + + Routes pick_up_resource and drop_resource through the backend's + carrier-based methods which compute RoMa X/Y/Z from carrier attributes. + Resource tracking (unassign/assign) is handled by the base class. + + Usage:: + + await evo.arm.move_resource(plate, carrier_dst[0]) + """ + + backend: EVORoMaBackend + + async def pick_up_resource( + self, + resource: Resource, + offset: Coordinate = Coordinate.zero(), + pickup_distance_from_top: Optional[float] = None, + backend_params: Optional[BackendParams] = None, + ): + resource_width = self._resource_width(resource) + self._begin_holding(resource_width) + + await self.backend.pick_up_from_carrier(resource, backend_params=backend_params) + + pickup_distance_from_top = self._resolve_pickup_distance(resource, pickup_distance_from_top) + self._picked_up = _PickedUpState( + resource=resource, + offset=offset, + pickup_distance_from_top=pickup_distance_from_top, + resource_width=resource_width, + ) + self._state_updated() + + async def drop_resource( + self, + destination: Union[ResourceStack, ResourceHolder, Resource, Coordinate], + offset: Coordinate = Coordinate.zero(), + backend_params: Optional[BackendParams] = None, + ): + resource = self._prepare_drop(destination) + if self._picked_up is None: + raise RuntimeError("No resource picked up") + + # Compute destination offset for the backend + if isinstance(destination, ResourceHolder): + dst_offset = destination.get_location_wrt(self._reference_resource) + elif isinstance(destination, Coordinate): + dst_offset = destination + else: + dst_offset = destination.get_location_wrt(self._reference_resource) + + await self.backend.drop_at_carrier(resource, dst_offset, backend_params=backend_params) + + # Use base class resource tracking + self._finalize_drop(resource, destination, 0) diff --git a/pylabrobot/tecan/evo/driver.py b/pylabrobot/tecan/evo/driver.py new file mode 100644 index 00000000000..e4c5b2d95b5 --- /dev/null +++ b/pylabrobot/tecan/evo/driver.py @@ -0,0 +1,156 @@ +"""Tecan EVO USB driver. + +Owns the USB connection and implements the Tecan firmware command protocol. +This is the v1b1 equivalent of the legacy ``TecanLiquidHandler`` class. +""" + +from __future__ import annotations + +import logging +from typing import Dict, List, Optional, Union + +from pylabrobot.device import Driver +from pylabrobot.io.usb import USB +from pylabrobot.tecan.evo.errors import error_code_to_exception + +logger = logging.getLogger(__name__) + + +class TecanEVODriver(Driver): + """Driver for the Tecan Freedom EVO liquid handler. + + Handles USB connection lifecycle and the Tecan firmware command protocol + (``\\x02{module}{command},{params}\\x00`` framing, response parsing, SET caching). + + Args: + packet_read_timeout: Timeout in seconds for reading a single USB packet. + read_timeout: Timeout in seconds for reading a full response. + write_timeout: Timeout in seconds for writing a command. + """ + + def __init__( + self, + packet_read_timeout: int = 12, + read_timeout: int = 60, + write_timeout: int = 60, + ): + super().__init__() + self.io = USB( + human_readable_device_name="Tecan EVO", + packet_read_timeout=packet_read_timeout, + read_timeout=read_timeout, + write_timeout=write_timeout, + id_vendor=0x0C47, + id_product=0x4000, + ) + self._cache: Dict[str, List[Optional[int]]] = {} + + def _assemble_command(self, module: str, command: str, params: List[Optional[int]]) -> str: + """Assemble a firmware command string. + + Args: + module: 2-character module identifier (e.g. ``"C5"`` for LiHa). + command: Command identifier (e.g. ``"PIA"``, ``"PAA"``). + params: List of integer parameters (``None`` for empty/placeholder). + + Returns: + Framed command string: ``\\x02{module}{command},{params}\\x00``. + """ + cmd = module + command + ",".join(str(a) if a is not None else "" for a in params) + return f"\02{cmd}\00" + + def parse_response(self, resp: bytes) -> Dict[str, Union[str, int, List[Union[int, str]]]]: + """Parse a firmware response. + + Args: + resp: Raw response bytes from the USB device. + + Returns: + Dict with ``"module"`` (str) and ``"data"`` (list of int/str values). + + Raises: + TecanError: If the response indicates a non-zero error code. + """ + s = resp.decode("utf-8", "ignore") + module = s[1:3] + ret = int(resp[3]) ^ (1 << 7) + if ret != 0: + raise error_code_to_exception(module, ret) + + data: List[Union[int, str]] = [] + for x in s[3:-1].split(","): + if len(x) == 0: + continue + data.append(int(x) if x.lstrip("-").isdigit() else x) + + return {"module": module, "data": data} + + async def send_command( + self, + module: str, + command: str, + params: Optional[List[Optional[int]]] = None, + write_timeout: Optional[int] = None, + read_timeout: Optional[int] = None, + wait: bool = True, + ) -> Optional[Dict[str, Union[str, int, List[Union[int, str]]]]]: + """Send a firmware command and return the parsed response. + + Caches SET commands (commands starting with ``"S"``) and skips sending + if the same command with the same parameters was already sent. + + Args: + module: 2-character module identifier. + command: Command identifier. + params: List of integer parameters. + write_timeout: Override write timeout (seconds). + read_timeout: Override read timeout (seconds). + wait: If ``True``, wait for and return the response. If ``False``, + return ``None`` immediately after sending. + + Returns: + Parsed response dict, or ``None`` if ``wait=False``. + + Raises: + TecanError: If the device returns a non-zero error code. + """ + if command[0] == "S" and params is not None: + k = module + command + if k in self._cache and self._cache[k] == params: + return None + self._cache[k] = params + + cmd = self._assemble_command(module, command, [] if params is None else params) + await self.io.write(cmd.encode(), timeout=write_timeout) + + if not wait: + return None + + resp = await self.io.read(timeout=read_timeout) + return self.parse_response(resp) + + async def setup(self) -> None: + """Open USB connection to the Tecan EVO. + + Uses a short packet timeout for buffer drain to avoid long waits + on first connection. + """ + logger.info("Opening USB connection to Tecan EVO...") + saved_prt = self.io.packet_read_timeout + self.io.packet_read_timeout = 1 # fast buffer drain + await self.io.setup() + self.io.packet_read_timeout = saved_prt + logger.info("USB connected.") + + async def stop(self) -> None: + """Close USB connection.""" + logger.info("Closing USB connection.") + await self.io.stop() + + def serialize(self) -> dict: + return { + **super().serialize(), + "packet_read_timeout": self.io.packet_read_timeout, + "read_timeout": self.io.read_timeout, + "write_timeout": self.io.write_timeout, + } diff --git a/pylabrobot/tecan/evo/errors.py b/pylabrobot/tecan/evo/errors.py new file mode 100644 index 00000000000..d5523eb6b80 --- /dev/null +++ b/pylabrobot/tecan/evo/errors.py @@ -0,0 +1,83 @@ +"""Tecan EVO error types and error code tables.""" + + +class TecanError(Exception): + """Tecan backend errors, raised by a single module.""" + + def __init__( + self, + message: str, + module: str, + error_code: int, + ): + self.message = message + self.module = module + self.error_code = error_code + + def __repr__(self) -> str: + return f"{self.__class__.__name__}('{self.message}')" + + +def error_code_to_exception(module: str, error_code: int) -> TecanError: + """Convert an error code to an exception.""" + table = None + + if module == "C5": + table = { + 1: "Initialization failed", + 2: "Invalid command", + 3: "Invalid operand", + 4: "CAN acknowledge problems", + 5: "Device not implemented", + 6: "CAN answer timeout", + 7: "Device not initialized", + 8: "Command overflow of TeCU", + 9: "No liquid detected", + 10: "Drive no load", + 11: "Not enough liquid", + 12: "Not enough liquid", + 13: "No Flash access", + 15: "Command overflow of subdevice", + 17: "Measurement failed", + 18: "Clot limit passed", + 19: "No clot exit detected", + 20: "No liquid exit detected", + 21: "Delta pressure overrun (pLLD)", + 22: "Tip Guard in wrong position", + 23: "Not yet moved or move aborted", + 24: "llid pulse error or reed crosstalk error", + 25: "Tip not fetched", + 26: "Tip not mounted", + 27: "Tip mounted", + 28: "Subdevice error", + 29: "Application switch and axes mismatch", + 30: "Wrong DC-Servo type", + 31: "Virtual Drive", + } + elif module == "C1": + table = { + 1: "Initialization failed", + 2: "Invalid command", + 3: "Invalid operand", + 4: "CAN acknowledge problems", + 5: "Device not implemented", + 6: "CAN answer timeout", + 7: "Device not initialized", + 8: "Command overflow of TeCU", + 9: "Plate not fetched", + 10: "Drive no load", + 11: "Sub device not ready yet", + 13: "No access to Flash-EPROM", + 14: "Hardware not defined", + 15: "Command overflow of this device", + 17: "Verification failed", + 21: "BCS communication error", + 25: "Download Error", + 28: "Sub device error", + 30: "Invalid servo version", + } + + if table is not None and error_code in table: + return TecanError(table[error_code], module, error_code) + + return TecanError(f"Unknown error code {error_code}", module, error_code) diff --git a/pylabrobot/tecan/evo/evo.py b/pylabrobot/tecan/evo/evo.py new file mode 100644 index 00000000000..1bedd50e559 --- /dev/null +++ b/pylabrobot/tecan/evo/evo.py @@ -0,0 +1,156 @@ +"""Tecan Freedom EVO composite device. + +Composes a TecanEVODriver with PIP (liquid handling) and GripperArm (RoMa) +capabilities using the v1b1 Device architecture. +""" + +from __future__ import annotations + +import logging +from typing import Optional + +from pylabrobot.tecan.evo.arm import TecanGripperArm +from pylabrobot.capabilities.liquid_handling.pip import PIP +from pylabrobot.device import Device +from pylabrobot.resources import Coordinate, Resource + +from pylabrobot.tecan.evo.air_pip_backend import AirEVOPIPBackend +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.errors import TecanError +from pylabrobot.tecan.evo.pip_backend import EVOPIPBackend +from pylabrobot.tecan.evo.roma_backend import EVORoMaBackend + +logger = logging.getLogger(__name__) + + +class TecanEVO(Resource, Device): + """Tecan Freedom EVO liquid handling platform. + + Composes a USB driver with independent-channel pipetting (PIP) and + optionally a RoMa plate handling arm (GripperArm). + + Example:: + + from pylabrobot.tecan.evo import TecanEVO, TecanEVODriver + from pylabrobot.resources.tecan.tecan_decks import EVO150Deck + + deck = EVO150Deck() + evo = TecanEVO(name="evo", deck=deck, diti_count=8, air_liha=True) + await evo.setup() + + # Pipetting via PIP capability + await evo.pip.pick_up_tips(...) + await evo.pip.aspirate(...) + + # Plate handling via arm capability (if RoMa present) + await evo.arm.pick_up_resource(...) + + await evo.stop() + + Args: + name: Device name. + deck: Deck resource (e.g. EVO150Deck). + diti_count: Number of channels configured for disposable tips. + air_liha: If True, use AirEVOPIPBackend (ZaapMotion). Otherwise syringe. + has_roma: If True, include RoMa arm capability. + packet_read_timeout: USB packet read timeout in seconds. + read_timeout: USB read timeout in seconds. + write_timeout: USB write timeout in seconds. + """ + + def __init__( + self, + name: str = "evo", + deck: Optional[Resource] = None, + diti_count: int = 0, + air_liha: bool = False, + has_roma: bool = True, + packet_read_timeout: int = 12, + read_timeout: int = 60, + write_timeout: int = 60, + size_x: float = 1315, + size_y: float = 780, + size_z: float = 765, + ): + driver = TecanEVODriver( + packet_read_timeout=packet_read_timeout, + read_timeout=read_timeout, + write_timeout=write_timeout, + ) + + Resource.__init__(self, name=name, size_x=size_x, size_y=size_y, size_z=size_z) + Device.__init__(self, driver=driver) + + # Assign deck as child resource + if deck is not None: + self.assign_child_resource(deck, location=Coordinate.zero()) + + deck_ref = deck or self + + # PIP capability + pip_backend: EVOPIPBackend + if air_liha: + pip_backend = AirEVOPIPBackend(driver=driver, deck=deck_ref, diti_count=diti_count) + else: + pip_backend = EVOPIPBackend(driver=driver, deck=deck_ref, diti_count=diti_count) + self.pip = PIP(backend=pip_backend) + self._pip_backend = pip_backend + + # RoMa arm capability + self.arm: Optional[TecanGripperArm] = None + if has_roma: + roma_backend = EVORoMaBackend(driver=driver, deck=deck_ref) + self.arm = TecanGripperArm(backend=roma_backend, reference_resource=deck_ref) + + # Capabilities list: PIP first (LiHa PIA), then arm (RoMa PIA + park) + caps: list = [self.pip] + if self.arm is not None: + caps.append(self.arm) + self._capabilities = caps + + async def setup(self): + """Initialize EVO with collision-safe ordering. + + If the LiHa is already initialized but the RoMa needs PIA, the LiHa + is homed first to clear the RoMa's path. + """ + await self._driver.setup() + + # Initialize PIP (LiHa) first + await self.pip._on_setup() + + # Before RoMa init, check if RoMa needs PIA — if so, home LiHa first + if self.arm is not None: + roma_backend = self.arm.backend + roma_needs_init = await self._roma_needs_init(roma_backend) + + if ( + roma_needs_init + and self._pip_backend.liha is not None + and self._pip_backend._z_range is not None + ): + logger.info("RoMa needs PIA — homing LiHa to clear path.") + z_range = self._pip_backend._z_range + num_ch = self._pip_backend.num_channels + await self._pip_backend.liha.set_z_travel_height([z_range] * num_ch) + await self._pip_backend.liha.position_absolute_all_axis(45, 1031, 90, [z_range] * num_ch) + + await self.arm._on_setup() + + self._setup_finished = True + + async def _roma_needs_init(self, roma_backend: EVORoMaBackend) -> bool: + """Check if RoMa needs PIA (not already initialized).""" + from pylabrobot.tecan.evo.firmware.arm_base import EVOArm + + arm = EVOArm(self._driver, "C1") # type: ignore[arg-type] + try: + roma_err = await arm.read_error_register() + except TecanError as e: + if e.error_code == 5: + return False # RoMa not present + return True + + if roma_err and all(c == "@" for c in roma_err): + return False # Already initialized + return True diff --git a/pylabrobot/tecan/evo/firmware/__init__.py b/pylabrobot/tecan/evo/firmware/__init__.py new file mode 100644 index 00000000000..9a15d1e71f9 --- /dev/null +++ b/pylabrobot/tecan/evo/firmware/__init__.py @@ -0,0 +1,4 @@ +from pylabrobot.tecan.evo.firmware.arm_base import EVOArm +from pylabrobot.tecan.evo.firmware.liha import LiHa +from pylabrobot.tecan.evo.firmware.roma import RoMa +from pylabrobot.tecan.evo.firmware.zaapmotion import ZaapMotion diff --git a/pylabrobot/tecan/evo/firmware/arm_base.py b/pylabrobot/tecan/evo/firmware/arm_base.py new file mode 100644 index 00000000000..06a06e27bda --- /dev/null +++ b/pylabrobot/tecan/evo/firmware/arm_base.py @@ -0,0 +1,104 @@ +"""Base class for Tecan EVO arm firmware wrappers. + +Provides position caching for collision avoidance between arms sharing the +same worktable (e.g. LiHa and RoMa X-axes). +""" + +from __future__ import annotations + +from typing import Dict, List, Protocol, runtime_checkable + + +@runtime_checkable +class CommandInterface(Protocol): + """Duck-typed interface for anything that can send Tecan firmware commands. + + This allows firmware wrappers to work with either the legacy EVOBackend + or the new TecanEVODriver without importing either. + """ + + async def send_command( + self, + module: str, + command: str, + params: list | None = ..., + **kwargs: object, + ) -> dict: ... + + +class EVOArm: + """Base class for EVO arm firmware wrappers. Caches arm positions.""" + + _pos_cache: Dict[str, int] = {} + + def __init__(self, interface: CommandInterface, module: str): + self.interface = interface + self.module = module + + async def position_initialization_x(self) -> None: + """Reinitializes X-axis of the arm.""" + await self.interface.send_command(module=self.module, command="PIX") + + async def report_x_param(self, param: int) -> int: + """Report current parameter for x-axis. + + Args: + param: 0 - current position, 5 - actual machine range + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RPX", params=[param]) + )["data"] + return resp[0] + + async def report_y_param(self, param: int) -> List[int]: + """Report current parameters for y-axis. + + Args: + param: 0 - current position, 5 - actual machine range + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RPY", params=[param]) + )["data"] + return resp + + async def read_error_register(self, param: int = 0) -> str: + """Read error register (REE). + + Args: + param: 0 = current errors, 1 = extended error info + + Returns: + Error string where each character represents one axis status. + ``'@'`` = no error, ``'A'`` = init failed, ``'G'`` = not initialized. + """ + resp = await self.interface.send_command(module=self.module, command="REE", params=[param]) + data = resp.get("data") + if data and isinstance(data, list) and len(data) > 0: + return str(data[0]) + return "" + + async def position_init_all(self) -> None: + """Initialize all axes (PIA).""" + await self.interface.send_command(module=self.module, command="PIA") + + async def position_init_bus(self) -> None: + """Initialize bus (PIB). Used for MCA modules.""" + await self.interface.send_command(module=self.module, command="PIB") + + async def set_bus_mode(self, mode: int) -> None: + """Set bus mode (BMX). + + Args: + mode: 2 = normal operation + """ + await self.interface.send_command(module=self.module, command="BMX", params=[mode]) + + async def bus_module_action(self, p1: int, p2: int, p3: int) -> None: + """Bus module action (BMA). Use ``(0, 0, 0)`` to halt all axes. + + Args: + p1: action parameter 1 + p2: action parameter 2 + p3: action parameter 3 + """ + await self.interface.send_command(module=self.module, command="BMA", params=[p1, p2, p3]) diff --git a/pylabrobot/tecan/evo/firmware/liha.py b/pylabrobot/tecan/evo/firmware/liha.py new file mode 100644 index 00000000000..f9a1eeb849d --- /dev/null +++ b/pylabrobot/tecan/evo/firmware/liha.py @@ -0,0 +1,308 @@ +"""Firmware command wrapper for the Tecan EVO LiHa (Liquid Handling Arm). + +Provides typed methods for all LiHa firmware commands (plunger control, +valve positioning, Z-axis movement, liquid detection, tip handling). +""" + +from __future__ import annotations + +from typing import List, Optional + +from pylabrobot.tecan.evo.errors import TecanError +from pylabrobot.tecan.evo.firmware.arm_base import EVOArm + + +class LiHa(EVOArm): + """Firmware commands for the LiHa (Liquid Handling Arm).""" + + async def initialize_plunger(self, tips: int) -> None: + """Initializes plunger and valve drive. + + Args: + tips: binary coded tip select + """ + await self.interface.send_command(module=self.module, command="PID", params=[tips]) + + async def report_z_param(self, param: int) -> List[int]: + """Report current parameters for z-axis. + + Args: + param: 0=position, 1=accel, 2=fast_speed, 3=init_speed, 4=init_offset, + 5=range, 6=encoder_deviation, 9=slow_speed, 10=scale, 11=target, 12=travel + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RPZ", params=[param]) + )["data"] + return resp + + async def report_number_tips(self) -> int: + """Report number of tips on arm.""" + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RNT", params=[1]) + )["data"] + return resp[0] + + async def position_absolute_all_axis(self, x: int, y: int, ys: int, z: List[int]) -> None: + """Position absolute for all LiHa axes. + + Args: + x: absolute x position in 1/10 mm + y: absolute y position in 1/10 mm + ys: absolute y spacing in 1/10 mm (90-380) + z: absolute z position in 1/10 mm for each channel + + Raises: + TecanError: if moving to the target position causes a collision + """ + cur_x = EVOArm._pos_cache.setdefault(self.module, await self.report_x_param(0)) + for module, pos in EVOArm._pos_cache.items(): + if module == self.module: + continue + if cur_x < x and cur_x < pos < x: + raise TecanError("Invalid command (collision)", self.module, 2) + if cur_x > x and cur_x > pos > x: + raise TecanError("Invalid command (collision)", self.module, 2) + if abs(pos - x) < 1500: + raise TecanError("Invalid command (collision)", self.module, 2) + + await self.interface.send_command( + module=self.module, command="PAA", params=list([x, y, ys] + z) + ) + EVOArm._pos_cache[self.module] = x + + async def position_valve_logical(self, param: List[Optional[int]]) -> None: + """Position valve logical for each channel. + + Args: + param: 0=outlet, 1=inlet, 2=bypass + """ + await self.interface.send_command(module=self.module, command="PVL", params=param) + + async def set_end_speed_plunger(self, speed: List[Optional[int]]) -> None: + """Set end speed for plungers. + + Args: + speed: half steps/sec per channel (5-6000) + """ + await self.interface.send_command(module=self.module, command="SEP", params=speed) + + async def move_plunger_relative(self, rel: List[Optional[int]]) -> None: + """Move plunger relative (positive=aspirate, negative=dispense). + + Args: + rel: full steps per channel (-3150 to 3150) + """ + await self.interface.send_command(module=self.module, command="PPR", params=rel) + + async def set_stop_speed_plunger(self, speed: List[Optional[int]]) -> None: + """Set stop speed for plungers. + + Args: + speed: half steps/sec per channel (50-2700) + """ + await self.interface.send_command(module=self.module, command="SPP", params=speed) + + async def set_detection_mode(self, proc: int, sense: int) -> None: + """Set liquid detection mode. + + Args: + proc: detection procedure (7 = double detection sequential) + sense: conductivity (1 = high) + """ + await self.interface.send_command(module=self.module, command="SDM", params=[proc, sense]) + + async def set_search_speed(self, speed: List[Optional[int]]) -> None: + """Set search speed for liquid search commands. + + Args: + speed: 1/10 mm/s per channel (1-1500) + """ + await self.interface.send_command(module=self.module, command="SSL", params=speed) + + async def set_search_retract_distance(self, dist: List[Optional[int]]) -> None: + """Set z-axis retract distance for liquid search commands. + + Args: + dist: 1/10 mm per channel + """ + await self.interface.send_command(module=self.module, command="SDL", params=dist) + + async def set_search_submerge(self, dist: List[Optional[int]]) -> None: + """Set submerge for liquid search commands. + + Args: + dist: 1/10 mm per channel (-1000 to z_range) + """ + await self.interface.send_command(module=self.module, command="SBL", params=dist) + + async def set_search_z_start(self, z: List[Optional[int]]) -> None: + """Set z-start for liquid search commands. + + Args: + z: 1/10 mm per channel + """ + await self.interface.send_command(module=self.module, command="STL", params=z) + + async def set_search_z_max(self, z: List[Optional[int]]) -> None: + """Set z-max for liquid search commands. + + Args: + z: 1/10 mm per channel + """ + await self.interface.send_command(module=self.module, command="SML", params=z) + + async def set_z_travel_height(self, z: List[int]) -> None: + """Set z-travel height. + + Args: + z: travel heights in 1/10 mm per channel + """ + await self.interface.send_command(module=self.module, command="SHZ", params=z) + + async def move_detect_liquid(self, channels: int, zadd: List[Optional[int]]) -> None: + """Move tip, detect liquid, submerge. + + Args: + channels: binary coded tip select + zadd: distance to travel downwards in 1/10 mm per channel + """ + await self.interface.send_command( + module=self.module, + command="MDT", + params=[channels] + [None] * 3 + zadd, + ) + + async def set_slow_speed_z(self, speed: List[Optional[int]]) -> None: + """Set slow speed for z. + + Args: + speed: 1/10 mm/s per channel (1-4000) + """ + await self.interface.send_command(module=self.module, command="SSZ", params=speed) + + async def set_tracking_distance_z(self, rel: List[Optional[int]]) -> None: + """Set z-axis relative tracking distance for aspirate/dispense. + + Args: + rel: 1/10 mm per channel (-2100 to 2100) + """ + await self.interface.send_command(module=self.module, command="STZ", params=rel) + + async def move_tracking_relative(self, rel: List[Optional[int]]) -> None: + """Move tracking relative (synchronous Z and plunger movement). + + Args: + rel: full steps per channel (-3150 to 3150) + """ + await self.interface.send_command(module=self.module, command="MTR", params=rel) + + async def move_absolute_z(self, z: List[Optional[int]]) -> None: + """Position absolute with slow speed z-axis. + + Args: + z: absolute position in 1/10 mm per channel + """ + await self.interface.send_command(module=self.module, command="MAZ", params=z) + + async def get_disposable_tip(self, tips: int, z_start: int, z_search: int) -> None: + """Pick up disposable tips. + + Args: + tips: binary coded tip select + z_start: position in 1/10 mm where searching begins + z_search: search distance in 1/10 mm + """ + await self.interface.send_command( + module=self.module, + command="AGT", + params=[tips, z_start, z_search, 0], + ) + + async def position_plunger_absolute(self, positions: List[Optional[int]]) -> None: + """Move plunger to absolute position (PPA). + + Args: + positions: absolute plunger position in full steps per channel (0-3150). + 0 = fully dispensed, 3150 = fully aspirated. + """ + await self.interface.send_command(module=self.module, command="PPA", params=positions) + + async def set_disposable_tip_params(self, mode: int, z_discard: int, z_retract: int) -> None: + """Set disposable tip discard parameters (SDT). + + Args: + mode: 1 = discard in rack + z_discard: Z discard distance in 1/10 mm + z_retract: Z retract distance in 1/10 mm + """ + await self.interface.send_command( + module=self.module, command="SDT", params=[mode, z_discard, z_retract] + ) + + async def discard_disposable_tip_high(self, tips: int) -> None: + """Discard tips at Z-axis initialization height. + + Args: + tips: binary coded tip select + """ + await self.interface.send_command(module=self.module, command="ADT", params=[tips]) + + async def drop_disposable_tip(self, tips: int, discard_height: int) -> None: + """Discard tips at variable height. + + Args: + tips: binary coded tip select + discard_height: 0=above tip rack, 1=in tip rack + """ + await self.interface.send_command( + module=self.module, command="AST", params=[tips, discard_height] + ) + + # ============== Query commands ============== + + async def read_plunger_positions(self) -> List[int]: + """Read current plunger positions (RPP). + + Returns: + List of plunger positions in full steps per channel. + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RPP", params=[0]) + )["data"] + return resp + + async def read_z_after_liquid_detection(self) -> List[int]: + """Read Z values after liquid detection (RVZ). + + Returns: + List of Z positions in 1/10 mm where liquid was detected, per channel. + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RVZ", params=[0]) + )["data"] + return resp + + async def read_tip_status(self) -> List[bool]: + """Read tip mounted status for each channel (RTS). + + Returns: + List of booleans: True if tip is mounted on that channel. + + Note: + Response format needs hardware validation. This implementation assumes + the response is a per-channel list of int values (0=no tip, 1=tip). + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RTS", params=[0]) + )["data"] + return [bool(v) for v in resp] + + async def position_absolute_z_bulk(self, z: List[Optional[int]]) -> None: + """Position absolute Z for all channels simultaneously (PAZ). + + Unlike :meth:`move_absolute_z` which uses slow speed, PAZ uses fast speed. + + Args: + z: absolute Z position in 1/10 mm per channel + """ + await self.interface.send_command(module=self.module, command="PAZ", params=z) diff --git a/pylabrobot/tecan/evo/firmware/roma.py b/pylabrobot/tecan/evo/firmware/roma.py new file mode 100644 index 00000000000..8cec9a5a92d --- /dev/null +++ b/pylabrobot/tecan/evo/firmware/roma.py @@ -0,0 +1,180 @@ +"""Firmware command wrapper for the Tecan EVO RoMa (Robotic Manipulator Arm). + +Provides typed methods for RoMa firmware commands (vector positioning, +gripper control, speed configuration). +""" + +from __future__ import annotations + +from typing import List, Optional + +from pylabrobot.tecan.evo.errors import TecanError +from pylabrobot.tecan.evo.firmware.arm_base import EVOArm + + +class RoMa(EVOArm): + """Firmware commands for the RoMa (Robotic Manipulator Arm).""" + + async def report_z_param(self, param: int) -> int: + """Report current parameter for z-axis. + + Args: + param: 0=current position, 5=actual machine range + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RPZ", params=[param]) + )["data"] + return resp[0] + + async def report_r_param(self, param: int) -> int: + """Report current parameter for r-axis (rotation). + + Args: + param: 0=current position, 5=actual machine range + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RPR", params=[param]) + )["data"] + return resp[0] + + async def report_g_param(self, param: int) -> int: + """Report current parameter for g-axis (gripper). + + Args: + param: 0=current position, 5=actual machine range + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RPG", params=[param]) + )["data"] + return resp[0] + + async def set_smooth_move_x(self, mode: int) -> None: + """Set X-axis smooth move mode. + + Args: + mode: 0=active (recalculate accel/speed by distance), 1=use SFX parameters directly + """ + await self.interface.send_command(module=self.module, command="SSM", params=[mode]) + + async def set_fast_speed_x(self, speed: Optional[int], accel: Optional[int] = None) -> None: + """Set fast speed and acceleration for X-axis. + + Args: + speed: 1/10 mm/s + accel: 1/10 mm/s^2 + """ + await self.interface.send_command(module=self.module, command="SFX", params=[speed, accel]) + + async def set_fast_speed_y(self, speed: Optional[int], accel: Optional[int] = None) -> None: + """Set fast speed and acceleration for Y-axis. + + Args: + speed: 1/10 mm/s + accel: 1/10 mm/s^2 + """ + await self.interface.send_command(module=self.module, command="SFY", params=[speed, accel]) + + async def set_fast_speed_z(self, speed: Optional[int], accel: Optional[int] = None) -> None: + """Set fast speed and acceleration for Z-axis. + + Args: + speed: 1/10 mm/s + accel: 1/10 mm/s^2 + """ + await self.interface.send_command(module=self.module, command="SFZ", params=[speed, accel]) + + async def set_fast_speed_r(self, speed: Optional[int], accel: Optional[int] = None) -> None: + """Set fast speed and acceleration for R-axis (rotation). + + Args: + speed: 1/10 deg/s + accel: 1/10 deg/s^2 + """ + await self.interface.send_command(module=self.module, command="SFR", params=[speed, accel]) + + async def set_vector_coordinate_position( + self, + v: int, + x: int, + y: int, + z: int, + r: int, + g: Optional[int], + speed: int, + tw: int = 0, + ) -> None: + """Set vector coordinate positions into table. + + Args: + v: vector index (1-100) + x: absolute x in 1/10 mm + y: absolute y in 1/10 mm + z: absolute z in 1/10 mm + r: absolute r in 1/10 deg + g: absolute gripper in 1/10 mm (optional) + speed: 0=slow, 1=fast + tw: target window class (set with STW) + + Raises: + TecanError: if movement would cause collision with another arm + """ + cur_x = EVOArm._pos_cache.setdefault(self.module, await self.report_x_param(0)) + for module, pos in EVOArm._pos_cache.items(): + if module == self.module: + continue + if cur_x < x and cur_x < pos < x: + raise TecanError("Invalid command (collision)", self.module, 2) + if cur_x > x and cur_x > pos > x: + raise TecanError("Invalid command (collision)", self.module, 2) + if abs(pos - x) < 1500: + raise TecanError("Invalid command (collision)", self.module, 2) + + await self.interface.send_command( + module=self.module, + command="SAA", + params=[v, x, y, z, r, g, speed, 0, tw], + ) + + async def action_move_vector_coordinate_position(self) -> None: + """Start coordinate movement built by the vector table.""" + await self.interface.send_command(module=self.module, command="AAC") + EVOArm._pos_cache[self.module] = await self.report_x_param(0) + + async def position_absolute_g(self, g: int) -> None: + """Move gripper to absolute position. + + Args: + g: absolute position in 1/10 mm + """ + await self.interface.send_command(module=self.module, command="PAG", params=[g]) + + async def set_gripper_params(self, speed: int, pwm: int, cur: Optional[int] = None) -> None: + """Set gripper parameters. + + Args: + speed: search speed in 1/10 mm/s + pwm: pulse width modification limit + cur: max current (optional) + """ + await self.interface.send_command(module=self.module, command="SGG", params=[speed, pwm, cur]) + + async def grip_plate(self, pos: int) -> None: + """Grip plate at current X/Y/Z/R position. + + Args: + pos: target position — plate must be found between current and target + """ + await self.interface.send_command(module=self.module, command="AGR", params=[pos]) + + async def set_target_window_class(self, wc: int, x: int, y: int, z: int, r: int, g: int) -> None: + """Set drive parameters for AAC command. + + Args: + wc: window class (1-100) + x: target window for x-axis in 1/10 mm + y: target window for y-axis in 1/10 mm + z: target window for z-axis in 1/10 mm + r: target window for r-axis in 1/10 deg + g: target window for g-axis in 1/10 mm + """ + await self.interface.send_command(module=self.module, command="STW", params=[wc, x, y, z, r, g]) diff --git a/pylabrobot/tecan/evo/firmware/zaapmotion.py b/pylabrobot/tecan/evo/firmware/zaapmotion.py new file mode 100644 index 00000000000..9199fb5ce0e --- /dev/null +++ b/pylabrobot/tecan/evo/firmware/zaapmotion.py @@ -0,0 +1,81 @@ +"""Firmware wrapper for ZaapMotion BLDC motor controllers on Air LiHa. + +Each Air LiHa tip has an independent ZaapMotion BLDC controller +addressed via the transparent pipeline prefix ``T2{tip_index}``. +""" + +from __future__ import annotations + +from pylabrobot.tecan.evo.firmware.arm_base import CommandInterface + + +class ZaapMotion: + """Commands for ZaapMotion motor controllers (T2x pipeline). + + Each Air LiHa tip has an independent ZaapMotion BLDC controller + addressed via the transparent pipeline prefix ``T2{tip_index}``. + """ + + def __init__(self, interface: CommandInterface, module: str = "C5"): + self.interface = interface + self.module = module + + def _prefix(self, tip: int) -> str: + return f"T2{tip}" + + async def exit_boot_mode(self, tip: int) -> None: + """Exit bootloader mode (T2{tip}X).""" + await self.interface.send_command(module=self.module, command=f"{self._prefix(tip)}X") + + async def read_firmware_version(self, tip: int) -> str: + """Read firmware version (T2{tip}RFV). + + Returns: + Firmware version string. Contains ``'BOOT'`` if in bootloader mode. + """ + resp = await self.interface.send_command(module=self.module, command=f"{self._prefix(tip)}RFV") + return str(resp["data"][0]) if resp and resp.get("data") else "" + + async def read_config_status(self, tip: int) -> None: + """Read configuration status (T2{tip}RCS). + + Raises: + TecanError: if the controller is not yet configured. + """ + await self.interface.send_command(module=self.module, command=f"{self._prefix(tip)}RCS") + + async def set_force_ramp(self, tip: int, value: int) -> None: + """Set force ramp value (T2{tip}SFR{value}). + + Args: + value: Force ramp. 133120 for active, 3752 for idle. + """ + await self.interface.send_command(module=self.module, command=f"{self._prefix(tip)}SFR{value}") + + async def set_force_mode(self, tip: int) -> None: + """Enable force position mode (T2{tip}SFP1).""" + await self.interface.send_command(module=self.module, command=f"{self._prefix(tip)}SFP1") + + async def set_default_position(self, tip: int, value: int) -> None: + """Set default position (T2{tip}SDP{value}). + + Args: + value: Default position value. 1400 is standard idle. + """ + await self.interface.send_command(module=self.module, command=f"{self._prefix(tip)}SDP{value}") + + async def configure_motor(self, tip: int, command: str) -> None: + """Send a motor configuration command (T2{tip}{command}). + + Args: + command: One of the ZAAPMOTION_CONFIG commands (e.g. ``'CFE 255,500'``). + """ + await self.interface.send_command(module=self.module, command=f"{self._prefix(tip)}{command}") + + async def set_sdo(self, param: str) -> None: + """Set SDO parameter (T23SDO{param}). + + Args: + param: SDO parameter string (e.g. ``'11,1'``). + """ + await self.interface.send_command(module=self.module, command=f"T23SDO{param}") diff --git a/pylabrobot/tecan/evo/params.py b/pylabrobot/tecan/evo/params.py new file mode 100644 index 00000000000..47bd5a6e3dc --- /dev/null +++ b/pylabrobot/tecan/evo/params.py @@ -0,0 +1,48 @@ +"""Tecan EVO backend-specific parameters.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +from pylabrobot.capabilities.capability import BackendParams + + +@dataclass(frozen=True) +class TecanPIPParams(BackendParams): + """EVO-specific parameters for PIP operations. + + Attributes: + liquid_detection_proc: Detection procedure for LLD. + 7 = double detection sequential (default). + liquid_detection_sense: Conductivity setting for LLD. + 1 = high conductivity (default). + tip_touch: If True, touch vessel wall after dispense to remove droplet. + tip_touch_offset_y: Y offset for tip touch in mm. + """ + + liquid_detection_proc: Optional[int] = None + liquid_detection_sense: Optional[int] = None + tip_touch: bool = False + tip_touch_offset_y: float = 1.0 + + +@dataclass(frozen=True) +class TecanRoMaParams(BackendParams): + """EVO-specific parameters for RoMa operations. + + Attributes: + speed_x: X-axis fast speed in 1/10 mm/s. + speed_y: Y-axis fast speed in 1/10 mm/s. + speed_z: Z-axis fast speed in 1/10 mm/s. + speed_r: R-axis fast speed in 1/10 deg/s. + accel_y: Y-axis acceleration in 1/10 mm/s^2. + accel_r: R-axis acceleration in 1/10 deg/s^2. + """ + + speed_x: Optional[int] = None + speed_y: Optional[int] = None + speed_z: Optional[int] = None + speed_r: Optional[int] = None + accel_y: Optional[int] = None + accel_r: Optional[int] = None diff --git a/pylabrobot/tecan/evo/pip_backend.py b/pylabrobot/tecan/evo/pip_backend.py new file mode 100644 index 00000000000..68b6a064458 --- /dev/null +++ b/pylabrobot/tecan/evo/pip_backend.py @@ -0,0 +1,593 @@ +"""PIPBackend for the Tecan EVO with syringe-based LiHa. + +Translates v1b1 PIP operations (Pickup, TipDrop, Aspiration, Dispense) into +Tecan firmware commands via the TecanEVODriver and LiHa firmware wrapper. +""" + +from __future__ import annotations + +import logging +from typing import Dict, List, Optional, Sequence, Tuple, TypeVar, Union + +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.capabilities.liquid_handling.pip_backend import PIPBackend +from pylabrobot.capabilities.liquid_handling.standard import ( + Aspiration, + Dispense, + Mix, + Pickup, + TipDrop, +) +from pylabrobot.legacy.liquid_handling.liquid_classes.tecan import ( + TecanLiquidClass, + get_liquid_class, +) +from pylabrobot.resources import Liquid, Resource, TecanPlate, TecanTipRack, Tip +from pylabrobot.resources.tecan.tip_creators import TecanTip + +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.errors import TecanError +from pylabrobot.tecan.evo.firmware import LiHa +from pylabrobot.tecan.evo.firmware.arm_base import EVOArm +from pylabrobot.tecan.evo.params import TecanPIPParams + +logger = logging.getLogger(__name__) + +T = TypeVar("T") + +# Module identifiers +LIHA = "C5" +MCA = "W1" + + +class EVOPIPBackend(PIPBackend): + """PIPBackend for the Tecan EVO with syringe-based LiHa. + + Conversion factors for syringe dilutors (XP2000/XP6000): + - Volume: 3 full plunger steps per uL + - Speed: 6 half-steps/sec per uL/s + """ + + STEPS_PER_UL = 3.0 + SPEED_FACTOR = 6.0 + + def __init__( + self, + driver: TecanEVODriver, + deck: Resource, + diti_count: int = 0, + ): + """Create a new EVO PIP backend. + + Args: + driver: The TecanEVODriver that owns the USB connection. + deck: The deck resource (for coordinate calculations). + diti_count: Number of channels configured for disposable tips. + """ + self._driver = driver + self._deck = deck + self.diti_count = diti_count + + self._num_channels: Optional[int] = None + self._x_range: Optional[int] = None + self._y_range: Optional[int] = None + self._z_range: Optional[int] = None + self._z_traversal_height = 210 # mm + + self.liha: Optional[LiHa] = None + + @property + def num_channels(self) -> int: + if self._num_channels is None: + raise RuntimeError("Not yet set up. Call setup() first.") + return self._num_channels + + async def _on_setup(self) -> None: + """Initialize LiHa arm: PIA, query ranges, init plungers.""" + # Setup arm (PIA + BMX) + await self._setup_arm(LIHA) + + self.liha = LiHa(self._driver, LIHA) # type: ignore[arg-type] + await self.liha.position_initialization_x() + + self._num_channels = await self.liha.report_number_tips() + self._x_range = await self.liha.report_x_param(5) + self._y_range = (await self.liha.report_y_param(5))[0] + self._z_range = (await self.liha.report_z_param(5))[0] + + # Initialize plungers (assumes wash station at rail 1) + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis(45, 1031, 90, [1200] * self.num_channels) + await self.liha.initialize_plunger(self._bin_use_channels(list(range(self.num_channels)))) + await self.liha.position_valve_logical([1] * self.num_channels) + await self.liha.move_plunger_relative([100] * self.num_channels) + await self.liha.position_valve_logical([0] * self.num_channels) + await self.liha.set_end_speed_plunger([1800] * self.num_channels) + await self.liha.move_plunger_relative([-100] * self.num_channels) + await self.liha.position_absolute_all_axis(45, 1031, 90, [self._z_range] * self.num_channels) + logger.info("LiHa initialized: %d channels, z_range=%d", self._num_channels, self._z_range) + + async def _setup_arm(self, module: str) -> bool: + """Send PIA + BMX to initialize an arm module.""" + arm = EVOArm(self._driver, module) # type: ignore[arg-type] + try: + if module == MCA: + await arm.position_init_bus() + await arm.position_init_all() + except TecanError as e: + if e.error_code == 5: + return False + raise + if module != MCA: + await arm.set_bus_mode(2) + return True + + def can_pick_up_tip(self, channel_idx: int, tip: Tip) -> bool: + return isinstance(tip, TecanTip) + + async def request_tip_presence(self) -> List[Optional[bool]]: + """Query tip mounted status for each channel via RTS firmware command.""" + assert self.liha is not None + statuses = await self.liha.read_tip_status() + result: List[Optional[bool]] = [None] * self.num_channels + for i in range(min(len(statuses), self.num_channels)): + result[i] = statuses[i] + return result + + # ============== Utility methods ============== + + def _first_valid(self, lst: List[Optional[T]]) -> Tuple[Optional[T], int]: + for i, v in enumerate(lst): + if v is not None: + return v, i + return None, -1 + + def _bin_use_channels(self, use_channels: List[int]) -> int: + b = 0 + for channel in use_channels: + b += 1 << channel + return b + + def _get_ys(self, ops: Sequence[Union[Aspiration, Dispense, Pickup, TipDrop]]) -> int: + """Get Y-spacing from plate well pitch or resource size.""" + par = ops[0].resource.parent + if hasattr(par, "item_dy"): + return int(par.item_dy * 10) # type: ignore[union-attr] + return int(ops[0].resource.get_absolute_size_y() * 10) + + def _liha_positions( + self, + ops: Sequence[Union[Aspiration, Dispense, Pickup, TipDrop]], + use_channels: List[int], + ) -> Tuple[List[Optional[int]], List[Optional[int]], Dict[str, List[Optional[int]]]]: + """Compute X, Y, Z positions for LiHa operations.""" + assert self._z_range is not None + + x_positions: List[Optional[int]] = [None] * self.num_channels + y_positions: List[Optional[int]] = [None] * self.num_channels + z_positions: Dict[str, List[Optional[int]]] = { + "travel": [None] * self.num_channels, + "start": [None] * self.num_channels, + "dispense": [None] * self.num_channels, + "max": [None] * self.num_channels, + } + + z_range = self._z_range + + def get_z_position(z: float, z_off: float, tip_length: int) -> int: + return int(z_range - z + z_off * 10 + tip_length) + + for i, (op, channel) in enumerate(zip(ops, use_channels)): + location = op.resource.get_location_wrt(self._deck) + op.resource.center() + x_positions[channel] = int((location.x - 100 + op.offset.x) * 10) + y_positions[channel] = int((346.5 - location.y + op.offset.y) * 10) + + par = op.resource.parent + if not isinstance(par, (TecanPlate, TecanTipRack)): + raise ValueError(f"Operation is not supported by resource {par}.") + + tip_length = int(op.tip.total_tip_length * 10) + + if isinstance(op, (Aspiration, Dispense)): + z_positions["travel"][channel] = round(self._z_traversal_height * 10) + + z_positions["start"][channel] = get_z_position( + par.z_start, par.get_location_wrt(self._deck).z + op.offset.z, tip_length + ) + z_positions["dispense"][channel] = get_z_position( + par.z_dispense, par.get_location_wrt(self._deck).z + op.offset.z, tip_length + ) + z_positions["max"][channel] = get_z_position( + par.z_max, par.get_location_wrt(self._deck).z + op.offset.z, tip_length + ) + + return x_positions, y_positions, z_positions + + # ============== Parameter computation ============== + + def _aspirate_airgap( + self, + use_channels: List[int], + tecan_liquid_classes: List[Optional[TecanLiquidClass]], + airgap: str, + ) -> Tuple[List[Optional[int]], List[Optional[int]], List[Optional[int]]]: + pvl: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + ppr: List[Optional[int]] = [None] * self.num_channels + + for i, channel in enumerate(use_channels): + tlc = tecan_liquid_classes[i] + assert tlc is not None + pvl[channel] = 0 + if airgap == "lag": + sep[channel] = int(tlc.aspirate_lag_speed * self.SPEED_FACTOR) + ppr[channel] = int(tlc.aspirate_lag_volume * self.STEPS_PER_UL) + elif airgap == "tag": + sep[channel] = int(tlc.aspirate_tag_speed * self.SPEED_FACTOR) + ppr[channel] = int(tlc.aspirate_tag_volume * self.STEPS_PER_UL) + + return pvl, sep, ppr + + def _liquid_detection( + self, + use_channels: List[int], + tecan_liquid_classes: List[Optional[TecanLiquidClass]], + ) -> Tuple[List[Optional[int]], List[Optional[int]], List[Optional[int]]]: + ssl: List[Optional[int]] = [None] * self.num_channels + sdl: List[Optional[int]] = [None] * self.num_channels + sbl: List[Optional[int]] = [None] * self.num_channels + + for i, channel in enumerate(use_channels): + tlc = tecan_liquid_classes[i] + assert tlc is not None + ssl[channel] = int(tlc.lld_speed * 10) + sdl[channel] = int(tlc.lld_distance * 10) + sbl[channel] = int(tlc.aspirate_lld_offset * 10) + + return ssl, sdl, sbl + + def _aspirate_action( + self, + ops: Sequence[Aspiration], + use_channels: List[int], + tecan_liquid_classes: List[Optional[TecanLiquidClass]], + zadd: List[Optional[int]], + ) -> Tuple[ + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + ]: + ssz: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + stz: List[Optional[int]] = [-z if z else None for z in zadd] + mtr: List[Optional[int]] = [None] * self.num_channels + ssz_r: List[Optional[int]] = [None] * self.num_channels + + for i, channel in enumerate(use_channels): + tlc = tecan_liquid_classes[i] + z = zadd[channel] + assert tlc is not None and z is not None + flow_rate = ops[i].flow_rate or tlc.aspirate_speed + sep[channel] = int(flow_rate * self.SPEED_FACTOR) + ssz[channel] = round(z * flow_rate / ops[i].volume) + volume = tlc.compute_corrected_volume(ops[i].volume) + mtr[channel] = round(volume * self.STEPS_PER_UL) + ssz_r[channel] = int(tlc.aspirate_retract_speed * 10) + + return ssz, sep, stz, mtr, ssz_r + + def _dispense_action( + self, + ops: Sequence[Dispense], + use_channels: List[int], + tecan_liquid_classes: List[Optional[TecanLiquidClass]], + ) -> Tuple[ + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + ]: + sep: List[Optional[int]] = [None] * self.num_channels + spp: List[Optional[int]] = [None] * self.num_channels + stz: List[Optional[int]] = [None] * self.num_channels + mtr: List[Optional[int]] = [None] * self.num_channels + + for i, channel in enumerate(use_channels): + tlc = tecan_liquid_classes[i] + assert tlc is not None + flow_rate = ops[i].flow_rate or tlc.dispense_speed + sep[channel] = int(flow_rate * self.SPEED_FACTOR) + spp[channel] = int(tlc.dispense_breakoff * self.SPEED_FACTOR) + stz[channel] = 0 + volume = ( + tlc.compute_corrected_volume(ops[i].volume) + + tlc.aspirate_lag_volume + + tlc.aspirate_tag_volume + ) + mtr[channel] = -round(volume * self.STEPS_PER_UL) + + return sep, spp, stz, mtr + + def _get_liquid_classes( + self, ops: Sequence[Union[Aspiration, Dispense]] + ) -> List[Optional[TecanLiquidClass]]: + return [ + get_liquid_class( + target_volume=op.volume, + liquid_class=Liquid.WATER, + tip_type=op.tip.tip_type, + ) + if isinstance(op.tip, TecanTip) + else None + for op in ops + ] + + # ============== Mixing and blow-out ============== + + async def _perform_mix(self, mix: Mix, use_channels: List[int]) -> None: + """Perform mix cycles at the current tip position. + + Repeatedly aspirates and dispenses the mix volume. + + Args: + mix: Mix parameters (volume, repetitions, flow_rate). + use_channels: Which channels to mix on. + """ + assert self.liha is not None + + pvl: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + ppr_asp: List[Optional[int]] = [None] * self.num_channels + ppr_disp: List[Optional[int]] = [None] * self.num_channels + + for channel in use_channels: + pvl[channel] = 0 # outlet + sep[channel] = int(mix.flow_rate * self.SPEED_FACTOR) + steps = int(mix.volume * self.STEPS_PER_UL) + ppr_asp[channel] = steps + ppr_disp[channel] = -steps + + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + + for _ in range(mix.repetitions): + await self.liha.move_plunger_relative(ppr_asp) + await self.liha.move_plunger_relative(ppr_disp) + + async def _perform_blow_out(self, ops: List[Dispense], use_channels: List[int]) -> None: + """Push extra air volume after dispense to expel remaining liquid. + + Args: + ops: Dispense operations (checks blow_out_air_volume). + use_channels: Channels to blow out. + """ + assert self.liha is not None + + pvl: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + ppr: List[Optional[int]] = [None] * self.num_channels + has_blowout = False + + for i, channel in enumerate(use_channels): + bov = ops[i].blow_out_air_volume + if bov is not None and bov > 0: + has_blowout = True + pvl[channel] = 0 # outlet + sep[channel] = int(100 * self.SPEED_FACTOR) + ppr[channel] = -int(bov * self.STEPS_PER_UL) + + if has_blowout: + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + + # ============== PIPBackend implementation ============== + + async def pick_up_tips( + self, + ops: List[Pickup], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + assert min(use_channels) >= self.num_channels - self.diti_count, ( + f"DiTis can only be configured for the last {self.diti_count} channels" + ) + + x_positions, y_positions, z_positions = self._liha_positions(ops, use_channels) + ys = self._get_ys(ops) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, y - yi * ys, ys, [self._z_range] * self.num_channels + ) + + # Aspirate small air gap before tip pickup + pvl: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + ppr: List[Optional[int]] = [None] * self.num_channels + for channel in use_channels: + pvl[channel] = 0 + sep[channel] = int(70 * self.SPEED_FACTOR) + ppr[channel] = int(10 * self.STEPS_PER_UL) + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + + first_z_start, _ = self._first_valid(z_positions["start"]) + assert first_z_start is not None + await self.liha.get_disposable_tip( + self._bin_use_channels(use_channels), first_z_start - 227, 210 + ) + + async def drop_tips( + self, + ops: List[TipDrop], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + assert min(use_channels) >= self.num_channels - self.diti_count, ( + f"DiTis can only be configured for the last {self.diti_count} channels" + ) + + x_positions, y_positions, _ = self._liha_positions(ops, use_channels) + ys = self._get_ys(ops) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, y - yi * ys, ys, [self._z_range] * self.num_channels + ) + await self.liha.drop_disposable_tip(self._bin_use_channels(use_channels), discard_height=0) + + async def aspirate( + self, + ops: List[Aspiration], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + x_positions, y_positions, z_positions = self._liha_positions(ops, use_channels) + tecan_liquid_classes = self._get_liquid_classes(ops) + + ys = self._get_ys(ops) + zadd: List[Optional[int]] = [0] * self.num_channels + for i, channel in enumerate(use_channels): + par = ops[i].resource.parent + if par is None: + continue + if not isinstance(par, TecanPlate): + raise ValueError(f"Operation is not supported by resource {par}.") + zadd[channel] = round(ops[i].volume / par.area * 10) + + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, + y - yi * ys, + ys, + [z if z else self._z_range for z in z_positions["travel"]], + ) + + # Leading airgap + pvl, sep, ppr = self._aspirate_airgap(use_channels, tecan_liquid_classes, "lag") + if any(ppr): + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + + # Liquid level detection + if any(tlc.aspirate_lld if tlc is not None else None for tlc in tecan_liquid_classes): + tlc, _ = self._first_valid(tecan_liquid_classes) + assert tlc is not None + lld_proc = tlc.lld_mode + lld_sense = tlc.lld_conductivity + if isinstance(backend_params, TecanPIPParams): + if backend_params.liquid_detection_proc is not None: + lld_proc = backend_params.liquid_detection_proc + if backend_params.liquid_detection_sense is not None: + lld_sense = backend_params.liquid_detection_sense + await self.liha.set_detection_mode(lld_proc, lld_sense) + ssl, sdl, sbl = self._liquid_detection(use_channels, tecan_liquid_classes) + await self.liha.set_search_speed(ssl) + await self.liha.set_search_retract_distance(sdl) + await self.liha.set_search_z_start(z_positions["start"]) + await self.liha.set_search_z_max(list(z if z else self._z_range for z in z_positions["max"])) + await self.liha.set_search_submerge(sbl) + shz = [min(z for z in z_positions["travel"] if z)] * self.num_channels + await self.liha.set_z_travel_height(shz) + await self.liha.move_detect_liquid(self._bin_use_channels(use_channels), zadd) + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + + # Aspirate + retract + zadd = [min(z, 32) if z else None for z in zadd] + ssz, sep, stz, mtr, ssz_r = self._aspirate_action(ops, use_channels, tecan_liquid_classes, zadd) + await self.liha.set_slow_speed_z(ssz) + await self.liha.set_end_speed_plunger(sep) + await self.liha.set_tracking_distance_z(stz) + await self.liha.move_tracking_relative(mtr) + await self.liha.set_slow_speed_z(ssz_r) + await self.liha.move_absolute_z(z_positions["start"]) + + # Trailing airgap + pvl, sep, ppr = self._aspirate_airgap(use_channels, tecan_liquid_classes, "tag") + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + + # Post-aspirate mix + mix_channels = [ch for ch, op in zip(use_channels, ops) if op.mix is not None] + if mix_channels: + mix_op = next(op for op in ops if op.mix is not None) + assert mix_op.mix is not None + await self._perform_mix(mix_op.mix, mix_channels) + + async def dispense( + self, + ops: List[Dispense], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + x_positions, y_positions, z_positions = self._liha_positions(ops, use_channels) + tecan_liquid_classes = self._get_liquid_classes(ops) + + ys = self._get_ys(ops) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + await self.liha.set_z_travel_height([z if z else self._z_range for z in z_positions["travel"]]) + await self.liha.position_absolute_all_axis( + x, + y - yi * ys, + ys, + [z if z else self._z_range for z in z_positions["dispense"]], + ) + + sep, spp, stz, mtr = self._dispense_action(ops, use_channels, tecan_liquid_classes) + await self.liha.set_end_speed_plunger(sep) + await self.liha.set_stop_speed_plunger(spp) + await self.liha.set_tracking_distance_z(stz) + await self.liha.move_tracking_relative(mtr) + + # Blow-out + await self._perform_blow_out(ops, use_channels) + + # Tip touch + if isinstance(backend_params, TecanPIPParams) and backend_params.tip_touch: + touch_offset = int(backend_params.tip_touch_offset_y * 10) + await self.liha.position_absolute_all_axis( + x, + y - yi * ys + touch_offset, + ys, + [z if z else self._z_range for z in z_positions["dispense"]], + ) + await self.liha.position_absolute_all_axis( + x, + y - yi * ys, + ys, + [z if z else self._z_range for z in z_positions["dispense"]], + ) + + # Post-dispense mix + mix_channels = [ch for ch, op in zip(use_channels, ops) if op.mix is not None] + if mix_channels: + mix_op = next(op for op in ops if op.mix is not None) + assert mix_op.mix is not None + await self._perform_mix(mix_op.mix, mix_channels) diff --git a/pylabrobot/tecan/evo/roma_backend.py b/pylabrobot/tecan/evo/roma_backend.py new file mode 100644 index 00000000000..0de23e6c12f --- /dev/null +++ b/pylabrobot/tecan/evo/roma_backend.py @@ -0,0 +1,304 @@ +"""GripperArmBackend for the Tecan EVO RoMa (Robotic Manipulator Arm). + +Translates v1b1 arm operations into Tecan RoMa firmware commands via the +TecanEVODriver and RoMa firmware wrapper. +""" + +from __future__ import annotations + +import logging +from typing import Dict, Optional, Tuple + +from pylabrobot.arms.backend import GripperArmBackend +from pylabrobot.arms.standard import GripperLocation +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.resources import Coordinate, Resource, TecanPlateCarrier +from pylabrobot.resources.rotation import Rotation + +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.errors import TecanError +from pylabrobot.tecan.evo.firmware import RoMa +from pylabrobot.tecan.evo.firmware.arm_base import EVOArm +from pylabrobot.tecan.evo.params import TecanRoMaParams + +logger = logging.getLogger(__name__) + +ROMA = "C1" + + +class EVORoMaBackend(GripperArmBackend): + """GripperArmBackend for the Tecan EVO RoMa plate handling arm. + + The RoMa grips plates along the Y-axis at a fixed 900 (90-degree) R-axis + orientation. It uses vector coordinate tables for multi-point trajectories + with target window classes for smooth acceleration profiles. + """ + + def __init__( + self, + driver: TecanEVODriver, + deck: Resource, + park_position: tuple = (9000, 2000, 2464, 1800), + ): + self._driver = driver + self._deck = deck + self._z_roma_traversal_height = 68.7 # mm + self._park_position = park_position + self.roma: Optional[RoMa] = None + + def _get_speeds(self, backend_params: Optional[BackendParams]) -> Dict[str, int]: + """Get speed settings, applying overrides from backend_params.""" + defaults = { + "x": 10000, + "y": 5000, + "z": 1300, + "r": 5000, + "accel_y": 1500, + "accel_r": 1500, + } + if isinstance(backend_params, TecanRoMaParams): + for key, attr in [ + ("x", "speed_x"), + ("y", "speed_y"), + ("z", "speed_z"), + ("r", "speed_r"), + ("accel_y", "accel_y"), + ("accel_r", "accel_r"), + ]: + val = getattr(backend_params, attr, None) + if val is not None: + defaults[key] = val + return defaults + + async def _on_setup(self) -> None: + """Initialize RoMa arm. Skips PIA if already initialized.""" + + arm = EVOArm(self._driver, ROMA) # type: ignore[arg-type] + + # Check if RoMa is present and already initialized + try: + roma_err = await arm.read_error_register() + except TecanError as e: + if e.error_code == 5: + logger.info("RoMa not present (error 5).") + return + roma_err = "" + + if roma_err and all(c == "@" for c in roma_err): + # Already initialized — skip PIA, just set up firmware wrapper + logger.info("RoMa already initialized (REE=%s), skipping PIA.", roma_err) + self.roma = RoMa(self._driver, ROMA) # type: ignore[arg-type] + return + + # Full init: PIA + park + logger.info("RoMa needs initialization, running PIA...") + try: + await arm.position_init_all() + except TecanError as e: + if e.error_code == 5: + logger.info("RoMa not present (error 5).") + return + raise + await arm.set_bus_mode(2) + + self.roma = RoMa(self._driver, ROMA) # type: ignore[arg-type] + await self.roma.position_initialization_x() + await self.park() + logger.info("RoMa initialized and parked.") + + async def _on_stop(self) -> None: + pass + + def _roma_positions( + self, + resource: Resource, + offset: Coordinate, + z_range: int, + ) -> Tuple[int, int, Dict[str, int]]: + """Compute RoMa X, Y, Z positions from resource and carrier attributes.""" + parent = resource.parent # PlateHolder + if parent is None: + raise ValueError(f"Operation is not supported by resource {resource}.") + parent = parent.parent # PlateCarrier + if not isinstance(parent, TecanPlateCarrier): + raise ValueError(f"Operation is not supported by resource {parent}.") + + if parent.roma_x is None or parent.roma_y is None: + raise ValueError(f"RoMa coordinates not defined for carrier {parent}.") + if parent.roma_z_safe is None or parent.roma_z_end is None: + raise ValueError(f"RoMa Z positions not defined for carrier {parent}.") + + x_position = int((offset.x - 100) * 10 + parent.roma_x) + y_position = int((347.1 - (offset.y + resource.get_absolute_size_y())) * 10 + parent.roma_y) + z_positions = { + "safe": z_range - int(parent.roma_z_safe), + "travel": int(self._z_roma_traversal_height * 10), + "end": z_range - int(parent.roma_z_end - offset.z * 10), + } + return x_position, y_position, z_positions + + # ============== GripperArmBackend implementation ============== + + async def pick_up_at_location( + self, + location: Coordinate, + resource_width: float, + backend_params: Optional[BackendParams] = None, + ) -> None: + """Pick up a plate at the given location. + + Note: The current implementation uses the legacy carrier-attribute-based + positioning rather than the raw Coordinate. The ``location`` is used to + find the resource and its parent carrier for RoMa coordinate computation. + """ + assert self.roma is not None + # For now, this method is called from the Device level which provides + # the absolute location. The RoMa needs carrier-specific attributes, + # so the Device.pick_up_resource method should call _pick_up_from_carrier + # with the full resource reference. + raise NotImplementedError( + "Use TecanEVO.pick_up_resource() which provides carrier context. " + "Direct pick_up_at_location with raw coordinates is not yet supported." + ) + + async def drop_at_location( + self, + location: Coordinate, + resource_width: float, + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.roma is not None + raise NotImplementedError( + "Use TecanEVO.drop_resource() which provides carrier context. " + "Direct drop_at_location with raw coordinates is not yet supported." + ) + + async def pick_up_from_carrier( + self, + resource: Resource, + backend_params: Optional[BackendParams] = None, + ) -> None: + """Pick up a plate using carrier-based RoMa coordinates.""" + assert self.roma is not None + + z_range = await self.roma.report_z_param(5) + offset = resource.get_location_wrt(self._deck) + x, y, z = self._roma_positions(resource, offset, z_range) + h = int(resource.get_absolute_size_y() * 10) + + # Move to resource + speeds = self._get_speeds(backend_params) + await self.roma.set_smooth_move_x(1) + await self.roma.set_fast_speed_x(speeds["x"]) + await self.roma.set_fast_speed_y(speeds["y"], speeds["accel_y"]) + await self.roma.set_fast_speed_z(speeds["z"]) + await self.roma.set_fast_speed_r(speeds["r"], speeds["accel_r"]) + await self.roma.set_vector_coordinate_position(1, x, y, z["safe"], 900, None, 1, 0) + await self.roma.action_move_vector_coordinate_position() + await self.roma.set_smooth_move_x(0) + + # Pick up + await self.roma.position_absolute_g(900) + await self.roma.set_target_window_class(1, 0, 0, 0, 135, 0) + await self.roma.set_vector_coordinate_position(1, x, y, z["travel"], 900, None, 1, 1) + await self.roma.set_vector_coordinate_position(1, x, y, z["end"], 900, None, 1, 0) + await self.roma.action_move_vector_coordinate_position() + await self.roma.set_fast_speed_y(3500, 1000) + await self.roma.set_fast_speed_r(2000, 600) + await self.roma.set_gripper_params(100, 75) + await self.roma.grip_plate(h - 100) + + # Verify plate was gripped + g_pos = await self.roma.report_g_param(0) + if g_pos >= 900: + logger.warning("Plate may not be gripped (G-axis position: %d)", g_pos) + + async def drop_at_carrier( + self, + resource: Resource, + destination: Coordinate, + backend_params: Optional[BackendParams] = None, + ) -> None: + """Drop a plate at a carrier location using RoMa coordinates.""" + assert self.roma is not None + + z_range = await self.roma.report_z_param(5) + offset = resource.get_location_wrt(self._deck) + x, y, z = self._roma_positions(resource, offset, z_range) + xt, yt, zt = self._roma_positions(resource, destination, z_range) + + # Multi-point trajectory to target + await self.roma.set_target_window_class(1, 0, 0, 0, 135, 0) + await self.roma.set_target_window_class(2, 0, 0, 0, 53, 0) + await self.roma.set_target_window_class(3, 0, 0, 0, 55, 0) + await self.roma.set_target_window_class(4, 45, 0, 0, 0, 0) + await self.roma.set_vector_coordinate_position(1, x, y, z["end"], 900, None, 1, 1) + await self.roma.set_vector_coordinate_position(2, x, y, z["travel"], 900, None, 1, 2) + await self.roma.set_vector_coordinate_position(3, x, y, z["safe"], 900, None, 1, 3) + await self.roma.set_vector_coordinate_position(4, xt, yt, zt["safe"], 900, None, 1, 4) + await self.roma.set_vector_coordinate_position(5, xt, yt, zt["travel"], 900, None, 1, 3) + await self.roma.set_vector_coordinate_position(6, xt, yt, zt["end"], 900, None, 1, 0) + await self.roma.action_move_vector_coordinate_position() + + # Release + await self.roma.position_absolute_g(900) + await self.roma.set_fast_speed_y(5000, 1500) + await self.roma.set_fast_speed_r(5000, 1500) + await self.roma.set_vector_coordinate_position(1, xt, yt, zt["end"], 900, None, 1, 1) + await self.roma.set_vector_coordinate_position(2, xt, yt, zt["travel"], 900, None, 1, 2) + await self.roma.set_vector_coordinate_position(3, xt, yt, zt["safe"], 900, None, 1, 0) + await self.roma.action_move_vector_coordinate_position() + await self.roma.set_fast_speed_y(3500, 1000) + await self.roma.set_fast_speed_r(2000, 600) + + async def move_to_location( + self, location: Coordinate, backend_params: Optional[BackendParams] = None + ) -> None: + assert self.roma is not None + z_range = await self.roma.report_z_param(5) + x = int((location.x - 100) * 10) + y = int((347.1 - location.y) * 10) + z_safe = z_range - 946 # default safe Z + await self.roma.set_vector_coordinate_position(1, x, y, z_safe, 900, None, 1, 0) + await self.roma.action_move_vector_coordinate_position() + + async def halt(self, backend_params: Optional[BackendParams] = None) -> None: + assert self.roma is not None + await self.roma.bus_module_action(0, 0, 0) + + async def park(self, backend_params: Optional[BackendParams] = None) -> None: + assert self.roma is not None + px, py, pz, pr = self._park_position + await self.roma.set_vector_coordinate_position(1, px, py, pz, pr, None, 1, 0) + await self.roma.action_move_vector_coordinate_position() + + async def open_gripper( + self, gripper_width: float, backend_params: Optional[BackendParams] = None + ) -> None: + assert self.roma is not None + await self.roma.position_absolute_g(int(gripper_width * 10)) + + async def close_gripper( + self, gripper_width: float, backend_params: Optional[BackendParams] = None + ) -> None: + assert self.roma is not None + await self.roma.set_gripper_params(100, 75) + await self.roma.grip_plate(int(gripper_width * 10)) + + async def is_gripper_closed(self, backend_params: Optional[BackendParams] = None) -> bool: + assert self.roma is not None + pos = await self.roma.report_g_param(0) + return pos < 100 # heuristic: < 10mm = closed + + async def get_gripper_location( + self, backend_params: Optional[BackendParams] = None + ) -> GripperLocation: + assert self.roma is not None + x = await self.roma.report_x_param(0) + y = (await self.roma.report_y_param(0))[0] + z = await self.roma.report_z_param(0) + r = await self.roma.report_r_param(0) + return GripperLocation( + location=Coordinate(x=x / 10.0, y=y / 10.0, z=z / 10.0), + rotation=Rotation(x=0, y=0, z=r / 10.0), + ) diff --git a/pylabrobot/tecan/evo/tests/__init__.py b/pylabrobot/tecan/evo/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/pylabrobot/tecan/evo/tests/air_pip_backend_tests.py b/pylabrobot/tecan/evo/tests/air_pip_backend_tests.py new file mode 100644 index 00000000000..50bfc7fff5c --- /dev/null +++ b/pylabrobot/tecan/evo/tests/air_pip_backend_tests.py @@ -0,0 +1,251 @@ +"""Unit tests for AirEVOPIPBackend (Air LiHa / ZaapMotion).""" + +import unittest +from unittest.mock import AsyncMock, call + +from pylabrobot.capabilities.liquid_handling.standard import Pickup +from pylabrobot.resources import Coordinate, EVO150Deck +from pylabrobot.resources.tecan.plate_carriers import MP_3Pos +from pylabrobot.resources.tecan.plates import Microplate_96_Well +from pylabrobot.resources.tecan.tip_carriers import DiTi_3Pos +from pylabrobot.resources.tecan.tip_racks import DiTi_50ul_SBS_LiHa +from pylabrobot.tecan.evo.air_pip_backend import AirEVOPIPBackend, ZAAPMOTION_CONFIG +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.errors import TecanError +from pylabrobot.tecan.evo.firmware import LiHa +from pylabrobot.tecan.evo.firmware.zaapmotion import ZaapMotion + + +class AirPIPTestBase(unittest.IsolatedAsyncioTestCase): + def setUp(self): + super().setUp() + + self.driver = TecanEVODriver() + self.driver.send_command = AsyncMock() + + async def mock_send(module="", command="", params=None, **kwargs): + if command == "RPX": + return {"data": [5000]} + if command == "RPY": + return {"data": [1500, 90]} + if command == "RPZ": + return {"data": [2100, 2100, 2100, 2100, 2100, 2100, 2100, 2100]} + if command == "RNT": + return {"data": [8]} + if command == "REE": + if params and params[0] == 1: + return {"data": ["XYSZZZZZZZZ"]} + return {"data": ["@@@@@@@@@@@"]} + if "RFV" in command: + return {"data": ["XP2000-V1.20-02/2015", "1.2.0.10946", "ZMA"]} + if "RCS" in command: + return {"data": []} + return {"data": []} + + self.driver.send_command.side_effect = mock_send + + self.deck = EVO150Deck() + self.backend = AirEVOPIPBackend(driver=self.driver, deck=self.deck, diti_count=8) + + self.backend._num_channels = 8 + self.backend._x_range = 9866 + self.backend._y_range = 2833 + self.backend._z_range = 2100 + self.backend.liha = LiHa(self.driver, "C5") + self.backend.zaap = ZaapMotion(self.driver) + + self.tip_carrier = DiTi_3Pos(name="tip_carrier") + self.tip_carrier[0] = self.tip_rack = DiTi_50ul_SBS_LiHa(name="tips") + self.deck.assign_child_resource(self.tip_carrier, rails=10) + + self.plate_carrier = MP_3Pos(name="plate_carrier") + self.plate_carrier[0] = self.plate = Microplate_96_Well(name="plate") + self.deck.assign_child_resource(self.plate_carrier, rails=20) + + self.driver.send_command.reset_mock() + + +class AirConversionTests(AirPIPTestBase): + def test_air_steps_per_ul(self): + self.assertEqual(AirEVOPIPBackend.STEPS_PER_UL, 106.4) + + def test_air_speed_factor(self): + self.assertEqual(AirEVOPIPBackend.SPEED_FACTOR, 213.0) + + def test_sfr_constants(self): + self.assertEqual(AirEVOPIPBackend.SFR_ACTIVE, 133120) + self.assertEqual(AirEVOPIPBackend.SFR_IDLE, 3752) + self.assertEqual(AirEVOPIPBackend.SDP_DEFAULT, 1400) + + +class AirForceModeSFRTests(AirPIPTestBase): + async def test_force_on_sends_16_commands(self): + """8 SFR + 8 SFP1 = 16 commands.""" + await self.backend._zaapmotion_force_on() + sfr = [c for c in self.driver.send_command.call_args_list if "SFR133120" in str(c)] + sfp = [c for c in self.driver.send_command.call_args_list if "SFP1" in str(c)] + self.assertEqual(len(sfr), 8) + self.assertEqual(len(sfp), 8) + + async def test_force_off_sends_16_commands(self): + """8 SFR + 8 SDP = 16 commands.""" + await self.backend._zaapmotion_force_off() + sfr = [c for c in self.driver.send_command.call_args_list if "SFR3752" in str(c)] + sdp = [c for c in self.driver.send_command.call_args_list if "SDP1400" in str(c)] + self.assertEqual(len(sfr), 8) + self.assertEqual(len(sdp), 8) + + +class AirInitSkipTests(AirPIPTestBase): + async def test_initialized_all_ok(self): + result = await self.backend._is_initialized() + self.assertTrue(result) + + async def test_not_initialized_with_A(self): + async def send(module, command, params=None, **kwargs): + if command == "REE": + return {"data": ["GGGAAAAAAAA"]} + return {"data": []} + + self.driver.send_command.side_effect = send + result = await self.backend._is_initialized() + self.assertFalse(result) + + async def test_not_initialized_with_G(self): + async def send(module, command, params=None, **kwargs): + if command == "REE": + return {"data": ["GGGGGGGGGGG"]} + return {"data": []} + + self.driver.send_command.side_effect = send + result = await self.backend._is_initialized() + self.assertFalse(result) + + async def test_initialized_with_Y_tip_not_fetched(self): + async def send(module, command, params=None, **kwargs): + if command == "REE": + return {"data": ["@@@YYYYYYY@"]} + return {"data": []} + + self.driver.send_command.side_effect = send + result = await self.backend._is_initialized() + self.assertTrue(result) + + async def test_initialized_with_timeout(self): + self.driver.send_command.side_effect = TimeoutError() + result = await self.backend._is_initialized() + self.assertFalse(result) + + +class AirZaapMotionConfigTests(AirPIPTestBase): + def test_config_has_33_commands(self): + self.assertEqual(len(ZAAPMOTION_CONFIG), 33) + + def test_config_starts_with_cfe(self): + self.assertEqual(ZAAPMOTION_CONFIG[0], "CFE 255,500") + + def test_config_ends_with_wrp(self): + self.assertEqual(ZAAPMOTION_CONFIG[-1], "WRP") + + async def test_configure_skips_when_already_configured(self): + """RCS returning OK → skip config.""" + await self.backend._configure_zaapmotion() + config_calls = [ + c + for c in self.driver.send_command.call_args_list + if any(cmd in str(c) for cmd in ["CFE", "CMTBLDC", "WRP"]) + ] + self.assertEqual(len(config_calls), 0) + + async def test_configure_sends_boot_exit_when_in_boot(self): + """If RFV returns BOOT, send X to exit.""" + call_count = [0] + + async def send(module, command, params=None, **kwargs): + call_count[0] += 1 + if "RFV" in command: + # First call: BOOT, after X: app mode + if call_count[0] <= 2: + return {"data": ["XP2-BOOT-V1.00-05/2011", "1.0.0.9506", "ZMB"]} + return {"data": ["XP2000-V1.20-02/2015", "1.2.0.10946", "ZMA"]} + if "RCS" in command: + raise TecanError("Not configured", "C5", 2) + return {"data": []} + + self.driver.send_command.side_effect = send + # Will only configure tip 0 before our mock resets + # Just verify X command is sent + try: + await self.backend._configure_zaapmotion() + except Exception: + pass + x_calls = [c for c in self.driver.send_command.call_args_list if "T20X" in str(c)] + self.assertGreater(len(x_calls), 0) + + async def test_safety_module(self): + await self.backend._setup_safety_module() + self.driver.send_command.assert_any_call("O1", command="SPN") + self.driver.send_command.assert_any_call("O1", command="SPS3") + + +class AirPickUpTipsTests(AirPIPTestBase): + async def test_uses_air_conversion_factors(self): + op = Pickup( + resource=self.tip_rack.get_item("A1"), + offset=Coordinate.zero(), + tip=self.tip_rack.get_tip("A1"), + ) + await self.backend.pick_up_tips([op], use_channels=[0]) + + # SEP: 70 * 213 = 14910 + sep_calls = [ + c + for c in self.driver.send_command.call_args_list + if c + == call(module="C5", command="SEP", params=[14910, None, None, None, None, None, None, None]) + ] + self.assertEqual(len(sep_calls), 1) + + # PPR: 10 * 106.4 = 1064 + ppr_calls = [ + c + for c in self.driver.send_command.call_args_list + if c + == call(module="C5", command="PPR", params=[1064, None, None, None, None, None, None, None]) + ] + self.assertEqual(len(ppr_calls), 1) + + async def test_force_mode_wraps_plunger(self): + """Verify SFR/SFP sent before and SFR/SDP after plunger ops.""" + op = Pickup( + resource=self.tip_rack.get_item("A1"), + offset=Coordinate.zero(), + tip=self.tip_rack.get_tip("A1"), + ) + await self.backend.pick_up_tips([op], use_channels=[0]) + + all_cmds = [str(c) for c in self.driver.send_command.call_args_list] + # Find force_on before PPR and force_off after + sfr_active_indices = [i for i, c in enumerate(all_cmds) if "SFR133120" in c] + ppr_indices = [i for i, c in enumerate(all_cmds) if "'PPR'" in c] + sfr_idle_indices = [i for i, c in enumerate(all_cmds) if "SFR3752" in c] + + if sfr_active_indices and ppr_indices and sfr_idle_indices: + self.assertLess(min(sfr_active_indices), min(ppr_indices)) + self.assertGreater(min(sfr_idle_indices), max(ppr_indices)) + + async def test_agt_uses_tip_rack_z_start(self): + """AGT should use the tip rack's z_start directly.""" + op = Pickup( + resource=self.tip_rack.get_item("A1"), + offset=Coordinate.zero(), + tip=self.tip_rack.get_tip("A1"), + ) + await self.backend.pick_up_tips([op], use_channels=[0]) + + agt_calls = [ + c for c in self.driver.send_command.call_args_list if c.kwargs.get("command") == "AGT" + ] + self.assertEqual(len(agt_calls), 1) + params = agt_calls[0].kwargs["params"] + self.assertEqual(params[1], int(self.tip_rack.z_start)) diff --git a/pylabrobot/tecan/evo/tests/driver_tests.py b/pylabrobot/tecan/evo/tests/driver_tests.py new file mode 100644 index 00000000000..7a9bb1845f0 --- /dev/null +++ b/pylabrobot/tecan/evo/tests/driver_tests.py @@ -0,0 +1,121 @@ +"""Unit tests for TecanEVODriver.""" + +import unittest + +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.errors import TecanError + + +class CommandAssemblyTests(unittest.TestCase): + def setUp(self): + self.driver = TecanEVODriver() + + def test_assemble_no_params(self): + result = self.driver._assemble_command("C5", "PIA", []) + self.assertEqual(result, "\x02C5PIA\x00") + + def test_assemble_with_params(self): + result = self.driver._assemble_command("C5", "PAA", [100, 200, 90]) + self.assertEqual(result, "\x02C5PAA100,200,90\x00") + + def test_assemble_with_none_params(self): + result = self.driver._assemble_command("C5", "MDT", [255, None, None, None, 37]) + self.assertEqual(result, "\x02C5MDT255,,,,37\x00") + + def test_assemble_roma_module(self): + result = self.driver._assemble_command("C1", "PIX", []) + self.assertEqual(result, "\x02C1PIX\x00") + + +class ResponseParsingTests(unittest.TestCase): + def setUp(self): + self.driver = TecanEVODriver() + + def test_parse_success_no_data(self): + # Status byte 0x80 XOR 0x80 = 0 (success), no data + resp = b"\x02C5\x80\x00" + result = self.driver.parse_response(resp) + self.assertEqual(result["module"], "C5") + self.assertEqual(result["data"], []) + + def test_parse_success_with_int_data(self): + # Status byte 0x80 = success, data "8" + resp = b"\x02C5\x808\x00" + result = self.driver.parse_response(resp) + self.assertEqual(result["module"], "C5") + self.assertEqual(result["data"], [8]) + + def test_parse_success_with_csv_data(self): + resp = b"\x02C5\x802100,2100,2100\x00" + result = self.driver.parse_response(resp) + self.assertEqual(result["data"], [2100, 2100, 2100]) + + def test_parse_success_with_string_data(self): + resp = b"\x02C5\x80LIHACU-V1.80\x00" + result = self.driver.parse_response(resp) + self.assertEqual(result["data"], ["LIHACU-V1.80"]) + + def test_parse_error_code(self): + # Status byte: error code 1 with bit 7 set = 0x81 + resp = b"\x02C5\x81\x00" + with self.assertRaises(TecanError) as ctx: + self.driver.parse_response(resp) + self.assertEqual(ctx.exception.error_code, 1) + self.assertEqual(ctx.exception.module, "C5") + self.assertIn("Initialization failed", ctx.exception.message) + + def test_parse_error_code_3(self): + resp = b"\x02C5\x83\x00" + with self.assertRaises(TecanError) as ctx: + self.driver.parse_response(resp) + self.assertEqual(ctx.exception.error_code, 3) + self.assertIn("Invalid operand", ctx.exception.message) + + def test_parse_roma_error(self): + resp = b"\x02C1\x85\x00" + with self.assertRaises(TecanError) as ctx: + self.driver.parse_response(resp) + self.assertEqual(ctx.exception.error_code, 5) + self.assertEqual(ctx.exception.module, "C1") + + def test_parse_negative_data(self): + resp = b"\x02C5\x80-155\x00" + result = self.driver.parse_response(resp) + self.assertEqual(result["data"], [-155]) + + +class CachingTests(unittest.IsolatedAsyncioTestCase): + async def test_set_command_cached(self): + driver = TecanEVODriver() + # Simulate caching without actual USB + driver._cache["C5SEP"] = [1800, 1800] + # Same params should return None (cached) + # We can't call send_command without USB, but we can test the cache logic + k = "C5SEP" + params = [1800, 1800] + self.assertIn(k, driver._cache) + self.assertEqual(driver._cache[k], params) + + def test_cache_different_params(self): + driver = TecanEVODriver() + driver._cache["C5SEP"] = [1800, 1800] + # Different params should NOT match + new_params = [2000, 2000] + self.assertNotEqual(driver._cache["C5SEP"], new_params) + + +class SerializationTests(unittest.TestCase): + def test_serialize(self): + driver = TecanEVODriver(packet_read_timeout=30, read_timeout=120, write_timeout=120) + data = driver.serialize() + self.assertEqual(data["type"], "TecanEVODriver") + self.assertEqual(data["packet_read_timeout"], 30) + self.assertEqual(data["read_timeout"], 120) + self.assertEqual(data["write_timeout"], 120) + + def test_serialize_defaults(self): + driver = TecanEVODriver() + data = driver.serialize() + self.assertEqual(data["packet_read_timeout"], 12) + self.assertEqual(data["read_timeout"], 60) + self.assertEqual(data["write_timeout"], 60) diff --git a/pylabrobot/tecan/evo/tests/pip_backend_tests.py b/pylabrobot/tecan/evo/tests/pip_backend_tests.py new file mode 100644 index 00000000000..e5430b6e161 --- /dev/null +++ b/pylabrobot/tecan/evo/tests/pip_backend_tests.py @@ -0,0 +1,214 @@ +"""Unit tests for EVOPIPBackend (syringe LiHa).""" + +import unittest +from unittest.mock import AsyncMock, call + +from pylabrobot.capabilities.liquid_handling.standard import Aspiration, Dispense, Pickup, TipDrop +from pylabrobot.resources import Coordinate, EVO150Deck +from pylabrobot.resources.tecan.plate_carriers import MP_3Pos +from pylabrobot.resources.tecan.plates import Microplate_96_Well +from pylabrobot.resources.tecan.tip_carriers import DiTi_3Pos +from pylabrobot.resources.tecan.tip_racks import DiTi_100ul_Te_MO +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.firmware import LiHa +from pylabrobot.tecan.evo.pip_backend import EVOPIPBackend + + +class PIPBackendTestBase(unittest.IsolatedAsyncioTestCase): + """Base class with mocked driver and deck setup.""" + + def setUp(self): + super().setUp() + + self.driver = TecanEVODriver() + self.driver.send_command = AsyncMock() + + async def mock_send(module, command, params=None, **kwargs): + if command == "RPX": + return {"data": [5000]} + if command == "RPY": + return {"data": [1500, 90]} + if command == "RPZ": + return {"data": [2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000]} + if command == "RNT": + return {"data": [8]} + return {"data": []} + + self.driver.send_command.side_effect = mock_send + + self.deck = EVO150Deck() + self.backend = EVOPIPBackend(driver=self.driver, deck=self.deck, diti_count=8) + + # Skip actual setup, set state directly + self.backend._num_channels = 8 + self.backend._x_range = 9866 + self.backend._y_range = 2833 + self.backend._z_range = 2100 + self.backend.liha = LiHa(self.driver, "C5") + + # Deck layout + self.tip_carrier = DiTi_3Pos(name="tip_carrier") + self.tip_carrier[0] = self.tip_rack = DiTi_100ul_Te_MO(name="tips") + self.deck.assign_child_resource(self.tip_carrier, rails=10) + + self.plate_carrier = MP_3Pos(name="plate_carrier") + self.plate_carrier[0] = self.plate = Microplate_96_Well(name="plate") + self.deck.assign_child_resource(self.plate_carrier, rails=20) + + self.driver.send_command.reset_mock() + + +class ConversionTests(PIPBackendTestBase): + def test_syringe_steps_per_ul(self): + self.assertEqual(EVOPIPBackend.STEPS_PER_UL, 3.0) + + def test_syringe_speed_factor(self): + self.assertEqual(EVOPIPBackend.SPEED_FACTOR, 6.0) + + +class UtilityTests(PIPBackendTestBase): + def test_bin_use_channels_single(self): + self.assertEqual(self.backend._bin_use_channels([0]), 1) + + def test_bin_use_channels_multiple(self): + self.assertEqual(self.backend._bin_use_channels([0, 2, 4]), 0b10101) + + def test_bin_use_channels_all_eight(self): + self.assertEqual(self.backend._bin_use_channels(list(range(8))), 255) + + def test_first_valid(self): + val, idx = self.backend._first_valid([None, None, 42, None]) + self.assertEqual(val, 42) + self.assertEqual(idx, 2) + + def test_first_valid_none(self): + val, idx = self.backend._first_valid([None, None]) + self.assertIsNone(val) + self.assertEqual(idx, -1) + + def test_get_ys_from_plate(self): + op = Aspiration( + resource=self.plate.get_item("A1"), + offset=Coordinate.zero(), + tip=self.tip_rack.get_tip("A1"), + volume=25.0, + flow_rate=None, + liquid_height=None, + blow_out_air_volume=None, + mix=None, + ) + ys = self.backend._get_ys([op]) + # Microplate_96_Well has ~9mm well pitch (int truncation of 8.999... * 10) + self.assertIn(ys, [89, 90]) + + +class PickUpTipsTests(PIPBackendTestBase): + async def test_pick_up_sends_correct_commands(self): + op = Pickup( + resource=self.tip_rack.get_item("A1"), + offset=Coordinate.zero(), + tip=self.tip_rack.get_tip("A1"), + ) + await self.backend.pick_up_tips([op], use_channels=[0]) + + # Should send SHZ, PAA, PVL, SEP, PPR, AGT + cmd_names = [ + c.kwargs.get("command", c.args[1] if len(c.args) > 1 else "?") + for c in self.driver.send_command.call_args_list + if c.kwargs.get("module") == "C5" or (c.args and c.args[0] == "C5") + ] + self.assertIn("SHZ", cmd_names) + self.assertIn("PVL", cmd_names) + self.assertIn("SEP", cmd_names) + self.assertIn("PPR", cmd_names) + self.assertIn("AGT", cmd_names) + + async def test_pick_up_syringe_conversion(self): + """Verify syringe LiHa uses factor 3/6 for tip pickup air gap.""" + op = Pickup( + resource=self.tip_rack.get_item("A1"), + offset=Coordinate.zero(), + tip=self.tip_rack.get_tip("A1"), + ) + await self.backend.pick_up_tips([op], use_channels=[0]) + + # SEP should use speed factor 6: 70 * 6 = 420 + sep_calls = [ + c + for c in self.driver.send_command.call_args_list + if c + == call(module="C5", command="SEP", params=[420, None, None, None, None, None, None, None]) + ] + self.assertEqual(len(sep_calls), 1) + + # PPR should use steps/uL 3: 10 * 3 = 30 + ppr_calls = [ + c + for c in self.driver.send_command.call_args_list + if c + == call(module="C5", command="PPR", params=[30, None, None, None, None, None, None, None]) + ] + self.assertEqual(len(ppr_calls), 1) + + +class DropTipsTests(PIPBackendTestBase): + async def test_drop_sends_ast(self): + op = TipDrop( + resource=self.tip_rack.get_item("A1"), + offset=Coordinate.zero(), + tip=self.tip_rack.get_tip("A1"), + ) + await self.backend.drop_tips([op], use_channels=[0]) + + # Should send SHZ, PAA, AST + cmd_names = [c.kwargs.get("command", "?") for c in self.driver.send_command.call_args_list] + self.assertIn("AST", cmd_names) + + +class AspirateTests(PIPBackendTestBase): + async def test_aspirate_sends_tracking_commands(self): + op = Aspiration( + resource=self.plate.get_item("A1"), + offset=Coordinate.zero(), + tip=self.tip_rack.get_tip("A1"), + volume=25.0, + flow_rate=None, + liquid_height=None, + blow_out_air_volume=None, + mix=None, + ) + await self.backend.aspirate([op], use_channels=[0]) + + cmd_names = [c.kwargs.get("command", "?") for c in self.driver.send_command.call_args_list] + # Core aspirate sequence: SHZ, PAA, PVL, SEP, PPR (airgap), SSZ, SEP, STZ, MTR, SSZ, MAZ, PVL, SEP, PPR (tag) + self.assertIn("MTR", cmd_names) # tracking movement + self.assertIn("MAZ", cmd_names) # retract + + +class DispenseTests(PIPBackendTestBase): + async def test_dispense_sends_tracking_commands(self): + op = Dispense( + resource=self.plate.get_item("A1"), + offset=Coordinate.zero(), + tip=self.tip_rack.get_tip("A1"), + volume=25.0, + flow_rate=None, + liquid_height=None, + blow_out_air_volume=None, + mix=None, + ) + await self.backend.dispense([op], use_channels=[0]) + + cmd_names = [c.kwargs.get("command", "?") for c in self.driver.send_command.call_args_list] + self.assertIn("MTR", cmd_names) + self.assertIn("SPP", cmd_names) # stop speed + + +class NumChannelsTests(PIPBackendTestBase): + def test_num_channels(self): + self.assertEqual(self.backend.num_channels, 8) + + def test_num_channels_before_setup(self): + fresh = EVOPIPBackend(driver=self.driver, deck=self.deck) + with self.assertRaises(RuntimeError): + _ = fresh.num_channels diff --git a/pylabrobot/tecan/evo/tests/roma_backend_tests.py b/pylabrobot/tecan/evo/tests/roma_backend_tests.py new file mode 100644 index 00000000000..6d7c20bbb8e --- /dev/null +++ b/pylabrobot/tecan/evo/tests/roma_backend_tests.py @@ -0,0 +1,108 @@ +"""Unit tests for EVORoMaBackend.""" + +import unittest +from unittest.mock import AsyncMock + +from pylabrobot.resources import EVO150Deck +from pylabrobot.resources.tecan.plate_carriers import MP_3Pos +from pylabrobot.resources.tecan.plates import Microplate_96_Well +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.firmware import RoMa +from pylabrobot.tecan.evo.roma_backend import EVORoMaBackend + + +class RoMaTestBase(unittest.IsolatedAsyncioTestCase): + def setUp(self): + super().setUp() + + self.driver = TecanEVODriver() + self.driver.send_command = AsyncMock() + + async def mock_send(module, command, params=None, **kwargs): + if command == "RPX": + return {"data": [9000]} + if command == "RPY": + return {"data": [2000]} + if command == "RPZ": + return {"data": [2500]} + if command == "RPR": + return {"data": [1800]} + if command == "RPG": + return {"data": [900]} + return {"data": []} + + self.driver.send_command.side_effect = mock_send + + self.deck = EVO150Deck() + self.backend = EVORoMaBackend(driver=self.driver, deck=self.deck) + self.backend.roma = RoMa(self.driver, "C1") + + self.plate_carrier = MP_3Pos(name="carrier") + self.plate_carrier[0] = self.plate = Microplate_96_Well(name="plate") + self.deck.assign_child_resource(self.plate_carrier, rails=20) + + self.driver.send_command.reset_mock() + + +class RoMaParkTests(RoMaTestBase): + async def test_park_sends_saa_aac(self): + await self.backend.park() + cmd_names = [c.kwargs.get("command", "?") for c in self.driver.send_command.call_args_list] + self.assertIn("SAA", cmd_names) + self.assertIn("AAC", cmd_names) + + +class RoMaHaltTests(RoMaTestBase): + async def test_halt_sends_bma(self): + await self.backend.halt() + self.driver.send_command.assert_called_with(module="C1", command="BMA", params=[0, 0, 0]) + + +class RoMaGripperTests(RoMaTestBase): + async def test_open_gripper(self): + await self.backend.open_gripper(gripper_width=90.0) + self.driver.send_command.assert_called_with(module="C1", command="PAG", params=[900]) + + async def test_close_gripper(self): + await self.backend.close_gripper(gripper_width=50.0) + cmd_names = [c.kwargs.get("command", "?") for c in self.driver.send_command.call_args_list] + self.assertIn("SGG", cmd_names) + self.assertIn("AGR", cmd_names) + + async def test_is_gripper_closed(self): + result = await self.backend.is_gripper_closed() + # Mock returns RPG=900, which is >= 100, so not closed + self.assertFalse(result) + + +class RoMaLocationTests(RoMaTestBase): + async def test_get_gripper_location(self): + loc = await self.backend.get_gripper_location() + self.assertEqual(loc.location.x, 900.0) # 9000 / 10 + self.assertEqual(loc.location.y, 200.0) # 2000 / 10 + self.assertEqual(loc.location.z, 250.0) # 2500 / 10 + self.assertEqual(loc.rotation.z, 180.0) # 1800 / 10 + + +class RoMaPickUpTests(RoMaTestBase): + async def test_pick_up_from_carrier_sends_trajectory(self): + await self.backend.pick_up_from_carrier(self.plate) + cmd_names = [c.kwargs.get("command", "?") for c in self.driver.send_command.call_args_list] + # Should send speed configs, SAA (vector coords), AAC (execute), SGG (gripper), AGR (grip) + self.assertIn("SFX", cmd_names) + self.assertIn("SAA", cmd_names) + self.assertIn("AAC", cmd_names) + self.assertIn("SGG", cmd_names) + self.assertIn("AGR", cmd_names) + + +class RoMaDropTests(RoMaTestBase): + async def test_drop_at_carrier_sends_trajectory(self): + dest = self.plate.get_location_wrt(self.deck) + await self.backend.drop_at_carrier(self.plate, dest) + cmd_names = [c.kwargs.get("command", "?") for c in self.driver.send_command.call_args_list] + # Multi-point trajectory: STW (target windows), SAA (vector coords), AAC (execute), PAG (open gripper) + self.assertIn("STW", cmd_names) + self.assertIn("SAA", cmd_names) + self.assertIn("AAC", cmd_names) + self.assertIn("PAG", cmd_names)