diff --git a/.gitignore b/.gitignore index 10d2b64ea58..bc615036d54 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,10 @@ pyhamilton/LAY-BACKUP .ipynb_checkpoints *.egg-info *.log +keyser-testing/Tecan Manuals/ +keyser-testing/evoware-usb-capture-*/ +keyser-testing/evoware-pktmon-capture-*/ +keyser-testing/*.pdf build/lib myenv diff --git a/_typos.toml b/_typos.toml index 742efae9454..3cf64e87e10 100644 --- a/_typos.toml +++ b/_typos.toml @@ -28,8 +28,21 @@ mis = "mis" RHE = "RHE" "ASEND" = "ASEND" caf = "caf" +# Tecan firmware command abbreviations +ALO = "ALO" +SOM = "SOM" +SHS = "SHS" +SHW = "SHW" +AZMA = "AZMA" +MCH = "MCH" +AAS = "AAS" +AER = "AER" [files] extend-exclude = [ - "*.ipynb" + "*.ipynb", + "keyser-testing/Tecan Manuals/", + "keyser-testing/evoware-usb-capture-aspirate-dispense/", + "keyser-testing/evoware-usb-capture-multidispense/", + "keyser-testing/evoware-pktmon-capture-20260327/", ] diff --git a/pylabrobot/liquid_handling/backends/tecan/EVO_backend.py b/pylabrobot/liquid_handling/backends/tecan/EVO_backend.py index c4d9d2f2a9c..c70288eb878 100644 --- a/pylabrobot/liquid_handling/backends/tecan/EVO_backend.py +++ b/pylabrobot/liquid_handling/backends/tecan/EVO_backend.py @@ -25,6 +25,7 @@ from pylabrobot.liquid_handling.standard import ( Drop, DropTipRack, + Mix, MultiHeadAspirationContainer, MultiHeadAspirationPlate, MultiHeadDispenseContainer, @@ -183,6 +184,10 @@ class EVOBackend(TecanLiquidHandler): MCA = "W1" PNP = "W2" + # Syringe LiHa conversion factors (overridden by AirEVOBackend) + STEPS_PER_UL = 3.0 + SPEED_FACTOR = 6.0 + def __init__( self, diti_count: int = 0, @@ -304,18 +309,18 @@ async def setup(self): await self.liha.position_absolute_all_axis(45, 1031, 90, [self._z_range] * self.num_channels) async def setup_arm(self, module): + arm = EVOArm(self, module) try: if module == EVO.MCA: - await self.send_command(module, command="PIB") - - await self.send_command(module, command="PIA") + await arm.position_init_bus() + await arm.position_init_all() except TecanError as e: if e.error_code == 5: return False raise e if module != EVO.MCA: - await self.send_command(module, command="BMX", params=[2]) + await arm.set_bus_mode(2) return True @@ -323,8 +328,11 @@ async def _park_liha(self): await self.liha.set_z_travel_height([self._z_range] * self.num_channels) await self.liha.position_absolute_all_axis(45, 1031, 90, [self._z_range] * self.num_channels) + _roma_park_position = (9000, 2000, 2464, 1800) + async def _park_roma(self): - await self.roma.set_vector_coordinate_position(1, 9000, 2000, 2464, 1800, None, 1, 0) + px, py, pz, pr = self._roma_park_position + await self.roma.set_vector_coordinate_position(1, px, py, pz, pr, None, 1, 0) await self.roma.action_move_vector_coordinate_position() async def _park_mca(self): @@ -347,6 +355,64 @@ async def _park_mca(self): await self.send_command(EVO.MCA, command="BMA", params=[0, 0, 0]) await asyncio.sleep(0.5) + # ============== Mixing, blow-out, tip presence ============== + + async def _perform_mix(self, mix: Mix, use_channels: List[int]) -> None: + """Perform mix cycles at the current tip position. + + Args: + mix: Mix parameters (volume, repetitions, flow_rate). + use_channels: Which channels to mix on. + """ + pvl: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + ppr_asp: List[Optional[int]] = [None] * self.num_channels + ppr_disp: List[Optional[int]] = [None] * self.num_channels + + for channel in use_channels: + pvl[channel] = 0 # outlet + sep[channel] = int(mix.flow_rate * self.SPEED_FACTOR) + steps = int(mix.volume * self.STEPS_PER_UL) + ppr_asp[channel] = steps + ppr_disp[channel] = -steps + + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + + for _ in range(mix.repetitions): + await self.liha.move_plunger_relative(ppr_asp) + await self.liha.move_plunger_relative(ppr_disp) + + async def _perform_blow_out( + self, ops: List[SingleChannelDispense], use_channels: List[int] + ) -> None: + """Push extra air volume after dispense to expel remaining liquid.""" + pvl: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + ppr: List[Optional[int]] = [None] * self.num_channels + has_blowout = False + + for i, channel in enumerate(use_channels): + bov = ops[i].blow_out_air_volume + if bov is not None and bov > 0: + has_blowout = True + pvl[channel] = 0 + sep[channel] = int(100 * self.SPEED_FACTOR) + ppr[channel] = -int(bov * self.STEPS_PER_UL) + + if has_blowout: + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + + async def request_tip_presence(self) -> List[Optional[bool]]: + """Query tip mounted status for each channel via RTS firmware command.""" + statuses = await self.liha.read_tip_status() + result: List[Optional[bool]] = [None] * self.num_channels + for i in range(min(len(statuses), self.num_channels)): + result[i] = statuses[i] + return result + # ============== LiquidHandlerBackend methods ============== async def aspirate( @@ -411,6 +477,7 @@ async def aspirate( assert tlc is not None detproc = tlc.lld_mode # must be same for all channels? sense = tlc.lld_conductivity + # Allow override via backend_kwargs (future: pass through from LiquidHandler) await self.liha.set_detection_mode(detproc, sense) ssl, sdl, sbl = self._liquid_detection(use_channels, tecan_liquid_classes) await self.liha.set_search_speed(ssl) @@ -440,6 +507,13 @@ async def aspirate( await self.liha.set_end_speed_plunger(sep) await self.liha.move_plunger_relative(ppr) + # Post-aspirate mix + mix_channels = [ch for ch, op in zip(use_channels, ops) if op.mix is not None] + if mix_channels: + mix_op = next(op for op in ops if op.mix is not None) + assert mix_op.mix is not None + await self._perform_mix(mix_op.mix, mix_channels) + async def dispense(self, ops: List[SingleChannelDispense], use_channels: List[int]): """Dispense liquid from the specified channels. @@ -479,6 +553,16 @@ async def dispense(self, ops: List[SingleChannelDispense], use_channels: List[in await self.liha.set_tracking_distance_z(stz) await self.liha.move_tracking_relative(mtr) + # Blow-out + await self._perform_blow_out(ops, use_channels) + + # Post-dispense mix + mix_channels = [ch for ch, op in zip(use_channels, ops) if op.mix is not None] + if mix_channels: + mix_op = next(op for op in ops if op.mix is not None) + assert mix_op.mix is not None + await self._perform_mix(mix_op.mix, mix_channels) + async def pick_up_tips(self, ops: List[Pickup], use_channels: List[int]): """Pick up tips from a resource. @@ -603,6 +687,16 @@ async def pick_up_resource(self, pickup: ResourcePickup): await self.roma.set_gripper_params(100, 75) await self.roma.grip_plate(h - 100) + # Verify plate was gripped + try: + g_pos = await self.roma.report_g_param(0) + if g_pos >= 900: + import logging + + logging.getLogger(__name__).warning("Plate may not be gripped (G-axis position: %d)", g_pos) + except (TypeError, KeyError): + pass # RPG not available or response format unexpected + async def move_picked_up_resource(self, move: ResourceMove): raise NotImplementedError() @@ -929,6 +1023,39 @@ async def report_y_param(self, param: int) -> List[int]: )["data"] return resp + async def read_error_register(self, param: int = 0) -> str: + """Read error register (REE). + + Args: + param: 0 = current errors, 1 = extended error info + + Returns: + Error string where each character represents one axis status. + ``'@'`` = no error, ``'A'`` = init failed, ``'G'`` = not initialized. + """ + resp = await self.backend.send_command(module=self.module, command="REE", params=[param]) + return str(resp["data"][0]) if resp and resp.get("data") else "" + + async def position_init_all(self) -> None: + """Initialize all axes (PIA).""" + await self.backend.send_command(module=self.module, command="PIA") + + async def position_init_bus(self) -> None: + """Initialize bus (PIB). Used for MCA modules.""" + await self.backend.send_command(module=self.module, command="PIB") + + async def set_bus_mode(self, mode: int) -> None: + """Set bus mode (BMX). + + Args: + mode: 2 = normal operation + """ + await self.backend.send_command(module=self.module, command="BMX", params=[mode]) + + async def bus_module_action(self, p1: int, p2: int, p3: int) -> None: + """Bus module action (BMA). Use ``(0, 0, 0)`` to halt all axes.""" + await self.backend.send_command(module=self.module, command="BMA", params=[p1, p2, p3]) + class LiHa(EVOArm): """ @@ -1180,6 +1307,75 @@ async def get_disposable_tip(self, tips, z_start, z_search): params=[tips, z_start, z_search, 0], ) + async def position_plunger_absolute(self, positions: List[Optional[int]]) -> None: + """Move plunger to absolute position (PPA). + + Args: + positions: absolute plunger position in full steps per channel (0-3150). + 0 = fully dispensed, 3150 = fully aspirated. + """ + await self.backend.send_command(module=self.module, command="PPA", params=positions) + + async def set_disposable_tip_params(self, mode: int, z_discard: int, z_retract: int) -> None: + """Set disposable tip discard parameters (SDT). + + Args: + mode: 1 = discard in rack + z_discard: Z discard distance in 1/10 mm + z_retract: Z retract distance in 1/10 mm + """ + await self.backend.send_command( + module=self.module, command="SDT", params=[mode, z_discard, z_retract] + ) + + # ============== Query commands ============== + + async def read_plunger_positions(self) -> List[int]: + """Read current plunger positions (RPP). + + Returns: + List of plunger positions in full steps per channel. + """ + resp: List[int] = ( + await self.backend.send_command(module=self.module, command="RPP", params=[0]) + )["data"] + return resp + + async def read_z_after_liquid_detection(self) -> List[int]: + """Read Z values after liquid detection (RVZ). + + Returns: + List of Z positions in 1/10 mm where liquid was detected, per channel. + """ + resp: List[int] = ( + await self.backend.send_command(module=self.module, command="RVZ", params=[0]) + )["data"] + return resp + + async def read_tip_status(self) -> List[bool]: + """Read tip mounted status for each channel (RTS). + + Returns: + List of booleans: True if tip is mounted on that channel. + + Note: + Response format needs hardware validation. + """ + resp: List[int] = ( + await self.backend.send_command(module=self.module, command="RTS", params=[0]) + )["data"] + return [bool(v) for v in resp] + + async def position_absolute_z_bulk(self, z: List[Optional[int]]) -> None: + """Position absolute Z for all channels simultaneously (PAZ). + + Unlike :meth:`move_absolute_z` which uses slow speed, PAZ uses fast speed. + + Args: + z: absolute Z position in 1/10 mm per channel + """ + await self.backend.send_command(module=self.module, command="PAZ", params=z) + async def discard_disposable_tip_high(self, tips): """Drops tips Discards at the Z-axes initialization height diff --git a/pylabrobot/liquid_handling/backends/tecan/EVO_tests.py b/pylabrobot/liquid_handling/backends/tecan/EVO_tests.py index 723aebcff44..ef050c89e03 100644 --- a/pylabrobot/liquid_handling/backends/tecan/EVO_tests.py +++ b/pylabrobot/liquid_handling/backends/tecan/EVO_tests.py @@ -429,6 +429,7 @@ async def test_move_resource(self): call(module="C1", command="SFR", params=[2000, 600]), call(module="C1", command="SGG", params=[100, 75, None]), call(module="C1", command="AGR", params=[754]), + call(module="C1", command="RPG", params=[0]), call(module="C1", command="RPZ", params=[5]), call(module="C1", command="STW", params=[1, 0, 0, 0, 135, 0]), call(module="C1", command="STW", params=[2, 0, 0, 0, 53, 0]), diff --git a/pylabrobot/liquid_handling/backends/tecan/__init__.py b/pylabrobot/liquid_handling/backends/tecan/__init__.py index de91df1008f..592597cfa0b 100644 --- a/pylabrobot/liquid_handling/backends/tecan/__init__.py +++ b/pylabrobot/liquid_handling/backends/tecan/__init__.py @@ -1 +1,2 @@ +from .air_evo_backend import AirEVOBackend from .EVO_backend import EVOBackend diff --git a/pylabrobot/liquid_handling/backends/tecan/air_evo_backend.py b/pylabrobot/liquid_handling/backends/tecan/air_evo_backend.py new file mode 100644 index 00000000000..6f44fc85e2e --- /dev/null +++ b/pylabrobot/liquid_handling/backends/tecan/air_evo_backend.py @@ -0,0 +1,714 @@ +"""Backend for Tecan Freedom EVO with Air LiHa (ZaapMotion controllers). + +The Air LiHa uses ZaapMotion BLDC motor controllers for air displacement +pipetting, as opposed to the syringe-based XP2000/XP6000 dilutors. This +requires: + +1. Boot mode exit and motor configuration on startup +2. Different plunger conversion factors (106.4 steps/uL vs 3 for syringe) +3. ZaapMotion force mode commands around each plunger operation + +See keyser-testing/AirLiHa_Investigation.md for full reverse-engineering details. +""" + +import asyncio +import logging +from typing import List, Optional, Sequence, Tuple, Union + +from pylabrobot.liquid_handling.backends.tecan.errors import TecanError +from pylabrobot.liquid_handling.backends.tecan.EVO_backend import EVOArm, EVOBackend, LiHa +from pylabrobot.liquid_handling.liquid_classes.tecan import ( + TecanLiquidClass, + get_liquid_class, +) +from pylabrobot.liquid_handling.standard import ( + Drop, + Mix, + Pickup, + SingleChannelAspiration, + SingleChannelDispense, +) +from pylabrobot.resources import Liquid, TecanTip, TipSpot, Trash + +logger = logging.getLogger(__name__) + +# ZaapMotion motor configuration sequence, sent via transparent pipeline T2x. +# Captured from EVOware USB traffic (zaapmotiondriver.dll scan phase). +# Same config for all 8 tips. +ZAAPMOTION_CONFIG = [ + "CFE 255,500", + "CAD ADCA,0,12.5", + "CAD ADCB,1,12.5", + "EDF1", + "EDF4", + "CDO 11", + "EDF5", + "SIC 10,5", + "SEA ADD,H,4,STOP,1,0,0", + "CMTBLDC,1", + "CETQEP2,256,R", + "CECPOS,QEP2", + "CECCUR,QEP2", + "CEE OFF", + "STL80", + "SVL12,8,16", + "SVL24,20,28", + "SCL1,900,3.5", + "SCE HOLD,500", + "SCE MOVE,500", + "CIR0", + "PIDHOLD,D,1.2,1,-1,0.003,0,0,OFF", + "PIDMOVE,D,0.8,1,-1,0.004,0,0,OFF", + "PIDHOLD,Q,1.2,1,-1,0.003,0,0,OFF", + "PIDMOVE,Q,0.8,1,-1,0.004,0,0,OFF", + "PIDHOLD,POS,0.2,1,-1,0.02,4,0,OFF", + "PIDMOVE,POS,0.35,1,-1,0.1,3,0,OFF", + "PIDSPDELAY,0", + "SFF 0.045,0.4,0.041", + "SES 0", + "SPO0", + "SIA 0.01, 0.28, 0.0", + "WRP", +] + + +class ZaapMotion: + """Commands for ZaapMotion motor controllers (T2x pipeline).""" + + def __init__(self, backend: EVOBackend, module: str = "C5"): + self.backend = backend + self.module = module + + def _prefix(self, tip: int) -> str: + return f"T2{tip}" + + async def exit_boot_mode(self, tip: int) -> None: + await self.backend.send_command(self.module, command=f"{self._prefix(tip)}X") + + async def read_firmware_version(self, tip: int) -> str: + resp = await self.backend.send_command(self.module, command=f"{self._prefix(tip)}RFV") + return str(resp["data"][0]) if resp and resp.get("data") else "" + + async def read_config_status(self, tip: int) -> None: + await self.backend.send_command(self.module, command=f"{self._prefix(tip)}RCS") + + async def set_force_ramp(self, tip: int, value: int) -> None: + await self.backend.send_command(self.module, command=f"{self._prefix(tip)}SFR{value}") + + async def set_force_mode(self, tip: int) -> None: + await self.backend.send_command(self.module, command=f"{self._prefix(tip)}SFP1") + + async def set_default_position(self, tip: int, value: int) -> None: + await self.backend.send_command(self.module, command=f"{self._prefix(tip)}SDP{value}") + + async def configure_motor(self, tip: int, command: str) -> None: + await self.backend.send_command(self.module, command=f"{self._prefix(tip)}{command}") + + async def set_sdo(self, param: str) -> None: + await self.backend.send_command(self.module, command=f"T23SDO{param}") + + +class AirEVOBackend(EVOBackend): + """Backend for Tecan Freedom EVO with Air LiHa (ZaapMotion controllers). + + Usage:: + + from pylabrobot.liquid_handling import LiquidHandler + from pylabrobot.liquid_handling.backends.tecan import AirEVOBackend + from pylabrobot.resources.tecan.tecan_decks import EVO150Deck + + backend = AirEVOBackend(diti_count=8) + deck = EVO150Deck() + lh = LiquidHandler(backend=backend, deck=deck) + await lh.setup() + """ + + # Air LiHa plunger conversion factors (derived from USB capture analysis). + # Syringe LiHa uses 3 steps/uL and 6 half-steps/sec per uL/s. + STEPS_PER_UL = 106.4 + SPEED_FACTOR = 213.0 + + # ZaapMotion force ramp values + SFR_ACTIVE = 133120 # high force ramp during plunger movement + SFR_IDLE = 3752 # low force ramp at rest + SDP_DEFAULT = 1400 # default dispense parameter + + def __init__( + self, + diti_count: int = 0, + packet_read_timeout: int = 30, + read_timeout: int = 120, + write_timeout: int = 120, + ): + """Create a new Air EVO interface. + + Args: + diti_count: number of channels configured for disposable tips. + packet_read_timeout: timeout in seconds for reading a single packet. + read_timeout: timeout in seconds for reading a full response. + write_timeout: timeout in seconds for writing a command. + """ + + super().__init__( + diti_count=diti_count, + packet_read_timeout=packet_read_timeout, + read_timeout=read_timeout, + write_timeout=write_timeout, + ) + self.zaap: Optional[ZaapMotion] = None + + async def setup(self): + """Setup the Air EVO. + + Checks if axes are already initialized (e.g. from a previous session). + If so, skips ZaapMotion configuration and PIA. Otherwise performs full + boot exit, motor configuration, and initialization. + """ + + # Connect USB with short packet timeout for fast buffer drain + logger.info("Connecting USB...") + saved_prt = self.io.packet_read_timeout + self.io.packet_read_timeout = 1 + await self.io.setup() + self.io.packet_read_timeout = saved_prt + + # Check if already initialized + if await self._is_initialized(): + logger.info("Axes already initialized — skipping ZaapMotion config + PIA.") + await self._setup_quick() + else: + logger.info("Axes not initialized — running full setup.") + await self._setup_full() + + async def _is_initialized(self) -> bool: + """Check if the LiHa axes are already initialized.""" + try: + arm = EVOArm(self, "C5") + err = await arm.read_error_register(0) + err = str(err) + # A = init failed (1), G = not initialized (7) — these mean we need full init + # Any other code (including @=OK, Y=tip not fetched, etc.) means axes are initialized + if err and not any(c in ("A", "G") for c in err): + return True + except (TecanError, TimeoutError): + pass + return False + + async def _setup_quick(self): + """Fast setup when axes are already initialized. Skips ZaapMotion config and PIA.""" + self._liha_connected = True + self._mca_connected = False + self._roma_connected = False + self.liha = LiHa(self, EVOBackend.LIHA) + self.zaap = ZaapMotion(self) + self._num_channels = await self.liha.report_number_tips() + self._x_range = await self.liha.report_x_param(5) + self._y_range = (await self.liha.report_y_param(5))[0] + self._z_range = (await self.liha.report_z_param(5))[0] + logger.info("Quick setup complete: %d channels, z_range=%d", self._num_channels, self._z_range) + + async def _setup_full(self): + """Full setup: ZaapMotion config, safety module, PIA, dilutor init.""" + + # Configure ZaapMotion controllers before PIA + logger.info("Configuring ZaapMotion controllers...") + await self._configure_zaapmotion() + + # Safety module: enable motor power + logger.info("Enabling safety module / motor power...") + await self._setup_safety_module() + + # ZaapMotion SDO config (from EVOware: sent right before PIA) + assert self.zaap is not None + try: + await self.zaap.set_sdo("11,1") + except TecanError: + pass # may fail if already in app mode, non-critical + + # Standard arm init (PIA, BMX, etc.) + # LiHa first, then home it to clear path before RoMa PIA + logger.info("Initializing arms (PIA)...") + self._liha_connected = await self.setup_arm(EVOBackend.LIHA) + + if self.liha_connected: + self.liha = LiHa(self, EVOBackend.LIHA) + await self.liha.position_initialization_x() + # Home LiHa to far left before RoMa PIA to prevent collision + num_ch = await self.liha.report_number_tips() + z_range = (await self.liha.report_z_param(5))[0] + await self.liha.set_z_travel_height([z_range] * num_ch) + await self.liha.position_absolute_all_axis(45, 1031, 90, [z_range] * num_ch) + logger.info("LiHa homed before RoMa PIA.") + + self._mca_connected = await self.setup_arm(EVOBackend.MCA) + self._roma_connected = await self.setup_arm(EVOBackend.ROMA) + + if self.roma_connected: + from pylabrobot.liquid_handling.backends.tecan.EVO_backend import RoMa + + self.roma = RoMa(self, EVOBackend.ROMA) + await self.roma.position_initialization_x() + await self._park_roma() + + self._num_channels = await self.liha.report_number_tips() + self._x_range = await self.liha.report_x_param(5) + self._y_range = (await self.liha.report_y_param(5))[0] + self._z_range = (await self.liha.report_z_param(5))[0] + + # Initialize dilutors (Air LiHa uses same PID/PVL/PPR sequence as syringe) + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis(45, 1031, 90, [1200] * self.num_channels) + await self.liha.initialize_plunger(self._bin_use_channels(list(range(self.num_channels)))) + await self.liha.position_valve_logical([1] * self.num_channels) + await self.liha.move_plunger_relative([100] * self.num_channels) + await self.liha.position_valve_logical([0] * self.num_channels) + await self.liha.set_end_speed_plunger([1800] * self.num_channels) + await self.liha.move_plunger_relative([-100] * self.num_channels) + await self.liha.position_absolute_all_axis(45, 1031, 90, [self._z_range] * self.num_channels) + + async def _configure_zaapmotion(self): + """Exit boot mode and configure all 8 ZaapMotion motor controllers. + + Each controller boots into bootloader mode (XP2-BOOT) after power cycle. + This sends the 'X' command to jump to application firmware, then sends + 33 motor configuration commands (PID gains, current limits, encoder config). + """ + zaap = ZaapMotion(self) + all_failed_tips = [] + for tip in range(8): + # Check current mode + try: + firmware = await zaap.read_firmware_version(tip) + except TecanError: + firmware = "" + + if "BOOT" in str(firmware): + logger.info("ZaapMotion tip %d in boot mode, sending exit command", tip + 1) + await zaap.exit_boot_mode(tip) + await asyncio.sleep(1) + + # Verify transition + try: + firmware = await zaap.read_firmware_version(tip) + except TecanError: + firmware = "" + + if "BOOT" in str(firmware): + raise TecanError(f"ZaapMotion tip {tip + 1} failed to exit boot mode", "C5", 1) + + # Check if already configured (RCS returns OK if configured) + try: + await zaap.read_config_status(tip) + logger.info("ZaapMotion tip %d already configured, skipping", tip + 1) + continue + except TecanError: + pass # not configured, proceed with config + + # Send motor configuration + logger.info("Configuring ZaapMotion tip %d (%d commands)", tip + 1, len(ZAAPMOTION_CONFIG)) + failures = 0 + for cmd in ZAAPMOTION_CONFIG: + try: + await zaap.configure_motor(tip, cmd) + except TecanError as e: + failures += 1 + logger.warning("ZaapMotion tip %d config command '%s' failed: %s", tip + 1, cmd, e) + + if failures == len(ZAAPMOTION_CONFIG): + all_failed_tips.append(tip + 1) + + if all_failed_tips: + raise TecanError( + f"ZaapMotion controllers not responding (tips {all_failed_tips}). " + "Power cycle the EVO and try again.", + "C5", + 5, + ) + + self.zaap = zaap + + async def _setup_safety_module(self): + """Send safety module commands to enable motor power.""" + try: + await self.send_command("O1", command="SPN") + await self.send_command("O1", command="SPS3") + except TecanError as e: + logger.warning("Safety module command failed: %s", e) + + # ============== ZaapMotion force mode helpers ============== + + async def _zaapmotion_force_on(self): + """Enable ZaapMotion force mode before plunger operations.""" + assert self.zaap is not None + for tip in range(8): + await self.zaap.set_force_ramp(tip, self.SFR_ACTIVE) + for tip in range(8): + await self.zaap.set_force_mode(tip) + + async def _zaapmotion_force_off(self): + """Restore ZaapMotion to idle after plunger operations.""" + assert self.zaap is not None + for tip in range(8): + await self.zaap.set_force_ramp(tip, self.SFR_IDLE) + for tip in range(8): + await self.zaap.set_default_position(tip, self.SDP_DEFAULT) + + # ============== Force-mode overrides for mixing and blow-out ============== + + async def _perform_mix(self, mix: Mix, use_channels: List[int]) -> None: + await self._zaapmotion_force_on() + await super()._perform_mix(mix, use_channels) + await self._zaapmotion_force_off() + + async def _perform_blow_out( + self, ops: List[SingleChannelDispense], use_channels: List[int] + ) -> None: + await self._zaapmotion_force_on() + await super()._perform_blow_out(ops, use_channels) + await self._zaapmotion_force_off() + + # ============== Override conversion factors ============== + + def _aspirate_airgap( + self, + use_channels: List[int], + tecan_liquid_classes: List[Optional[TecanLiquidClass]], + airgap: str, + ) -> Tuple[List[Optional[int]], List[Optional[int]], List[Optional[int]]]: + pvl: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + ppr: List[Optional[int]] = [None] * self.num_channels + + for i, channel in enumerate(use_channels): + tlc = tecan_liquid_classes[i] + assert tlc is not None + pvl[channel] = 0 + if airgap == "lag": + sep[channel] = int(tlc.aspirate_lag_speed * self.SPEED_FACTOR) + ppr[channel] = int(tlc.aspirate_lag_volume * self.STEPS_PER_UL) + elif airgap == "tag": + sep[channel] = int(tlc.aspirate_tag_speed * self.SPEED_FACTOR) + ppr[channel] = int(tlc.aspirate_tag_volume * self.STEPS_PER_UL) + + return pvl, sep, ppr + + def _aspirate_action( + self, + ops: Sequence[Union[SingleChannelAspiration, SingleChannelDispense]], + use_channels: List[int], + tecan_liquid_classes: List[Optional[TecanLiquidClass]], + zadd: List[Optional[int]], + ) -> Tuple[ + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + ]: + ssz: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + stz: List[Optional[int]] = [-z if z else None for z in zadd] + mtr: List[Optional[int]] = [None] * self.num_channels + ssz_r: List[Optional[int]] = [None] * self.num_channels + + for i, channel in enumerate(use_channels): + tlc = tecan_liquid_classes[i] + z = zadd[channel] + assert tlc is not None and z is not None + flow_rate = ops[i].flow_rate or tlc.aspirate_speed + sep[channel] = int(flow_rate * self.SPEED_FACTOR) + ssz[channel] = round(z * flow_rate / ops[i].volume) + volume = tlc.compute_corrected_volume(ops[i].volume) + mtr[channel] = round(volume * self.STEPS_PER_UL) + ssz_r[channel] = int(tlc.aspirate_retract_speed * 10) + + return ssz, sep, stz, mtr, ssz_r + + def _dispense_action( + self, + ops: Sequence[Union[SingleChannelAspiration, SingleChannelDispense]], + use_channels: List[int], + tecan_liquid_classes: List[Optional[TecanLiquidClass]], + ) -> Tuple[ + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + ]: + sep: List[Optional[int]] = [None] * self.num_channels + spp: List[Optional[int]] = [None] * self.num_channels + stz: List[Optional[int]] = [None] * self.num_channels + mtr: List[Optional[int]] = [None] * self.num_channels + + for i, channel in enumerate(use_channels): + tlc = tecan_liquid_classes[i] + assert tlc is not None + flow_rate = ops[i].flow_rate or tlc.dispense_speed + sep[channel] = int(flow_rate * self.SPEED_FACTOR) + spp[channel] = int(tlc.dispense_breakoff * self.SPEED_FACTOR) + stz[channel] = 0 + volume = ( + tlc.compute_corrected_volume(ops[i].volume) + + tlc.aspirate_lag_volume + + tlc.aspirate_tag_volume + ) + mtr[channel] = -round(volume * self.STEPS_PER_UL) + + return sep, spp, stz, mtr + + # ============== Override liquid handling methods ============== + + async def aspirate(self, ops: List[SingleChannelAspiration], use_channels: List[int]): + x_positions, y_positions, z_positions = self._liha_positions(ops, use_channels) + + tecan_liquid_classes = [ + get_liquid_class( + target_volume=op.volume, + liquid_class=Liquid.WATER, + tip_type=op.tip.tip_type, + ) + if isinstance(op.tip, TecanTip) + else None + for op in ops + ] + + from pylabrobot.resources import TecanPlate + + # Y-spacing: use the plate's well pitch (item_dy), not individual well size + plate = ops[0].resource.parent + if plate is not None and hasattr(plate, "item_dy"): + ys = int(plate.item_dy * 10) # type: ignore[union-attr] + else: + ys = int(ops[0].resource.get_absolute_size_y() * 10) + zadd: List[Optional[int]] = [0] * self.num_channels + for i, channel in enumerate(use_channels): + par = ops[i].resource.parent + if par is None: + continue + if not isinstance(par, TecanPlate): + raise ValueError(f"Operation is not supported by resource {par}.") + zadd[channel] = round(ops[i].volume / par.area * 10) # type: ignore[call-overload] + + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + # The z_positions from _liha_positions use get_z_position which adds carrier + # offset and tip_length to absolute Z values — producing out-of-range results. + # Use the plate's z_start/z_dispense directly instead. + from pylabrobot.resources import TecanPlate as _TP + + first_par = ops[0].resource.parent + if isinstance(first_par, _TP): + z_travel = [self._z_range] * self.num_channels + z_aspirate: List[Optional[int]] = [None] * self.num_channels + z_asp_max: List[Optional[int]] = [None] * self.num_channels + for i, channel in enumerate(use_channels): + tip_ext = int(ops[i].tip.total_tip_length * 10) - int(ops[i].tip.fitting_depth * 10) + z_aspirate[channel] = int(first_par.z_start) + tip_ext + z_asp_max[channel] = int(first_par.z_max) + tip_ext + else: + z_travel = [z if z else self._z_range for z in z_positions["travel"]] + z_aspirate = z_positions["start"] + + paa_y = y - yi * ys + print(f" [DEBUG] PAA: x={x}, y={paa_y}, ys={ys}, z_travel={z_travel}") + print(f" [DEBUG] z_range={self._z_range}, z_aspirate={z_aspirate}") + + await self.liha.set_z_travel_height(z_travel) + await self.liha.position_absolute_all_axis( + x, + paa_y, + ys, + z_travel, + ) + + # Aspirate leading airgap (with force mode) + pvl, sep, ppr = self._aspirate_airgap(use_channels, tecan_liquid_classes, "lag") + if any(ppr): + await self._zaapmotion_force_on() + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + await self._zaapmotion_force_off() + + # Liquid level detection + if any(tlc.aspirate_lld if tlc is not None else None for tlc in tecan_liquid_classes): + tlc, _ = self._first_valid(tecan_liquid_classes) + assert tlc is not None + detproc = tlc.lld_mode + sense = tlc.lld_conductivity + await self.liha.set_detection_mode(detproc, sense) + ssl, sdl, sbl = self._liquid_detection(use_channels, tecan_liquid_classes) + await self.liha.set_search_speed(ssl) + await self.liha.set_search_retract_distance(sdl) + await self.liha.set_search_z_start( + [z if z is not None else self._z_range for z in z_aspirate] + ) + await self.liha.set_search_z_max([z if z is not None else self._z_range for z in z_asp_max]) + await self.liha.set_search_submerge(sbl) + shz = [min(z for z in z_positions["travel"] if z)] * self.num_channels + await self.liha.set_z_travel_height(shz) + await self.liha.move_detect_liquid(self._bin_use_channels(use_channels), zadd) + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + + # Aspirate + retract (with force mode) + zadd = [min(z, 32) if z else None for z in zadd] + ssz, sep, stz, mtr, ssz_r = self._aspirate_action(ops, use_channels, tecan_liquid_classes, zadd) + await self.liha.set_slow_speed_z(ssz) + await self._zaapmotion_force_on() + await self.liha.set_end_speed_plunger(sep) + await self.liha.set_tracking_distance_z(stz) + await self.liha.move_tracking_relative(mtr) + await self._zaapmotion_force_off() + await self.liha.set_slow_speed_z(ssz_r) + await self.liha.move_absolute_z(z_aspirate) # retract to aspirate start height + + # Aspirate trailing airgap (with force mode) + pvl, sep, ppr = self._aspirate_airgap(use_channels, tecan_liquid_classes, "tag") + await self._zaapmotion_force_on() + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + await self._zaapmotion_force_off() + + async def dispense(self, ops: List[SingleChannelDispense], use_channels: List[int]): + x_positions, y_positions, z_positions = self._liha_positions(ops, use_channels) + plate = ops[0].resource.parent + if plate is not None and hasattr(plate, "item_dy"): + ys = int(plate.item_dy * 10) # type: ignore[union-attr] + else: + ys = int(ops[0].resource.get_absolute_size_y() * 10) + + tecan_liquid_classes = [ + get_liquid_class( + target_volume=op.volume, + liquid_class=Liquid.WATER, + tip_type=op.tip.tip_type, + ) + if isinstance(op.tip, TecanTip) + else None + for op in ops + ] + + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + # Use plate z_dispense with tip extension adjustment + from pylabrobot.resources import TecanPlate as _TP + + first_par = ops[0].resource.parent + if isinstance(first_par, _TP): + z_disp: List[Optional[int]] = [None] * self.num_channels + for i, channel in enumerate(use_channels): + tip_ext = int(ops[i].tip.total_tip_length * 10) - int(ops[i].tip.fitting_depth * 10) + z_disp[channel] = int(first_par.z_dispense) + tip_ext + else: + z_disp = [z if z else self._z_range for z in z_positions["dispense"]] + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, + y - yi * ys, + ys, + z_disp, # type: ignore[arg-type] + ) + + sep, spp, stz, mtr = self._dispense_action(ops, use_channels, tecan_liquid_classes) + await self._zaapmotion_force_on() + await self.liha.set_end_speed_plunger(sep) + await self.liha.set_stop_speed_plunger(spp) + await self.liha.set_tracking_distance_z(stz) + await self.liha.move_tracking_relative(mtr) + await self._zaapmotion_force_off() + + async def pick_up_tips(self, ops: List[Pickup], use_channels: List[int]): + assert min(use_channels) >= self.num_channels - self.diti_count, ( + f"DiTis can only be configured for the last {self.diti_count} channels" + ) + + # Use _liha_positions for X/Y only; Z for tip pickup is computed directly + # from the tip rack's z_start (absolute Tecan Z coordinate). + x_positions, y_positions, _ = self._liha_positions(ops, use_channels) + + ys = int(ops[0].resource.get_absolute_size_y() * 10) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + # TODO: calibrate X offset properly in resource definitions + x += 60 # temporary 6mm X offset from taught position + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, y - yi * ys, ys, [self._z_range] * self.num_channels + ) + + # Aspirate small air gap before tip pickup (with Air LiHa conversion factors) + pvl: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + ppr: List[Optional[int]] = [None] * self.num_channels + for channel in use_channels: + pvl[channel] = 0 + sep[channel] = int(70 * self.SPEED_FACTOR) + ppr[channel] = int(10 * self.STEPS_PER_UL) + await self._zaapmotion_force_on() + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + await self._zaapmotion_force_off() + + # AGT Z-start: use the tip rack's z_start directly. + # Tecan Z coordinates: 0=top (home), z_range=worktable. + # z_start is an absolute position — the force feedback handles the rest. + from pylabrobot.resources import TecanTipRack + + par = ops[0].resource.parent + assert isinstance(par, TecanTipRack), f"Expected TecanTipRack, got {type(par)}" + agt_z_start = int(par.z_start) + agt_z_search = abs(int(par.z_max - par.z_start)) + logger.info("AGT z_start=%d, z_search=%d", agt_z_start, agt_z_search) + await self.liha.get_disposable_tip( + self._bin_use_channels(use_channels), agt_z_start, agt_z_search + ) + + async def drop_tips(self, ops: List[Drop], use_channels: List[int]): + assert min(use_channels) >= self.num_channels - self.diti_count, ( + f"DiTis can only be configured for the last {self.diti_count} channels" + ) + assert all(isinstance(op.resource, (Trash, TipSpot)) for op in ops), ( + "Must drop in waste container or tip rack" + ) + + x_positions, y_positions, _ = self._liha_positions(ops, use_channels) + + ys = int(ops[0].resource.get_absolute_size_y() * 10) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, + y - yi * ys, + ys, + [self._z_range] * self.num_channels, + ) + + # Empty plunger before discard (from EVOware capture: PPA0 = absolute position 0) + await self.liha.position_valve_logical([0] * self.num_channels) + await self._zaapmotion_force_on() + sep_vals: List[Optional[int]] = [int(600 * self.SPEED_FACTOR)] * self.num_channels + await self.liha.set_end_speed_plunger(sep_vals) + await self.liha.position_plunger_absolute([0] * self.num_channels) + await self._zaapmotion_force_off() + + # Position at tip rack z_start and eject using mode=0 (above rack). + # Mode=1 (in rack) uses z_discard to push further down, which crashes + # on taller tip racks. Mode=0 ejects at the current Z reliably. + from pylabrobot.resources import TecanTipRack + + par = ops[0].resource.parent + assert isinstance(par, TecanTipRack), f"Expected TecanTipRack, got {type(par)}" + z_start = int(par.z_start) + await self.liha.move_absolute_z([z_start] * self.num_channels) + await self.send_command("C5", command="SDT0,50,200") + await self.liha._drop_disposable_tip(self._bin_use_channels(use_channels), discard_height=0) diff --git a/pylabrobot/liquid_handling/backends/tecan/air_evo_tests.py b/pylabrobot/liquid_handling/backends/tecan/air_evo_tests.py new file mode 100644 index 00000000000..dbb3ebe061c --- /dev/null +++ b/pylabrobot/liquid_handling/backends/tecan/air_evo_tests.py @@ -0,0 +1,291 @@ +"""Unit tests for AirEVOBackend. + +Tests conversion factors, ZaapMotion config sequence, force mode wrapping, +and init-skip logic — all with mocked USB (no hardware needed). +""" + +import unittest +import unittest.mock +from unittest.mock import AsyncMock, call + +from pylabrobot.liquid_handling.backends.tecan.air_evo_backend import ( + ZAAPMOTION_CONFIG, + AirEVOBackend, + ZaapMotion, +) +from pylabrobot.liquid_handling.backends.tecan.EVO_backend import LiHa +from pylabrobot.liquid_handling.standard import Pickup +from pylabrobot.resources import ( + Coordinate, + EVO150Deck, +) +from pylabrobot.resources.tecan.plate_carriers import MP_3Pos +from pylabrobot.resources.tecan.plates import Microplate_96_Well +from pylabrobot.resources.tecan.tip_carriers import DiTi_3Pos +from pylabrobot.resources.tecan.tip_racks import DiTi_50ul_SBS_LiHa + + +class AirEVOTestBase(unittest.IsolatedAsyncioTestCase): + """Base class with mocked AirEVOBackend setup.""" + + def setUp(self): + super().setUp() + + self.evo = AirEVOBackend(diti_count=8) + self.mock_send = AsyncMock() + self.evo.send_command = self.mock_send # type: ignore[method-assign] + + async def send_command(module, command, params=None, **kwargs): + if command == "RPX": + return {"data": [9000]} + if command == "RPY": + return {"data": [90]} + if command == "RPZ": + return {"data": [2100]} + if command == "RNT": + return {"data": [8]} + if command == "REE": + if params and params[0] == 1: + return {"data": ["XYSZZZZZZZZ"]} + return {"data": ["@@@@@@@@@@@"]} + if command.startswith("T2") and "RFV" in command: + return {"data": ["XP2000-V1.20-02/2015", "1.2.0.10946", "ZMA"]} + if command.startswith("T2") and "RCS" in command: + return {"data": []} + return {"data": []} + + self.mock_send.side_effect = send_command + + self.deck = EVO150Deck() + self.evo.set_deck(self.deck) + + self.evo.setup = AsyncMock() # type: ignore[method-assign] + self.evo._num_channels = 8 + self.evo._x_range = 9866 + self.evo._y_range = 2833 + self.evo._z_range = 2100 + self.evo._liha_connected = True + self.evo._roma_connected = False + self.evo._mca_connected = False + self.evo.liha = LiHa(self.evo, "C5") + self.evo.zaap = ZaapMotion(self.evo) + + # Deck setup + self.tip_carrier = DiTi_3Pos(name="tip_carrier") + self.tip_carrier[0] = self.tip_rack = DiTi_50ul_SBS_LiHa(name="tips") + self.deck.assign_child_resource(self.tip_carrier, rails=15) + + self.plate_carrier = MP_3Pos(name="plate_carrier") + self.plate_carrier[0] = self.plate = Microplate_96_Well(name="plate") + self.deck.assign_child_resource(self.plate_carrier, rails=25) + + self.mock_send.reset_mock() + + +class ConversionFactorTests(AirEVOTestBase): + """Test that Air LiHa uses correct conversion factors (106.4/213).""" + + def test_steps_per_ul(self): + self.assertEqual(AirEVOBackend.STEPS_PER_UL, 106.4) + + def test_speed_factor(self): + self.assertEqual(AirEVOBackend.SPEED_FACTOR, 213.0) + + def test_aspirate_airgap_uses_air_factors(self): + """Verify airgap calculation uses 106.4 steps/uL and 213 speed factor.""" + from pylabrobot.liquid_handling.liquid_classes.tecan import TecanLiquidClass + + tlc = TecanLiquidClass( + lld_mode=7, + lld_conductivity=1, + lld_speed=60, + lld_distance=4, + clot_speed=50, + clot_limit=4, + pmp_sensitivity=1, + pmp_viscosity=1, + pmp_character=0, + density=1, + calibration_factor=1.0, + calibration_offset=0, + aspirate_speed=50, + aspirate_delay=200, + aspirate_stag_volume=0, + aspirate_stag_speed=20, + aspirate_lag_volume=10, + aspirate_lag_speed=70, + aspirate_tag_volume=5, + aspirate_tag_speed=20, + aspirate_excess=0, + aspirate_conditioning=0, + aspirate_pinch_valve=False, + aspirate_lld=False, + aspirate_lld_position=3, + aspirate_lld_offset=0, + aspirate_mix=False, + aspirate_mix_volume=100, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=5, + aspirate_retract_offset=-5, + dispense_speed=600, + dispense_breakoff=400, + dispense_delay=0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=False, + dispense_lld_position=7, + dispense_lld_offset=0, + dispense_touching_direction=0, + dispense_touching_speed=10, + dispense_touching_delay=100, + dispense_mix=False, + dispense_mix_volume=100, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50, + dispense_retract_offset=0, + ) + + # Leading airgap: 10uL at 70uL/s + pvl, sep, ppr = self.evo._aspirate_airgap([0], [tlc], "lag") + self.assertEqual(sep[0], int(70 * 213)) # 14910 + self.assertEqual(ppr[0], int(10 * 106.4)) # 1064 + + # Trailing airgap: 5uL at 20uL/s + pvl, sep, ppr = self.evo._aspirate_airgap([0], [tlc], "tag") + self.assertEqual(sep[0], int(20 * 213)) # 4260 + self.assertEqual(ppr[0], int(5 * 106.4)) # 532 + + +class ForceModeSFRTests(AirEVOTestBase): + """Test that force mode commands are sent correctly.""" + + def test_sfr_constants(self): + self.assertEqual(AirEVOBackend.SFR_ACTIVE, 133120) + self.assertEqual(AirEVOBackend.SFR_IDLE, 3752) + self.assertEqual(AirEVOBackend.SDP_DEFAULT, 1400) + + async def test_force_on_sends_sfr_and_sfp(self): + await self.evo._zaapmotion_force_on() + + # Should send SFR to all 8 tips, then SFP1 to all 8 tips + sfr_calls = [c for c in self.mock_send.call_args_list if "SFR133120" in str(c)] + sfp_calls = [c for c in self.mock_send.call_args_list if "SFP1" in str(c)] + self.assertEqual(len(sfr_calls), 8) + self.assertEqual(len(sfp_calls), 8) + + async def test_force_off_sends_sfr_and_sdp(self): + await self.evo._zaapmotion_force_off() + + sfr_calls = [c for c in self.mock_send.call_args_list if "SFR3752" in str(c)] + sdp_calls = [c for c in self.mock_send.call_args_list if "SDP1400" in str(c)] + self.assertEqual(len(sfr_calls), 8) + self.assertEqual(len(sdp_calls), 8) + + +class InitSkipTests(AirEVOTestBase): + """Test init-skip logic.""" + + async def test_is_initialized_all_ok(self): + """REE0 returning all '@' should mean initialized.""" + result = await self.evo._is_initialized() + self.assertTrue(result) + + async def test_is_initialized_with_init_failed(self): + """REE0 with 'A' (init failed) should mean not initialized.""" + + async def send_cmd(module, command, params=None, **kwargs): + if command == "REE": + return {"data": ["GGGAAAAAAAA"]} + return {"data": []} + + self.mock_send.side_effect = send_cmd + result = await self.evo._is_initialized() + self.assertFalse(result) + + async def test_is_initialized_with_not_initialized(self): + """REE0 with 'G' (not initialized) should mean not initialized.""" + + async def send_cmd(module, command, params=None, **kwargs): + if command == "REE": + return {"data": ["GGGGGGGGGGG"]} + return {"data": []} + + self.mock_send.side_effect = send_cmd + result = await self.evo._is_initialized() + self.assertFalse(result) + + async def test_is_initialized_with_tip_not_fetched(self): + """REE0 with 'Y' (tip not fetched) means axes ARE initialized.""" + + async def send_cmd(module, command, params=None, **kwargs): + if command == "REE": + return {"data": ["@@@YYYYYYY@"]} + return {"data": []} + + self.mock_send.side_effect = send_cmd + result = await self.evo._is_initialized() + self.assertTrue(result) + + +class ZaapMotionConfigTests(AirEVOTestBase): + """Test ZaapMotion boot exit and motor configuration.""" + + def test_config_sequence_length(self): + """Verify all 33 config commands are defined.""" + self.assertEqual(len(ZAAPMOTION_CONFIG), 33) + + def test_config_starts_with_cfe(self): + self.assertEqual(ZAAPMOTION_CONFIG[0], "CFE 255,500") + + def test_config_ends_with_wrp(self): + self.assertEqual(ZAAPMOTION_CONFIG[-1], "WRP") + + async def test_configure_skips_when_rcs_ok(self): + """If RCS returns OK, skip motor config (already configured).""" + await self.evo._configure_zaapmotion() + + # RCS returned OK for all tips, so no config commands should be sent + config_calls = [ + c + for c in self.mock_send.call_args_list + if any(cfg_cmd in str(c) for cfg_cmd in ["CFE", "CMTBLDC", "WRP"]) + ] + self.assertEqual(len(config_calls), 0) + + async def test_safety_module_sends_spn_sps3(self): + await self.evo._setup_safety_module() + + self.mock_send.assert_any_call("O1", command="SPN") + self.mock_send.assert_any_call("O1", command="SPS3") + + +class PickUpTipsAirTests(AirEVOTestBase): + """Test Air LiHa tip pickup uses correct conversion factors.""" + + async def test_tip_pickup_uses_air_speed_factor(self): + op = Pickup( + resource=self.tip_rack.get_item("A1"), + offset=Coordinate.zero(), + tip=self.tip_rack.get_tip("A1"), + ) + await self.evo.pick_up_tips([op], use_channels=[0]) + + # Check that SEP used Air LiHa speed factor (70 * 213 = 14910) + sep_calls = [ + c + for c in self.mock_send.call_args_list + if c + == call(module="C5", command="SEP", params=[14910, None, None, None, None, None, None, None]) + ] + self.assertEqual(len(sep_calls), 1) + + # Check that PPR used Air LiHa steps/uL (10 * 106.4 = 1064) + ppr_calls = [ + c + for c in self.mock_send.call_args_list + if c + == call(module="C5", command="PPR", params=[1064, None, None, None, None, None, None, None]) + ] + self.assertEqual(len(ppr_calls), 1) diff --git a/pylabrobot/liquid_handling/liquid_classes/tecan.py b/pylabrobot/liquid_handling/liquid_classes/tecan.py index 3cdc0751b2a..f817d8d8d4f 100644 --- a/pylabrobot/liquid_handling/liquid_classes/tecan.py +++ b/pylabrobot/liquid_handling/liquid_classes/tecan.py @@ -2792,3 +2792,418 @@ def get_liquid_class( dispense_retract_speed=50, dispense_retract_offset=0, ) +mapping[(1.0, 10.01, Liquid.DMSO, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=4, + lld_conductivity=1, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=2.14, + pmp_character=0, + density=1.1, + calibration_factor=1.09, + calibration_offset=-0.07, + aspirate_speed=20.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=15.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=0.1, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=0.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=5.0, + aspirate_retract_offset=-5.0, + dispense_speed=600.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=False, + dispense_lld_position=0, + dispense_lld_offset=0.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # DMSO free dispense DiTi 50 [1.0-10.01 uL] + +mapping[(10.01, 50.01, Liquid.DMSO, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=4, + lld_conductivity=1, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=2.14, + pmp_character=0, + density=1.1, + calibration_factor=1.04, + calibration_offset=0.36, + aspirate_speed=50.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=10.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=1.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=0.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=5.0, + aspirate_retract_offset=-5.0, + dispense_speed=600.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=False, + dispense_lld_position=0, + dispense_lld_offset=0.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # DMSO free dispense DiTi 50 [10.01-50.01 uL] + +mapping[(0.5, 1.0, Liquid.DMSO, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=7, + lld_conductivity=2, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=2.14, + pmp_character=0, + density=1.1, + calibration_factor=0.5, + calibration_offset=-0.25, + aspirate_speed=20.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=10.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=3.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=2.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=5.0, + aspirate_retract_offset=-5.0, + dispense_speed=20.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=True, + dispense_lld_position=3, + dispense_lld_offset=1.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # DMSO wet contact DiTi 50 [0.5-1.0 uL] + +mapping[(1.0, 5.01, Liquid.DMSO, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=7, + lld_conductivity=2, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=2.14, + pmp_character=0, + density=1.1, + calibration_factor=1.141, + calibration_offset=-0.345, + aspirate_speed=50.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=8.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=2.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=2.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=5.0, + aspirate_retract_offset=-5.0, + dispense_speed=20.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=True, + dispense_lld_position=3, + dispense_lld_offset=1.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # DMSO wet contact DiTi 50 [1.0-5.01 uL] + +mapping[(5.01, 10.01, Liquid.DMSO, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=7, + lld_conductivity=2, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=2.14, + pmp_character=0, + density=1.1, + calibration_factor=1.043, + calibration_offset=0.214, + aspirate_speed=20.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=10.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=2.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=2.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=5.0, + aspirate_retract_offset=-5.0, + dispense_speed=20.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=True, + dispense_lld_position=3, + dispense_lld_offset=1.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # DMSO wet contact DiTi 50 [5.01-10.01 uL] + +mapping[(0.5, 5.01, Liquid.WATER, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=7, + lld_conductivity=1, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=1.0, + pmp_character=0, + density=1.0, + calibration_factor=1.1, + calibration_offset=0.44, + aspirate_speed=20.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=10.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=2.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=2.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=20.0, + aspirate_retract_offset=-5.0, + dispense_speed=600.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=False, + dispense_lld_position=0, + dispense_lld_offset=0.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # Water free dispense DiTi 50 [0.5-5.01 uL] + +mapping[(5.01, 10.01, Liquid.WATER, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=7, + lld_conductivity=1, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=1.0, + pmp_character=0, + density=1.0, + calibration_factor=1.085, + calibration_offset=0.65, + aspirate_speed=20.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=10.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=2.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=2.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=20.0, + aspirate_retract_offset=-5.0, + dispense_speed=600.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=False, + dispense_lld_position=0, + dispense_lld_offset=0.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # Water free dispense DiTi 50 [5.01-10.01 uL] + +mapping[(10.01, 50.01, Liquid.WATER, TipType.AIRDITI)] = TecanLiquidClass( + lld_mode=7, + lld_conductivity=1, + lld_speed=60.0, + lld_distance=4.0, + clot_speed=50.0, + clot_limit=4.0, + pmp_sensitivity=1, + pmp_viscosity=1.0, + pmp_character=0, + density=1.0, + calibration_factor=1.052, + calibration_offset=0.387, + aspirate_speed=70.0, + aspirate_delay=400.0, + aspirate_stag_volume=0.0, + aspirate_stag_speed=20.0, + aspirate_lag_volume=8.0, + aspirate_lag_speed=70.0, + aspirate_tag_volume=2.0, + aspirate_tag_speed=20.0, + aspirate_excess=0.0, + aspirate_conditioning=0.0, + aspirate_pinch_valve=False, + aspirate_lld=True, + aspirate_lld_position=3, + aspirate_lld_offset=2.0, + aspirate_mix=False, + aspirate_mix_volume=100.0, + aspirate_mix_cycles=1, + aspirate_retract_position=4, + aspirate_retract_speed=20.0, + aspirate_retract_offset=-5.0, + dispense_speed=600.0, + dispense_breakoff=400.0, + dispense_delay=0.0, + dispense_tag=False, + dispense_pinch_valve=False, + dispense_lld=False, + dispense_lld_position=0, + dispense_lld_offset=0.0, + dispense_touching_direction=0, + dispense_touching_speed=10.0, + dispense_touching_delay=100.0, + dispense_mix=False, + dispense_mix_volume=100.0, + dispense_mix_cycles=1, + dispense_retract_position=1, + dispense_retract_speed=50.0, + dispense_retract_offset=0.0, +) # Water free dispense DiTi 50 [10.01-50.01 uL]