Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
e3fb232
Add AirEVOBackend for Tecan EVO with Air LiHa (ZaapMotion)
Robert-Keyser-Calico Mar 27, 2026
f843cca
Fix AirEVOBackend init: add T23SDO11,1 before PIA
Robert-Keyser-Calico Mar 27, 2026
8434517
WIP: Add init-skip to AirEVOBackend, fix labware definitions
Robert-Keyser-Calico Mar 27, 2026
e4977f2
WIP: Fix tip pickup, Y-spacing, add jog tool and tips_off
Robert-Keyser-Calico Mar 28, 2026
9b914c7
Add unit tests for AirEVOBackend (Step 5 of implementation plan)
Robert-Keyser-Calico Mar 28, 2026
7b1c48a
Add firmware feature enhancements to legacy EVO backend
Robert-Keyser-Calico Mar 30, 2026
b44ff59
Merge branch 'PyLabRobot:main' into air-liha-backend
Robert-Keyser-Calico Mar 31, 2026
493bba9
Fix lint and type issues: add STEPS_PER_UL/SPEED_FACTOR to EVOBackend…
Robert-Keyser-Calico Mar 31, 2026
2eef5be
Fix lint, mypy, and typo check issues
Robert-Keyser-Calico Mar 31, 2026
3c5ea29
Merge branch 'air-liha-backend' of https://github.com/Robert-Keyser-C…
Robert-Keyser-Calico Mar 31, 2026
3e5a5cc
Remove Tecan manuals, USB captures, and PDFs from git tracking
Robert-Keyser-Calico Mar 31, 2026
aa0274c
Fix ruff import sorting: EVO_backend after errors (case-insensitive)
Robert-Keyser-Calico Mar 31, 2026
dbb7646
Port hardware-validated fixes to legacy Air LiHa backend
Robert-Keyser-Calico Apr 9, 2026
8baad8b
Merge branch 'main' into air-liha-backend
Robert-Keyser-Calico Apr 9, 2026
f782bd8
Remove keyser-testing directory for PR
Robert-Keyser-Calico Apr 9, 2026
904615d
Merge branch 'air-liha-backend' of https://github.com/Robert-Keyser-C…
Robert-Keyser-Calico Apr 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ pyhamilton/LAY-BACKUP
.ipynb_checkpoints
*.egg-info
*.log
keyser-testing/Tecan Manuals/
keyser-testing/evoware-usb-capture-*/
keyser-testing/evoware-pktmon-capture-*/
keyser-testing/*.pdf
build/lib

myenv
Expand Down
15 changes: 14 additions & 1 deletion _typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,21 @@ mis = "mis"
RHE = "RHE"
"ASEND" = "ASEND"
caf = "caf"
# Tecan firmware command abbreviations
ALO = "ALO"
SOM = "SOM"
SHS = "SHS"
SHW = "SHW"
AZMA = "AZMA"
MCH = "MCH"
AAS = "AAS"
AER = "AER"

[files]
extend-exclude = [
"*.ipynb"
"*.ipynb",
"keyser-testing/Tecan Manuals/",
"keyser-testing/evoware-usb-capture-aspirate-dispense/",
"keyser-testing/evoware-usb-capture-multidispense/",
"keyser-testing/evoware-pktmon-capture-20260327/",
]
206 changes: 201 additions & 5 deletions pylabrobot/liquid_handling/backends/tecan/EVO_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from pylabrobot.liquid_handling.standard import (
Drop,
DropTipRack,
Mix,
MultiHeadAspirationContainer,
MultiHeadAspirationPlate,
MultiHeadDispenseContainer,
Expand Down Expand Up @@ -183,6 +184,10 @@ class EVOBackend(TecanLiquidHandler):
MCA = "W1"
PNP = "W2"

# Syringe LiHa conversion factors (overridden by AirEVOBackend)
STEPS_PER_UL = 3.0
SPEED_FACTOR = 6.0

def __init__(
self,
diti_count: int = 0,
Expand Down Expand Up @@ -304,27 +309,30 @@ async def setup(self):
await self.liha.position_absolute_all_axis(45, 1031, 90, [self._z_range] * self.num_channels)

async def setup_arm(self, module):
arm = EVOArm(self, module)
try:
if module == EVO.MCA:
await self.send_command(module, command="PIB")

await self.send_command(module, command="PIA")
await arm.position_init_bus()
await arm.position_init_all()
except TecanError as e:
if e.error_code == 5:
return False
raise e

if module != EVO.MCA:
await self.send_command(module, command="BMX", params=[2])
await arm.set_bus_mode(2)

return True

async def _park_liha(self):
await self.liha.set_z_travel_height([self._z_range] * self.num_channels)
await self.liha.position_absolute_all_axis(45, 1031, 90, [self._z_range] * self.num_channels)

_roma_park_position = (9000, 2000, 2464, 1800)

async def _park_roma(self):
await self.roma.set_vector_coordinate_position(1, 9000, 2000, 2464, 1800, None, 1, 0)
px, py, pz, pr = self._roma_park_position
await self.roma.set_vector_coordinate_position(1, px, py, pz, pr, None, 1, 0)
await self.roma.action_move_vector_coordinate_position()

async def _park_mca(self):
Expand All @@ -347,6 +355,64 @@ async def _park_mca(self):
await self.send_command(EVO.MCA, command="BMA", params=[0, 0, 0])
await asyncio.sleep(0.5)

# ============== Mixing, blow-out, tip presence ==============

async def _perform_mix(self, mix: Mix, use_channels: List[int]) -> None:
"""Perform mix cycles at the current tip position.

Args:
mix: Mix parameters (volume, repetitions, flow_rate).
use_channels: Which channels to mix on.
"""
pvl: List[Optional[int]] = [None] * self.num_channels
sep: List[Optional[int]] = [None] * self.num_channels
ppr_asp: List[Optional[int]] = [None] * self.num_channels
ppr_disp: List[Optional[int]] = [None] * self.num_channels

for channel in use_channels:
pvl[channel] = 0 # outlet
sep[channel] = int(mix.flow_rate * self.SPEED_FACTOR)
steps = int(mix.volume * self.STEPS_PER_UL)
ppr_asp[channel] = steps
ppr_disp[channel] = -steps

await self.liha.position_valve_logical(pvl)
await self.liha.set_end_speed_plunger(sep)

for _ in range(mix.repetitions):
await self.liha.move_plunger_relative(ppr_asp)
await self.liha.move_plunger_relative(ppr_disp)

async def _perform_blow_out(
self, ops: List[SingleChannelDispense], use_channels: List[int]
) -> None:
"""Push extra air volume after dispense to expel remaining liquid."""
pvl: List[Optional[int]] = [None] * self.num_channels
sep: List[Optional[int]] = [None] * self.num_channels
ppr: List[Optional[int]] = [None] * self.num_channels
has_blowout = False

for i, channel in enumerate(use_channels):
bov = ops[i].blow_out_air_volume
if bov is not None and bov > 0:
has_blowout = True
pvl[channel] = 0
sep[channel] = int(100 * self.SPEED_FACTOR)
ppr[channel] = -int(bov * self.STEPS_PER_UL)

if has_blowout:
await self.liha.position_valve_logical(pvl)
await self.liha.set_end_speed_plunger(sep)
await self.liha.move_plunger_relative(ppr)

async def request_tip_presence(self) -> List[Optional[bool]]:
"""Query tip mounted status for each channel via RTS firmware command."""
statuses = await self.liha.read_tip_status()
result: List[Optional[bool]] = [None] * self.num_channels
for i in range(min(len(statuses), self.num_channels)):
result[i] = statuses[i]
return result

# ============== LiquidHandlerBackend methods ==============

async def aspirate(
Expand Down Expand Up @@ -411,6 +477,7 @@ async def aspirate(
assert tlc is not None
detproc = tlc.lld_mode # must be same for all channels?
sense = tlc.lld_conductivity
# Allow override via backend_kwargs (future: pass through from LiquidHandler)
await self.liha.set_detection_mode(detproc, sense)
ssl, sdl, sbl = self._liquid_detection(use_channels, tecan_liquid_classes)
await self.liha.set_search_speed(ssl)
Expand Down Expand Up @@ -440,6 +507,13 @@ async def aspirate(
await self.liha.set_end_speed_plunger(sep)
await self.liha.move_plunger_relative(ppr)

# Post-aspirate mix
mix_channels = [ch for ch, op in zip(use_channels, ops) if op.mix is not None]
if mix_channels:
mix_op = next(op for op in ops if op.mix is not None)
assert mix_op.mix is not None
await self._perform_mix(mix_op.mix, mix_channels)

async def dispense(self, ops: List[SingleChannelDispense], use_channels: List[int]):
"""Dispense liquid from the specified channels.

Expand Down Expand Up @@ -479,6 +553,16 @@ async def dispense(self, ops: List[SingleChannelDispense], use_channels: List[in
await self.liha.set_tracking_distance_z(stz)
await self.liha.move_tracking_relative(mtr)

# Blow-out
await self._perform_blow_out(ops, use_channels)

# Post-dispense mix
mix_channels = [ch for ch, op in zip(use_channels, ops) if op.mix is not None]
if mix_channels:
mix_op = next(op for op in ops if op.mix is not None)
assert mix_op.mix is not None
await self._perform_mix(mix_op.mix, mix_channels)

async def pick_up_tips(self, ops: List[Pickup], use_channels: List[int]):
"""Pick up tips from a resource.

Expand Down Expand Up @@ -603,6 +687,16 @@ async def pick_up_resource(self, pickup: ResourcePickup):
await self.roma.set_gripper_params(100, 75)
await self.roma.grip_plate(h - 100)

# Verify plate was gripped
try:
g_pos = await self.roma.report_g_param(0)
if g_pos >= 900:
import logging

logging.getLogger(__name__).warning("Plate may not be gripped (G-axis position: %d)", g_pos)
except (TypeError, KeyError):
pass # RPG not available or response format unexpected

async def move_picked_up_resource(self, move: ResourceMove):
raise NotImplementedError()

Expand Down Expand Up @@ -929,6 +1023,39 @@ async def report_y_param(self, param: int) -> List[int]:
)["data"]
return resp

async def read_error_register(self, param: int = 0) -> str:
"""Read error register (REE).

Args:
param: 0 = current errors, 1 = extended error info

Returns:
Error string where each character represents one axis status.
``'@'`` = no error, ``'A'`` = init failed, ``'G'`` = not initialized.
"""
resp = await self.backend.send_command(module=self.module, command="REE", params=[param])
return str(resp["data"][0]) if resp and resp.get("data") else ""

async def position_init_all(self) -> None:
"""Initialize all axes (PIA)."""
await self.backend.send_command(module=self.module, command="PIA")

async def position_init_bus(self) -> None:
"""Initialize bus (PIB). Used for MCA modules."""
await self.backend.send_command(module=self.module, command="PIB")

async def set_bus_mode(self, mode: int) -> None:
"""Set bus mode (BMX).

Args:
mode: 2 = normal operation
"""
await self.backend.send_command(module=self.module, command="BMX", params=[mode])

async def bus_module_action(self, p1: int, p2: int, p3: int) -> None:
"""Bus module action (BMA). Use ``(0, 0, 0)`` to halt all axes."""
await self.backend.send_command(module=self.module, command="BMA", params=[p1, p2, p3])


class LiHa(EVOArm):
"""
Expand Down Expand Up @@ -1180,6 +1307,75 @@ async def get_disposable_tip(self, tips, z_start, z_search):
params=[tips, z_start, z_search, 0],
)

async def position_plunger_absolute(self, positions: List[Optional[int]]) -> None:
"""Move plunger to absolute position (PPA).

Args:
positions: absolute plunger position in full steps per channel (0-3150).
0 = fully dispensed, 3150 = fully aspirated.
"""
await self.backend.send_command(module=self.module, command="PPA", params=positions)

async def set_disposable_tip_params(self, mode: int, z_discard: int, z_retract: int) -> None:
"""Set disposable tip discard parameters (SDT).

Args:
mode: 1 = discard in rack
z_discard: Z discard distance in 1/10 mm
z_retract: Z retract distance in 1/10 mm
"""
await self.backend.send_command(
module=self.module, command="SDT", params=[mode, z_discard, z_retract]
)

# ============== Query commands ==============

async def read_plunger_positions(self) -> List[int]:
"""Read current plunger positions (RPP).

Returns:
List of plunger positions in full steps per channel.
"""
resp: List[int] = (
await self.backend.send_command(module=self.module, command="RPP", params=[0])
)["data"]
return resp

async def read_z_after_liquid_detection(self) -> List[int]:
"""Read Z values after liquid detection (RVZ).

Returns:
List of Z positions in 1/10 mm where liquid was detected, per channel.
"""
resp: List[int] = (
await self.backend.send_command(module=self.module, command="RVZ", params=[0])
)["data"]
return resp

async def read_tip_status(self) -> List[bool]:
"""Read tip mounted status for each channel (RTS).

Returns:
List of booleans: True if tip is mounted on that channel.

Note:
Response format needs hardware validation.
"""
resp: List[int] = (
await self.backend.send_command(module=self.module, command="RTS", params=[0])
)["data"]
return [bool(v) for v in resp]

async def position_absolute_z_bulk(self, z: List[Optional[int]]) -> None:
"""Position absolute Z for all channels simultaneously (PAZ).

Unlike :meth:`move_absolute_z` which uses slow speed, PAZ uses fast speed.

Args:
z: absolute Z position in 1/10 mm per channel
"""
await self.backend.send_command(module=self.module, command="PAZ", params=z)

async def discard_disposable_tip_high(self, tips):
"""Drops tips
Discards at the Z-axes initialization height
Expand Down
1 change: 1 addition & 0 deletions pylabrobot/liquid_handling/backends/tecan/EVO_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ async def test_move_resource(self):
call(module="C1", command="SFR", params=[2000, 600]),
call(module="C1", command="SGG", params=[100, 75, None]),
call(module="C1", command="AGR", params=[754]),
call(module="C1", command="RPG", params=[0]),
call(module="C1", command="RPZ", params=[5]),
call(module="C1", command="STW", params=[1, 0, 0, 0, 135, 0]),
call(module="C1", command="STW", params=[2, 0, 0, 0, 53, 0]),
Expand Down
1 change: 1 addition & 0 deletions pylabrobot/liquid_handling/backends/tecan/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from .air_evo_backend import AirEVOBackend
from .EVO_backend import EVOBackend
Loading
Loading