diff --git a/docs/resources/library/vwr.md b/docs/resources/library/vwr.md
index 62cae2f87ce..82edec8efbb 100644
--- a/docs/resources/library/vwr.md
+++ b/docs/resources/library/vwr.md
@@ -7,6 +7,7 @@ Company page: [Wikipedia](https://en.wikipedia.org/wiki/VWR_International)
| Description | Image | PLR definition |
|--------------------|--------------------|--------------------|
| 'VWRReagentReservoirs25mL'
Part no.: 89094
[manufacturer website](https://us.vwr.com/store/product/4694822/vwr-disposable-pipetting-reservoirs)
Polystyrene Reservoirs |  | `VWRReagentReservoirs25mL` |
+| 'VWR_1_troughplate_195000uL_Ub'
Part no.: 77575-302
[manufacturer website](https://www.avantorsciences.com/us/en/product/47763965/vwr-multi-channel-polypropylene-reagent-reservoirs?isCatNumSearch=true&searchedCatalogNumber=77575-302)
Polypropylene multi-channel reagent reservoirs |  | `VWR_1_troughplate_195000uL_Ub` |
## Plates
diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py
index fbecc6703a2..be9ebbc2fa7 100644
--- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py
+++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py
@@ -1441,7 +1441,9 @@ def core96_head_installed(self) -> bool:
@property
def num_arms(self) -> int:
- return 1 if self.extended_conf.left_x_drive.iswap_installed else 0
+ has_iswap = self.extended_conf.left_x_drive.iswap_installed
+ has_core_grippers = self._deck is not None and self._deck.has_resource("core_grippers")
+ return 1 if (has_iswap or has_core_grippers) else 0
@property
def head96_installed(self) -> Optional[bool]:
@@ -1672,6 +1674,65 @@ def _parse_firmware_version_datetime(self, fw_version: str) -> datetime.date:
raise ValueError(f"Could not parse year from firmware version string: '{fw_version}'")
return datetime.date(int(year_match.group(1)), 1, 1)
+ def _core96_has_old_firmware(self) -> bool:
+ """Return whether the installed CoRe96 firmware predates 2010 compatibility changes."""
+
+ head96_information = getattr(self, "_head96_information", None)
+ return head96_information is not None and head96_information.fw_version.year < 2010
+
+ def _pip_has_old_firmware(self) -> bool:
+ """Return whether the installed PIP firmware predates 2010 compatibility changes."""
+
+ pip_firmware_version = getattr(self, "_pip_firmware_version", None)
+ return pip_firmware_version is not None and pip_firmware_version.year < 2010
+
+ def _pip_supports_extended_tip_handling_command_params(self) -> bool:
+ """Return whether PIP tip-handling commands support newer optional parameters."""
+
+ return not self._pip_has_old_firmware()
+
+ def _pip_supports_extended_liquid_command_params(self) -> bool:
+ """Return whether PIP liquid commands support newer optional parameters."""
+
+ return not self._pip_has_old_firmware()
+
+ def _core96_supports_extended_liquid_command_params(self) -> bool:
+ """Return whether CoRe96 `EA`/`ED` support the newer optional parameter set."""
+
+ return not self._core96_has_old_firmware()
+
+ def _normalize_extended_configuration_y_bounds(self) -> None:
+ """Normalize impossible PIP Y bounds reported by some STAR/STARlet firmware revisions.
+
+ Legacy STARlet firmware has been observed to return the PIP Y bounds reversed in the
+ extended-configuration response and, on some revisions, to report an implausibly small maximum
+ Y position. Newer instruments should already report a sane interval, so normalize only when the
+ parsed values are clearly impossible.
+ """
+
+ ext_conf = self.extended_conf
+
+ if (
+ ext_conf.pip_maximal_y_position < ext_conf.left_arm_min_y_position
+ or ext_conf.pip_maximal_y_position < 100.0
+ ):
+ default_bounds = ExtendedConfiguration()
+ logger.warning(
+ "Normalizing invalid PIP Y bounds reported by firmware: max_y=%s, min_y=%s, pip_fw=%s",
+ ext_conf.pip_maximal_y_position,
+ ext_conf.left_arm_min_y_position,
+ getattr(self, "_pip_firmware_version", None),
+ )
+ ext_conf.pip_maximal_y_position = default_bounds.pip_maximal_y_position
+ ext_conf.left_arm_min_y_position = default_bounds.left_arm_min_y_position
+
+ def _pip_y_bounds(self) -> Tuple[float, float]:
+ """Return normalized PIP Y bounds as (min_y, max_y)."""
+
+ min_y = self.extended_conf.left_arm_min_y_position
+ max_y = self.extended_conf.pip_maximal_y_position
+ return (min(min_y, max_y), max(min_y, max_y))
+
async def setup(
self,
skip_instrument_initialization=False,
@@ -1696,6 +1757,7 @@ async def setup(
self._machine_conf = await self.request_machine_configuration()
self._extended_conf = await self.request_extended_configuration()
self._head96_information: Optional[Head96Information] = None
+ self._pip_firmware_version: Optional[datetime.date] = None
initialized = await self.request_instrument_initialization_status()
@@ -1715,6 +1777,15 @@ async def setup(
tip_presences = await self.request_tip_presence()
self._num_channels = len(tip_presences)
+ pip_fw_response = await self.request_firmware_version()
+ pip_fw_match = re.search(r"rf(?P.+)$", pip_fw_response)
+ if pip_fw_match is None:
+ raise ValueError(f"Could not parse PIP firmware version from response: '{pip_fw_response}'")
+ self._pip_firmware_version = self._parse_firmware_version_datetime(
+ pip_fw_match.group("fw_version").strip()
+ )
+ self._normalize_extended_configuration_y_bounds()
+
async def set_up_pip():
if (not initialized or any(tip_presences)) and not skip_pip:
await self.initialize_pip()
@@ -1841,12 +1912,13 @@ def can_reach_position(self, channel_idx: int, position: Coordinate) -> bool:
# frontmost channel can go to y=6, every channel behind it constrains its min Y
spacings = self._channels_minimum_y_spacing
- min_y_pos = self.extended_conf.left_arm_min_y_position + sum(spacings[channel_idx + 1 :])
+ lower_y_bound, upper_y_bound = self._pip_y_bounds()
+ min_y_pos = lower_y_bound + sum(spacings[channel_idx + 1 :])
if position.y < min_y_pos:
return False
# backmost channel max Y from config, every channel in front constrains its max Y
- max_y_pos = self.extended_conf.pip_maximal_y_position - sum(spacings[:channel_idx])
+ max_y_pos = upper_y_bound - sum(spacings[:channel_idx])
if position.y > max_y_pos:
return False
@@ -3457,7 +3529,7 @@ async def pick_up_tips96(
self._check_96_position_legal(pickup_position, skip_z=True)
- if tip_pickup_method == "from_rack":
+ if tip_pickup_method == "from_rack" and self._core96_supports_extended_liquid_command_params():
# the STAR will not automatically move the dispensing drive down if it is still up
# so we need to move it down here
# see https://github.com/PyLabRobot/pylabrobot/pull/835
@@ -4603,7 +4675,7 @@ async def move_channel_y(self, channel: int, y: float):
f"(channel {channel - 1} y-position is {round(y, 2)} mm)"
)
else:
- max_y_pos = self.extended_conf.pip_maximal_y_position
+ _, max_y_pos = self._pip_y_bounds()
if y > max_y_pos:
raise ValueError(f"channel {channel} y-target must be <= {max_y_pos} mm")
@@ -4616,10 +4688,9 @@ async def move_channel_y(self, channel: int, y: float):
)
else:
# STAR machines do not allow channels y < minimum
- if y < self.extended_conf.left_arm_min_y_position:
- raise ValueError(
- f"channel {channel} y-target must be >= {self.extended_conf.left_arm_min_y_position} mm"
- )
+ min_y_pos, _ = self._pip_y_bounds()
+ if y < min_y_pos:
+ raise ValueError(f"channel {channel} y-target must be >= {min_y_pos} mm")
await self.position_single_pipetting_channel_in_y_direction(
pipetting_channel_index=channel + 1, y_position=round(y * 10)
@@ -5837,7 +5908,7 @@ async def initialize_pipetting_channels(
assert 0 <= tip_type <= 99, "tip must be between 0 and 99"
assert 0 <= discarding_method <= 1, "discarding_method must be between 0 and 1"
- return await self.send_command(
+ command_kwargs: Dict[str, Any] = dict(
module="C0",
command="DI",
read_timeout=120,
@@ -5848,8 +5919,10 @@ async def initialize_pipetting_channels(
te=f"{z_position_at_end_of_a_command:04}",
tm=[f"{tm:01}" for tm in tip_pattern],
tt=f"{tip_type:02}",
- ti=discarding_method,
)
+ if self._pip_supports_extended_tip_handling_command_params():
+ command_kwargs["ti"] = discarding_method
+ return await self.send_command(**command_kwargs)
# -------------- 3.5.2 Tip handling commands using PIP --------------
@@ -5894,7 +5967,7 @@ async def pick_up_tip(
"minimum_traverse_height_at_beginning_of_a_command must be between 0 and 3600"
)
- return await self.send_command(
+ command_kwargs: Dict[str, Any] = dict(
module="C0",
command="TP",
tip_pattern=tip_pattern,
@@ -5906,8 +5979,10 @@ async def pick_up_tip(
tp=f"{begin_tip_pick_up_process:04}",
tz=f"{end_tip_pick_up_process:04}",
th=f"{minimum_traverse_height_at_beginning_of_a_command:04}",
- td=pickup_method.value,
)
+ if self._pip_supports_extended_tip_handling_command_params():
+ command_kwargs["td"] = pickup_method.value
+ return await self.send_command(**command_kwargs)
@need_iswap_parked
async def discard_tip(
@@ -5959,7 +6034,7 @@ async def discard_tip(
"z_position_at_end_of_a_command must be between 0 and 3600"
)
- return await self.send_command(
+ command_kwargs: Dict[str, Any] = dict(
module="C0",
command="TR",
tip_pattern=tip_pattern,
@@ -5971,8 +6046,10 @@ async def discard_tip(
tz=end_tip_deposit_process,
th=minimum_traverse_height_at_beginning_of_a_command,
te=z_position_at_end_of_a_command,
- ti=discarding_method.value,
)
+ if self._pip_supports_extended_tip_handling_command_params():
+ command_kwargs["ti"] = discarding_method.value
+ return await self.send_command(**command_kwargs)
# TODO:(command:TW) Tip Pick-up for DC wash procedure
@@ -6218,7 +6295,7 @@ async def aspirate_pip(
)
assert all(0 <= x <= 3600 for x in cup_upper_edge), "cup_upper_edge must be between 0 and 3600"
- return await self.send_command(
+ command_kwargs: Dict[str, Any] = dict(
module="C0",
command="AS",
tip_pattern=tip_pattern,
@@ -6232,7 +6309,10 @@ async def aspirate_pip(
lp=[f"{lp:04}" for lp in lld_search_height],
ch=[f"{ch:03}" for ch in clot_detection_height],
zl=[f"{zl:04}" for zl in liquid_surface_no_lld],
- po=[f"{po:04}" for po in pull_out_distance_transport_air],
+ )
+ if self._pip_supports_extended_liquid_command_params():
+ command_kwargs["po"] = [f"{po:04}" for po in pull_out_distance_transport_air]
+ command_kwargs.update(
zu=[f"{zu:04}" for zu in second_section_height],
zr=[f"{zr:05}" for zr in second_section_ratio],
zx=[f"{zx:04}" for zx in minimum_height],
@@ -6256,16 +6336,21 @@ async def aspirate_pip(
mp=[f"{mp:03}" for mp in mix_position_from_liquid_surface],
ms=[f"{ms:04}" for ms in mix_speed],
mh=[f"{mh:04}" for mh in mix_surface_following_distance],
- gi=[f"{gi:03}" for gi in limit_curve_index],
- gj=tadm_algorithm,
- gk=recording_mode,
- lk=[1 if lk else 0 for lk in use_2nd_section_aspiration],
- ik=[f"{ik:04}" for ik in retract_height_over_2nd_section_to_empty_tip],
- sd=[f"{sd:04}" for sd in dispensation_speed_during_emptying_tip],
- se=[f"{se:04}" for se in dosing_drive_speed_during_2nd_section_search],
- sz=[f"{sz:04}" for sz in z_drive_speed_during_2nd_section_search],
- io=[f"{io:04}" for io in cup_upper_edge],
)
+ if self._pip_supports_extended_liquid_command_params():
+ command_kwargs["gi"] = [f"{gi:03}" for gi in limit_curve_index]
+ command_kwargs["gj"] = tadm_algorithm
+ command_kwargs["gk"] = recording_mode
+ if self._pip_supports_extended_liquid_command_params():
+ command_kwargs.update(
+ lk=[1 if lk else 0 for lk in use_2nd_section_aspiration],
+ ik=[f"{ik:04}" for ik in retract_height_over_2nd_section_to_empty_tip],
+ sd=[f"{sd:04}" for sd in dispensation_speed_during_emptying_tip],
+ se=[f"{se:04}" for se in dosing_drive_speed_during_2nd_section_search],
+ sz=[f"{sz:04}" for sz in z_drive_speed_during_2nd_section_search],
+ io=[f"{io:04}" for io in cup_upper_edge],
+ )
+ return await self.send_command(**command_kwargs)
@need_iswap_parked
async def dispense_pip(
@@ -6450,7 +6535,7 @@ async def dispense_pip(
)
assert 0 <= recording_mode <= 2, "recording_mode must be between 0 and 2"
- return await self.send_command(
+ command_kwargs: Dict[str, Any] = dict(
module="C0",
command="DS",
tip_pattern=tip_pattern,
@@ -6462,7 +6547,10 @@ async def dispense_pip(
zx=[f"{zx:04}" for zx in minimum_height],
lp=[f"{lp:04}" for lp in lld_search_height],
zl=[f"{zl:04}" for zl in liquid_surface_no_lld],
- po=[f"{po:04}" for po in pull_out_distance_transport_air],
+ )
+ if self._pip_supports_extended_liquid_command_params():
+ command_kwargs["po"] = [f"{po:04}" for po in pull_out_distance_transport_air]
+ command_kwargs.update(
ip=[f"{ip:04}" for ip in immersion_depth],
it=[f"{it:01}" for it in immersion_depth_direction],
fp=[f"{fp:04}" for fp in surface_following_distance],
@@ -6488,10 +6576,12 @@ async def dispense_pip(
mp=[f"{mp:03}" for mp in mix_position_from_liquid_surface],
ms=[f"{ms:04}" for ms in mix_speed],
mh=[f"{mh:04}" for mh in mix_surface_following_distance],
- gi=[f"{gi:03}" for gi in limit_curve_index],
- gj=tadm_algorithm, #
- gk=recording_mode, #
)
+ if self._pip_supports_extended_liquid_command_params():
+ command_kwargs["gi"] = [f"{gi:03}" for gi in limit_curve_index]
+ command_kwargs["gj"] = tadm_algorithm
+ command_kwargs["gk"] = recording_mode
+ return await self.send_command(**command_kwargs)
# TODO:(command:DA) Simultaneous aspiration & dispensation of liquid
@@ -8050,7 +8140,7 @@ async def aspirate_core_96(
channel_pattern_bin_str = reversed(["1" if x else "0" for x in channel_pattern])
channel_pattern_hex = hex(int("".join(channel_pattern_bin_str), 2)).upper()[2:]
- return await self.send_command(
+ command_kwargs: Dict[str, Any] = dict(
module="C0",
command="EA",
read_timeout=max(300, self.read_timeout),
@@ -8062,32 +8152,36 @@ async def aspirate_core_96(
ze=f"{min_z_endpos:04}",
lz=f"{lld_search_height:04}",
zt=f"{liquid_surface_no_lld:04}",
- pp=f"{pull_out_distance_transport_air:04}",
- zm=f"{minimum_height:04}",
- zv=f"{second_section_height:04}",
- zq=f"{second_section_ratio:05}",
- iw=f"{immersion_depth:03}",
- ix=immersion_depth_direction,
- fh=f"{surface_following_distance:03}",
- af=f"{aspiration_volumes:05}",
- ag=f"{aspiration_speed:04}",
- vt=f"{transport_air_volume:03}",
- bv=f"{blow_out_air_volume:05}",
- wv=f"{pre_wetting_volume:05}",
- cm=lld_mode,
- cs=gamma_lld_sensitivity,
- bs=f"{swap_speed:04}",
- wh=f"{settling_time:02}",
- hv=f"{mix_volume:05}",
- hc=f"{mix_cycles:02}",
- hp=f"{mix_position_from_liquid_surface:03}",
- mj=f"{mix_surface_following_distance:03}",
- hs=f"{speed_of_mix:04}",
- cw=channel_pattern_hex,
- cr=f"{limit_curve_index:03}",
- cj=tadm_algorithm,
- cx=recording_mode,
)
+ if self._core96_supports_extended_liquid_command_params():
+ command_kwargs["pp"] = f"{pull_out_distance_transport_air:04}"
+ command_kwargs["zm"] = f"{minimum_height:04}"
+ command_kwargs["zv"] = f"{second_section_height:04}"
+ command_kwargs["zq"] = f"{second_section_ratio:05}"
+ command_kwargs["iw"] = f"{immersion_depth:03}"
+ command_kwargs["ix"] = immersion_depth_direction
+ command_kwargs["fh"] = f"{surface_following_distance:03}"
+ command_kwargs["af"] = f"{aspiration_volumes:05}"
+ command_kwargs["ag"] = f"{aspiration_speed:04}"
+ command_kwargs["vt"] = f"{transport_air_volume:03}"
+ command_kwargs["bv"] = f"{blow_out_air_volume:05}"
+ command_kwargs["wv"] = f"{pre_wetting_volume:05}"
+ command_kwargs["cm"] = lld_mode
+ command_kwargs["cs"] = gamma_lld_sensitivity
+ command_kwargs["bs"] = f"{swap_speed:04}"
+ command_kwargs["wh"] = f"{settling_time:02}"
+ command_kwargs["hv"] = f"{mix_volume:05}"
+ command_kwargs["hc"] = f"{mix_cycles:02}"
+ command_kwargs["hp"] = f"{mix_position_from_liquid_surface:03}"
+ command_kwargs["mj"] = f"{mix_surface_following_distance:03}"
+ command_kwargs["hs"] = f"{speed_of_mix:04}"
+ if self._core96_supports_extended_liquid_command_params():
+ command_kwargs["cw"] = channel_pattern_hex
+ command_kwargs["cr"] = f"{limit_curve_index:03}"
+ command_kwargs["cj"] = tadm_algorithm
+ command_kwargs["cx"] = recording_mode
+
+ return await self.send_command(**command_kwargs)
@need_iswap_parked
@_requires_head96
@@ -8325,7 +8419,7 @@ async def dispense_core_96(
channel_pattern_bin_str = reversed(["1" if x else "0" for x in channel_pattern])
channel_pattern_hex = hex(int("".join(channel_pattern_bin_str), 2)).upper()[2:]
- return await self.send_command(
+ command_kwargs: Dict[str, Any] = dict(
module="C0",
command="ED",
read_timeout=max(300, self.read_timeout),
@@ -8338,33 +8432,37 @@ async def dispense_core_96(
zq=f"{second_section_ratio:05}",
lz=f"{lld_search_height:04}",
zt=f"{liquid_surface_no_lld:04}",
- pp=f"{pull_out_distance_transport_air:04}",
- iw=f"{immersion_depth:03}",
- ix=immersion_depth_direction,
- fh=f"{surface_following_distance:03}",
- zh=f"{minimum_traverse_height_at_beginning_of_a_command:04}",
- ze=f"{min_z_endpos:04}",
- df=f"{dispense_volume:05}",
- dg=f"{dispense_speed:04}",
- es=f"{cut_off_speed:04}",
- ev=f"{stop_back_volume:03}",
- vt=f"{transport_air_volume:03}",
- bv=f"{blow_out_air_volume:05}",
- cm=lld_mode,
- cs=gamma_lld_sensitivity,
- ej=f"{side_touch_off_distance:02}",
- bs=f"{swap_speed:04}",
- wh=f"{settling_time:02}",
- hv=f"{mixing_volume:05}",
- hc=f"{mixing_cycles:02}",
- hp=f"{mix_position_from_liquid_surface:03}",
- mj=f"{mix_surface_following_distance:03}",
- hs=f"{speed_of_mixing:04}",
- cw=channel_pattern_hex,
- cr=f"{limit_curve_index:03}",
- cj=tadm_algorithm,
- cx=recording_mode,
)
+ if self._core96_supports_extended_liquid_command_params():
+ command_kwargs["pp"] = f"{pull_out_distance_transport_air:04}"
+ command_kwargs["iw"] = f"{immersion_depth:03}"
+ command_kwargs["ix"] = immersion_depth_direction
+ command_kwargs["fh"] = f"{surface_following_distance:03}"
+ command_kwargs["zh"] = f"{minimum_traverse_height_at_beginning_of_a_command:04}"
+ command_kwargs["ze"] = f"{min_z_endpos:04}"
+ command_kwargs["df"] = f"{dispense_volume:05}"
+ command_kwargs["dg"] = f"{dispense_speed:04}"
+ command_kwargs["es"] = f"{cut_off_speed:04}"
+ command_kwargs["ev"] = f"{stop_back_volume:03}"
+ command_kwargs["vt"] = f"{transport_air_volume:03}"
+ command_kwargs["bv"] = f"{blow_out_air_volume:05}"
+ command_kwargs["cm"] = lld_mode
+ command_kwargs["cs"] = gamma_lld_sensitivity
+ command_kwargs["ej"] = f"{side_touch_off_distance:02}"
+ command_kwargs["bs"] = f"{swap_speed:04}"
+ command_kwargs["wh"] = f"{settling_time:02}"
+ command_kwargs["hv"] = f"{mixing_volume:05}"
+ command_kwargs["hc"] = f"{mixing_cycles:02}"
+ command_kwargs["hp"] = f"{mix_position_from_liquid_surface:03}"
+ command_kwargs["mj"] = f"{mix_surface_following_distance:03}"
+ command_kwargs["hs"] = f"{speed_of_mixing:04}"
+ if self._core96_supports_extended_liquid_command_params():
+ command_kwargs["cw"] = channel_pattern_hex
+ command_kwargs["cr"] = f"{limit_curve_index:03}"
+ command_kwargs["cj"] = tadm_algorithm
+ command_kwargs["cx"] = recording_mode
+
+ return await self.send_command(**command_kwargs)
# -------------- 3.10.4 Adjustment & movement commands --------------
@@ -10696,13 +10794,13 @@ async def clld_probe_y_position_using_channel(
adj_upper_y = await self.request_y_pos_channel_n(channel_idx - 1)
max_safe_upper_y_pos = adj_upper_y - self._min_spacing_between(channel_idx, channel_idx - 1)
else:
- max_safe_upper_y_pos = self.extended_conf.pip_maximal_y_position
+ _, max_safe_upper_y_pos = self._pip_y_bounds()
if channel_idx < (self.num_channels - 1):
adj_lower_y = await self.request_y_pos_channel_n(channel_idx + 1)
max_safe_lower_y_pos = adj_lower_y + self._min_spacing_between(channel_idx, channel_idx + 1)
else:
- max_safe_lower_y_pos = self.extended_conf.left_arm_min_y_position
+ max_safe_lower_y_pos, _ = self._pip_y_bounds()
# Enable safe start and end positions
if start_pos_search:
@@ -10783,7 +10881,7 @@ async def clld_probe_y_position_using_channel(
channel_idx, channel_idx + 1
)
else:
- min_y = self.extended_conf.left_arm_min_y_position
+ min_y, _ = self._pip_y_bounds()
max_safe_dist = detected_material_y_pos - min_y
move_target = detected_material_y_pos - min(post_detection_dist, max_safe_dist)
@@ -10794,7 +10892,7 @@ async def clld_probe_y_position_using_channel(
channel_idx, channel_idx - 1
)
else:
- max_y = self.extended_conf.pip_maximal_y_position
+ _, max_y = self._pip_y_bounds()
max_safe_dist = max_y - detected_material_y_pos
move_target = detected_material_y_pos + min(post_detection_dist, max_safe_dist)
diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py
index b478ff7b637..c23f8867831 100644
--- a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py
+++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py
@@ -1,7 +1,9 @@
# mypy: disable-error-code="attr-defined,method-assign"
+import datetime
import unittest
import unittest.mock
+from dataclasses import replace
from typing import Literal, cast
from pylabrobot.liquid_handling import LiquidHandler
@@ -34,6 +36,7 @@
CommandSyntaxError,
HamiltonNoTipError,
HardwareError,
+ Head96Information,
STARBackend,
STARFirmwareError,
UnknownHamiltonError,
@@ -168,6 +171,36 @@ async def test_send_command_plaintext_response(self):
await self.star.send_command("C0", command="QM", fmt="id####")
+class TestSTARConfigurationNormalization(unittest.TestCase):
+ def test_inverted_y_bounds_are_swapped(self):
+ star = STARBackend()
+ star._pip_firmware_version = datetime.date(2024, 1, 1)
+ star._extended_conf = replace(
+ _DEFAULT_EXTENDED_CONFIGURATION,
+ pip_maximal_y_position=6.0,
+ left_arm_min_y_position=22.0,
+ )
+
+ star._normalize_extended_configuration_y_bounds()
+
+ self.assertEqual(star.extended_conf.pip_maximal_y_position, 606.5)
+ self.assertEqual(star.extended_conf.left_arm_min_y_position, 6.0)
+
+ def test_valid_y_bounds_are_kept(self):
+ star = STARBackend()
+ star._pip_firmware_version = datetime.date(2009, 1, 1)
+ star._extended_conf = replace(
+ _DEFAULT_EXTENDED_CONFIGURATION,
+ pip_maximal_y_position=606.5,
+ left_arm_min_y_position=6.0,
+ )
+
+ star._normalize_extended_configuration_y_bounds()
+
+ self.assertEqual(star.extended_conf.pip_maximal_y_position, 606.5)
+ self.assertEqual(star.extended_conf.left_arm_min_y_position, 6.0)
+
+
class STARCommandCatcher(STARBackend):
"""Mock backend for star that catches commands and saves them instead of sending them to the
machine."""
@@ -270,6 +303,18 @@ def __init__(self, name: str):
set_tip_tracking(enabled=False)
+ def _set_core96_firmware_year(self, year: int) -> None:
+ self.STAR._head96_information = Head96Information(
+ fw_version=datetime.date(year, 1, 1),
+ supports_clot_monitoring_clld=False,
+ stop_disc_type="core_i",
+ instrument_type="legacy",
+ head_type="96 head II",
+ )
+
+ def _set_pip_firmware_year(self, year: int) -> None:
+ self.STAR._pip_firmware_version = datetime.date(year, 1, 1)
+
async def test_core_read_barcode_success(self):
"""core_read_barcode_of_picked_up_resource should send ZB and return a Barcode."""
@@ -688,6 +733,24 @@ async def test_core_96_tip_pickup(self):
]
)
+ async def test_core_96_tip_pickup_skips_dispensing_drive_move_on_old_firmware(self):
+ self._set_core96_firmware_year(2009)
+
+ await self.lh.pick_up_tips96(self.tip_rack)
+
+ self.STAR._write_and_read_command.assert_has_calls(
+ [
+ _any_write_and_read_command_call("C0TTid0001tt01tf1tl0519tv03600tg2tu0"),
+ _any_write_and_read_command_call("C0EPid0002xs01179xd0yh2418tt01wu0za2164zh2450ze2450"),
+ ]
+ )
+ self.assertFalse(
+ any(
+ "H0DQ" in call.kwargs["cmd"]
+ for call in self.STAR._write_and_read_command.call_args_list # type: ignore
+ )
+ )
+
async def test_tip_tracking_pick_up96(self):
set_tip_tracking(enabled=True)
await self.lh.pick_up_tips96(self.tip_rack)
@@ -731,6 +794,24 @@ async def test_core_96_aspirate(self):
]
)
+ async def test_core_96_aspirate_old_firmware_omits_extended_params(self):
+ self._set_core96_firmware_year(2009)
+
+ await self.lh.pick_up_tips96(self.tip_rack2) # pick up high volume tips
+ self.STAR._write_and_read_command.reset_mock()
+
+ assert self.plate.lid is not None
+ self.plate.lid.unassign()
+ await self.lh.aspirate96(self.plate, volume=100, blow_out=True)
+
+ self.STAR._write_and_read_command.assert_has_calls(
+ [
+ _any_write_and_read_command_call(
+ "C0EAid0003aa0xs02983xd0yh1457zh2450ze2450lz1999zt1866zm1866zv0032zq06180iw000ix0fh000af01083ag2500vt050bv00000wv00050cm0cs1bs0020wh10hv00000hc00hp000mj000hs1200"
+ ),
+ ]
+ )
+
async def test_core_96_dispense(self):
await self.lh.pick_up_tips96(self.tip_rack2) # pick up high volume tips
if self.plate.lid is not None:
@@ -750,6 +831,26 @@ async def test_core_96_dispense(self):
]
)
+ async def test_core_96_dispense_old_firmware_omits_extended_params(self):
+ self._set_core96_firmware_year(2009)
+
+ await self.lh.pick_up_tips96(self.tip_rack2) # pick up high volume tips
+ if self.plate.lid is not None:
+ self.plate.lid.unassign()
+ await self.lh.aspirate96(self.plate, 100, blow_out=True)
+ self.STAR._write_and_read_command.reset_mock()
+
+ with no_volume_tracking():
+ await self.lh.dispense96(self.plate, 100, blow_out=True)
+
+ self.STAR._write_and_read_command.assert_has_calls(
+ [
+ _any_write_and_read_command_call(
+ "C0EDid0004da3xs02983xd0yh1457zm1866zv0032zq06180lz1999zt1866iw000ix0fh000zh2450ze2450df01083dg1200es0050ev000vt050bv00000cm0cs1ej00bs0020wh00hv00000hc00hp000mj000hs1200"
+ ),
+ ]
+ )
+
async def test_core_96_dispense_quadrant(self):
"""Test that each quadrant of a 384-well plate produces the correct firmware command.
@@ -1015,6 +1116,160 @@ async def test_discard_tips(self):
]
)
+ async def test_pick_up_tips_omits_pickup_method_for_old_pip_firmware(self):
+ self._set_pip_firmware_year(2009)
+
+ await self.lh.pick_up_tips(self.tip_rack["A1:H1"])
+
+ sent = self.STAR._write_and_read_command.call_args_list[-1].kwargs["cmd"]
+ self.assertTrue(sent.startswith("C0TP"))
+ self.assertNotIn("td", sent)
+
+ async def test_discard_tips_omits_discarding_method_for_old_pip_firmware(self):
+ self._set_pip_firmware_year(2009)
+ await self.lh.pick_up_tips(self.tip_rack["A1:H1"])
+ self.STAR._write_and_read_command.side_effect = [
+ "C0TRid0003kz000 000 000 000 000 000 000 000vz000 000 000 000 000 000 000 000"
+ ]
+ self.STAR._write_and_read_command.reset_mock()
+
+ await self.lh.discard_tips()
+
+ sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"]
+ self.assertTrue(sent.startswith("C0TR"))
+ self.assertNotIn("ti", sent)
+
+ async def test_initialize_pipetting_channels_omits_discarding_method_for_old_pip_firmware(self):
+ self._set_pip_firmware_year(2009)
+
+ await self.STAR.initialize_pipetting_channels(
+ x_positions=[8000],
+ y_positions=[3427, 3337, 3247, 3157, 3067, 2977, 2887, 2797],
+ begin_of_tip_deposit_process=2450,
+ end_of_tip_deposit_process=1870,
+ z_position_at_end_of_a_command=2450,
+ tip_pattern=[True] * 8,
+ tip_type=1,
+ discarding_method=0,
+ )
+
+ sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"]
+ self.assertTrue(sent.startswith("C0DI"))
+ self.assertNotIn("ti", sent)
+
+ async def test_aspirate_pip_omits_tadm_fields_for_old_pip_firmware(self):
+ self._set_pip_firmware_year(2009)
+
+ await self.STAR.aspirate_pip(
+ aspiration_type=[0],
+ tip_pattern=[True],
+ x_positions=[8000],
+ y_positions=[3427],
+ aspiration_volumes=[100],
+ limit_curve_index=[0],
+ tadm_algorithm=False,
+ recording_mode=0,
+ )
+
+ sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"]
+ self.assertTrue(sent.startswith("C0AS"))
+ self.assertNotIn("po", sent)
+ self.assertNotIn("gi", sent)
+ self.assertNotIn("gj", sent)
+ self.assertNotIn("gk", sent)
+ self.assertNotIn("lk", sent)
+ self.assertNotIn("ik", sent)
+ self.assertNotIn("sd", sent)
+ self.assertNotIn("se", sent)
+ self.assertNotIn("sz", sent)
+ self.assertNotIn("io", sent)
+
+ async def test_dispense_pip_omits_tadm_fields_for_old_pip_firmware(self):
+ self._set_pip_firmware_year(2009)
+
+ await self.STAR.dispense_pip(
+ tip_pattern=[True],
+ dispensing_mode=[0],
+ x_positions=[8000],
+ y_positions=[3427],
+ dispense_volumes=[100],
+ limit_curve_index=[0],
+ tadm_algorithm=False,
+ recording_mode=0,
+ )
+
+ sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"]
+ self.assertTrue(sent.startswith("C0DS"))
+ self.assertNotIn("po", sent)
+ self.assertNotIn("gi", sent)
+ self.assertNotIn("gj", sent)
+ self.assertNotIn("gk", sent)
+
+ async def test_aspirate_pip_includes_po_for_modern_pip_firmware(self):
+ self._set_pip_firmware_year(2024)
+
+ await self.STAR.aspirate_pip(
+ aspiration_type=[0],
+ tip_pattern=[True],
+ x_positions=[8000],
+ y_positions=[3427],
+ aspiration_volumes=[100],
+ pull_out_distance_transport_air=[100],
+ )
+
+ sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"]
+ self.assertTrue(sent.startswith("C0AS"))
+ self.assertIn("po0100", sent)
+
+ async def test_dispense_pip_includes_po_for_modern_pip_firmware(self):
+ self._set_pip_firmware_year(2024)
+
+ await self.STAR.dispense_pip(
+ tip_pattern=[True],
+ dispensing_mode=[0],
+ x_positions=[8000],
+ y_positions=[3427],
+ dispense_volumes=[100],
+ pull_out_distance_transport_air=[100],
+ )
+
+ sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"]
+ self.assertTrue(sent.startswith("C0DS"))
+ self.assertIn("po0100", sent)
+
+ async def test_ops_to_fw_positions_allows_legacy_sub_9mm_spacing(self):
+ self._set_pip_firmware_year(2009)
+
+ op1 = Pickup(
+ resource=self.tip_rack["A1"][0],
+ offset=Coordinate.zero(),
+ tip=self.tip_rack["A1"][0].get_tip(),
+ )
+ op2 = Pickup(
+ resource=self.tip_rack["A1"][0],
+ offset=Coordinate(0, -7.9, 0),
+ tip=self.tip_rack["A1"][0].get_tip(),
+ )
+
+ self.STAR._ops_to_fw_positions((op1, op2), use_channels=[0, 1])
+
+ async def test_ops_to_fw_positions_rejects_modern_sub_9mm_spacing(self):
+ self._set_pip_firmware_year(2024)
+
+ op1 = Pickup(
+ resource=self.tip_rack["A1"][0],
+ offset=Coordinate.zero(),
+ tip=self.tip_rack["A1"][0].get_tip(),
+ )
+ op2 = Pickup(
+ resource=self.tip_rack["A1"][0],
+ offset=Coordinate(0, -7.9, 0),
+ tip=self.tip_rack["A1"][0].get_tip(),
+ )
+
+ with self.assertRaisesRegex(ValueError, "Minimum distance between two y positions is <9mm"):
+ self.STAR._ops_to_fw_positions((op1, op2), use_channels=[0, 1])
+
async def test_portrait_tip_rack_handling(self):
deck = STARLetDeck()
lh = LiquidHandler(self.STAR, deck=deck)
@@ -1574,6 +1829,23 @@ async def test_can_reach_4ch_18mm_rejects_back_channel_too_far_back(self):
backend._channels_minimum_y_spacing = [18.0] * 4
self.assertFalse(backend.can_reach_position(3, Coordinate(100, 574, 100)))
+ async def test_can_reach_position_normalizes_inverted_y_bounds(self):
+ """Inverted raw Y limits should not make ordinary 8-channel STARlet positions unreachable."""
+
+ backend = STARBackend()
+ backend._num_channels = 8
+ backend._channels_minimum_y_spacing = [8.98] * 8
+ backend._extended_conf = replace(
+ _DEFAULT_EXTENDED_CONFIGURATION,
+ pip_maximal_y_position=6.0,
+ left_arm_min_y_position=22.0,
+ )
+
+ backend._normalize_extended_configuration_y_bounds()
+
+ self.assertTrue(backend.can_reach_position(0, Coordinate(100, 337.8, 100)))
+ self.assertTrue(backend.can_reach_position(7, Coordinate(100, 274.8, 100)))
+
# -- position_channels_in_y_direction: validation rejects tight positions -------
def _make_star_backend(self, num_channels, spacings):
diff --git a/pylabrobot/liquid_handling/backends/hamilton/base.py b/pylabrobot/liquid_handling/backends/hamilton/base.py
index 73ff83be6f7..dfa7657d898 100644
--- a/pylabrobot/liquid_handling/backends/hamilton/base.py
+++ b/pylabrobot/liquid_handling/backends/hamilton/base.py
@@ -416,21 +416,25 @@ def _ops_to_fw_positions(
y_pos = ops[i].resource.get_location_wrt(self.deck, x="c", y="c", z="b").y + ops[i].offset.y
y_positions.append(round(y_pos * 10))
+ pip_has_old_firmware = getattr(self, "_pip_has_old_firmware", None)
+ enforce_min_y_spacing = not (callable(pip_has_old_firmware) and pip_has_old_firmware())
+
# check that the minimum d between any two y positions is >9mm
# O(n^2) search is not great but this is most readable, and the max size is 16, so it's fine.
- for channel_idx1, (x1, y1) in enumerate(zip(x_positions, y_positions)):
- for channel_idx2, (x2, y2) in enumerate(zip(x_positions, y_positions)):
- if channel_idx1 == channel_idx2:
- continue
- if not channels_involved[channel_idx1] or not channels_involved[channel_idx2]:
- continue
- if x1 != x2: # channels not on the same column -> will be two operations on the machine
- continue
- if y1 != y2 and abs(y1 - y2) < 90:
- raise ValueError(
- f"Minimum distance between two y positions is <9mm: {y1}, {y2}"
- f" (channel {channel_idx1} and {channel_idx2})"
- )
+ if enforce_min_y_spacing:
+ for channel_idx1, (x1, y1) in enumerate(zip(x_positions, y_positions)):
+ for channel_idx2, (x2, y2) in enumerate(zip(x_positions, y_positions)):
+ if channel_idx1 == channel_idx2:
+ continue
+ if not channels_involved[channel_idx1] or not channels_involved[channel_idx2]:
+ continue
+ if x1 != x2: # channels not on the same column -> will be two operations on the machine
+ continue
+ if y1 != y2 and abs(y1 - y2) < 90:
+ raise ValueError(
+ f"Minimum distance between two y positions is <9mm: {y1}, {y2}"
+ f" (channel {channel_idx1} and {channel_idx2})"
+ )
if len(ops) > self.num_channels:
raise ValueError(f"Too many channels specified: {len(ops)} > {self.num_channels}")
diff --git a/pylabrobot/liquid_handling/liquid_handler.py b/pylabrobot/liquid_handling/liquid_handler.py
index ef194c5a3a8..fbe74f0873b 100644
--- a/pylabrobot/liquid_handling/liquid_handler.py
+++ b/pylabrobot/liquid_handling/liquid_handler.py
@@ -354,6 +354,11 @@ def _compute_spread_offsets(
spread: str,
) -> List[Coordinate]:
"""Compute channel spread offsets for a single-resource multi-channel operation."""
+ pip_has_old_firmware = getattr(self.backend, "_pip_has_old_firmware", None)
+ if spread != "custom" and callable(pip_has_old_firmware) and pip_has_old_firmware():
+ centers = list(reversed(resource.centers(yn=len(use_channels), zn=0)))
+ return [c - resource.center() for c in centers]
+
return compute_channel_offsets(
resource=resource,
num_channels=len(use_channels),
@@ -836,7 +841,7 @@ def _check_containers(self, resources: Sequence[Resource]):
@need_setup_finished
async def aspirate(
self,
- resources: Sequence[Container],
+ resources: Union[Container, Sequence[Container]],
vols: List[float],
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
@@ -901,6 +906,9 @@ async def aspirate(
ValueError: If all channels are `None`.
"""
+ if isinstance(resources, Container):
+ resources = [resources]
+
self._log_command(
"aspirate",
resources=resources,
@@ -946,6 +954,8 @@ async def aspirate(
("liquid_height", liquid_height),
("blow_out_air_volume", blow_out_air_volume),
]:
+ if n == "resources" and len(p) == 1:
+ continue
if len(p) != len(use_channels):
raise ValueError(
f"Length of {n} must match length of use_channels: {len(p)} != {len(use_channels)}"
@@ -955,13 +965,18 @@ async def aspirate(
# want to space the channels evenly across the resource. Note that offsets are relative to the
# center of the resource.
if len(set(resources)) == 1:
+ original_num_resources = len(resources)
resource = resources[0]
resources = [resource] * len(use_channels)
+ pip_has_old_firmware = getattr(self.backend, "_pip_has_old_firmware", None)
+ legacy_repeated_resource_list = (
+ callable(pip_has_old_firmware) and pip_has_old_firmware() and original_num_resources > 1
+ )
+ if not legacy_repeated_resource_list:
+ center_offsets = self._compute_spread_offsets(resource, use_channels, spread)
- center_offsets = self._compute_spread_offsets(resource, use_channels, spread)
-
- # add user defined offsets to the computed centers
- offsets = [c + o for c, o in zip(center_offsets, offsets)]
+ # add user defined offsets to the computed centers
+ offsets = [c + o for c, o in zip(center_offsets, offsets)]
# create operations
aspirations = [
@@ -1029,7 +1044,7 @@ async def aspirate(
@need_setup_finished
async def dispense(
self,
- resources: Sequence[Container],
+ resources: Union[Container, Sequence[Container]],
vols: List[float],
use_channels: Optional[List[int]] = None,
flow_rates: Optional[List[Optional[float]]] = None,
@@ -1092,6 +1107,9 @@ async def dispense(
ValueError: If all channels are `None`.
"""
+ if isinstance(resources, Container):
+ resources = [resources]
+
self._log_command(
"dispense",
resources=resources,
@@ -1128,13 +1146,18 @@ async def dispense(
# want to space the channels evenly across the resource. Note that offsets are relative to the
# center of the resource.
if len(set(resources)) == 1:
+ original_num_resources = len(resources)
resource = resources[0]
resources = [resource] * len(use_channels)
+ pip_has_old_firmware = getattr(self.backend, "_pip_has_old_firmware", None)
+ legacy_repeated_resource_list = (
+ callable(pip_has_old_firmware) and pip_has_old_firmware() and original_num_resources > 1
+ )
+ if not legacy_repeated_resource_list:
+ center_offsets = self._compute_spread_offsets(resource, use_channels, spread)
- center_offsets = self._compute_spread_offsets(resource, use_channels, spread)
-
- # add user defined offsets to the computed centers
- offsets = [c + o for c, o in zip(center_offsets, offsets)]
+ # add user defined offsets to the computed centers
+ offsets = [c + o for c, o in zip(center_offsets, offsets)]
tips = [self.head[channel].get_tip() for channel in use_channels]
@@ -1159,6 +1182,8 @@ async def dispense(
("liquid_height", liquid_height),
("blow_out_air_volume", blow_out_air_volume),
]:
+ if n == "resources" and len(p) == 1:
+ continue
if len(p) != len(use_channels):
raise ValueError(
f"Length of {n} must match length of use_channels: {len(p)} != {len(use_channels)}"
diff --git a/pylabrobot/liquid_handling/liquid_handler_tests.py b/pylabrobot/liquid_handling/liquid_handler_tests.py
index d27eba719e2..e86ef030d27 100644
--- a/pylabrobot/liquid_handling/liquid_handler_tests.py
+++ b/pylabrobot/liquid_handling/liquid_handler_tests.py
@@ -29,6 +29,7 @@
ResourceNotFoundError,
ResourceStack,
TipRack,
+ VWR_1_trough_195000uL_Ub,
nest_1_troughplate_195000uL_Vb,
no_tip_tracking,
set_tip_tracking,
@@ -48,7 +49,7 @@
from pylabrobot.resources.volume_tracker import (
set_volume_tracking,
)
-from pylabrobot.resources.well import Well
+from pylabrobot.resources.well import CrossSectionType, Well, WellBottomType
from pylabrobot.serializer import serialize
from .liquid_handler import LiquidHandler
@@ -1328,3 +1329,68 @@ async def test_no_go_zones_tight_vs_wide(self):
wide_gap_lower = abs(wide_offsets[1] - wide_offsets[0])
tight_gap_lower = abs(tight_offsets[1] - tight_offsets[0])
self.assertGreater(wide_gap_lower, tight_gap_lower)
+
+ async def test_old_pip_firmware_preserves_repeated_resource_zero_offsets(self):
+ """Legacy PIP firmware should keep the old repeated-resource zero-offset behavior."""
+ self.backend._pip_has_old_firmware = lambda: True
+
+ small_well = Well(
+ name="small_well",
+ size_x=107.2,
+ size_y=70.9,
+ size_z=24.76,
+ material_z_thickness=1.0,
+ bottom_type=WellBottomType.FLAT,
+ cross_section_type=CrossSectionType.RECTANGLE,
+ )
+ self.deck.assign_child_resource(small_well, location=Coordinate(200, 100, 0))
+ small_well.tracker.set_volume(50_000)
+
+ tips = [self.tip_rack.get_item(f"{chr(65 + i)}1").get_tip() for i in range(8)]
+ self.lh.update_head_state({i: t for i, t in enumerate(tips)})
+
+ await self.lh.aspirate([small_well] * 8, vols=[100] * 8, use_channels=list(range(8)))
+
+ ops = self.backend.aspirate.call_args.kwargs["ops"]
+ offsets = [op.offset.y for op in ops]
+ self.assertEqual(offsets, [0.0] * 8)
+
+ async def test_old_pip_firmware_spreads_single_trough_resource(self):
+ """Legacy PIP firmware should still spread channels across a single trough container."""
+ self.backend._pip_has_old_firmware = lambda: True
+
+ trough = VWR_1_trough_195000uL_Ub(name="vwr_trough")
+ self.deck.assign_child_resource(trough, location=Coordinate(200, 100, 0))
+ trough.tracker.set_volume(50_000)
+
+ tips = [self.tip_rack.get_item(f"{chr(65 + i)}1").get_tip() for i in range(8)]
+ self.lh.update_head_state({i: t for i, t in enumerate(tips)})
+
+ await self.lh.aspirate(trough, vols=[100] * 8, use_channels=list(range(8)))
+
+ ops = self.backend.aspirate.call_args.kwargs["ops"]
+ offsets = [op.offset.y for op in ops]
+ self.assertNotEqual(offsets, [0.0] * 8)
+ self.assertEqual(offsets, sorted(offsets, reverse=True))
+
+ async def test_modern_firmware_still_rejects_too_small_single_resource(self):
+ """Modern spacing logic should still reject resources that cannot fit the channels."""
+ self.backend._pip_has_old_firmware = lambda: False
+
+ small_well = Well(
+ name="small_well",
+ size_x=107.2,
+ size_y=70.9,
+ size_z=24.76,
+ material_z_thickness=1.0,
+ bottom_type=WellBottomType.FLAT,
+ cross_section_type=CrossSectionType.RECTANGLE,
+ )
+ self.deck.assign_child_resource(small_well, location=Coordinate(200, 100, 0))
+ small_well.tracker.set_volume(50_000)
+
+ tips = [self.tip_rack.get_item(f"{chr(65 + i)}1").get_tip() for i in range(8)]
+ self.lh.update_head_state({i: t for i, t in enumerate(tips)})
+
+ with self.assertRaisesRegex(ValueError, "Resource is too small to space channels."):
+ await self.lh.aspirate([small_well] * 8, vols=[100] * 8, use_channels=list(range(8)))
diff --git a/pylabrobot/resources/vwr/__init__.py b/pylabrobot/resources/vwr/__init__.py
index fa3552c9330..d1f72871cd4 100644
--- a/pylabrobot/resources/vwr/__init__.py
+++ b/pylabrobot/resources/vwr/__init__.py
@@ -1,2 +1,2 @@
from .plates import VWR_1_troughplate_195000uL_Ub, VWR_96_wellplate_2mL_Vb
-from .troughs import VWRReagentReservoirs25mL
+from .troughs import VWR_1_trough_195000uL_Ub, VWRReagentReservoirs25mL
diff --git a/pylabrobot/resources/vwr/troughs.py b/pylabrobot/resources/vwr/troughs.py
index 252c9cc4348..379685f6067 100644
--- a/pylabrobot/resources/vwr/troughs.py
+++ b/pylabrobot/resources/vwr/troughs.py
@@ -1,4 +1,36 @@
-from pylabrobot.resources.trough import Trough
+from pylabrobot.resources.height_volume_functions import (
+ compute_height_from_volume_rectangle,
+ compute_volume_from_height_rectangle,
+)
+from pylabrobot.resources.trough import Trough, TroughBottomType
+
+
+def VWR_1_trough_195000uL_Ub(name: str) -> Trough:
+ """VWR NA Cat. No. 77575-302"""
+
+ inner_width = 127.76 - (14.38 - 8.9 / 2) * 2
+ inner_length = 85.48 - (11.24 - 8.9 / 2) * 2
+
+ return Trough(
+ name=name,
+ size_x=127.76, # from spec
+ size_y=85.48, # from spec
+ size_z=31.4, # from spec
+ material_z_thickness=3.55, # from spec
+ max_volume=195000, # from spec 195 mL
+ model=VWR_1_trough_195000uL_Ub.__name__,
+ bottom_type=TroughBottomType.U,
+ compute_height_from_volume=lambda liquid_volume: compute_height_from_volume_rectangle(
+ liquid_volume,
+ inner_length,
+ inner_width,
+ ),
+ compute_volume_from_height=lambda liquid_height: compute_volume_from_height_rectangle(
+ liquid_height,
+ inner_length,
+ inner_width,
+ ),
+ )
def VWRReagentReservoirs25mL(name: str) -> Trough:
diff --git a/tools/README.md b/tools/README.md
index f9121ca503a..ba2db9e6be9 100644
--- a/tools/README.md
+++ b/tools/README.md
@@ -2,3 +2,5 @@
- `make_fw`: script for converting commands from the firmware documents into Python methods.
- `make_resources`: scripts to create PyLabRobot methods for various resources.
+- `robot_loop`: local Codex hardware runner that executes generated scripts, captures structured
+ JSON results, and preserves raw logs plus firmware command traces.
diff --git a/tools/__init__.py b/tools/__init__.py
new file mode 100644
index 00000000000..5f0ec862612
--- /dev/null
+++ b/tools/__init__.py
@@ -0,0 +1 @@
+"""Local tooling modules for PyLabRobot development and hardware workflows."""
diff --git a/tools/robot_loop/__init__.py b/tools/robot_loop/__init__.py
new file mode 100644
index 00000000000..294f0583e72
--- /dev/null
+++ b/tools/robot_loop/__init__.py
@@ -0,0 +1 @@
+"""Helpers for Codex-driven local hardware iteration loops."""
diff --git a/tools/robot_loop/examples/starlet_core8_pick_up_1000ul_tips.py b/tools/robot_loop/examples/starlet_core8_pick_up_1000ul_tips.py
new file mode 100644
index 00000000000..18ba246b7ee
--- /dev/null
+++ b/tools/robot_loop/examples/starlet_core8_pick_up_1000ul_tips.py
@@ -0,0 +1,93 @@
+"""Notebook-matched STARlet CoRe 8 tip pickup example for the robot loop runner.
+
+This mirrors the setup in
+/home/harleyk/Documents/GrindBio_Doris_Scripts/head_96w_pick_up_tips.ipynb
+for a single hardware action:
+
+- initialize the STARlet backend
+- place the same carriers and labware on the deck
+- pick up 1000 uL filtered tips on CoRe 8 channels 0-7 from A1:H1
+
+Run with:
+
+python -m tools.robot_loop.runner \
+ --script tools/robot_loop/examples/starlet_core8_pick_up_1000ul_tips.py \
+ --artifact-dir /tmp/starlet-runner-artifacts \
+ --operation tips \
+ --run-id core8_pickup_1000ul_a1_h1
+"""
+
+from pylabrobot.liquid_handling import LiquidHandler
+from pylabrobot.liquid_handling.backends import STARBackend
+from pylabrobot.resources.agenbio import AGenBio_1_troughplate_190000uL_Fl
+from pylabrobot.resources.bioer import BioER_96_wellplate_Vb_2200uL
+from pylabrobot.resources.hamilton import (
+ MFX_CAR_L5_base,
+ STARLetDeck,
+ TIP_CAR_480_A00,
+ hamilton_96_tiprack_1000uL_filter,
+ hamilton_96_tiprack_300uL_filter,
+ hamilton_96_tiprack_50uL_filter,
+)
+from pylabrobot.resources.hamilton.mfx_modules import Hamilton_MFX_plateholder_DWP_metal_tapped
+
+
+async def run(context):
+ backend = STARBackend()
+ context.register_backend(backend, label="starlet")
+
+ lh = LiquidHandler(backend=backend, deck=STARLetDeck())
+
+ plateholder_0 = Hamilton_MFX_plateholder_DWP_metal_tapped(name="plateholder_0")
+ plateholder_1 = Hamilton_MFX_plateholder_DWP_metal_tapped(name="plateholder_1")
+ plateholder_2 = Hamilton_MFX_plateholder_DWP_metal_tapped(name="plateholder_2")
+ plate_carrier = MFX_CAR_L5_base(
+ name="plate_modules",
+ modules={
+ 0: plateholder_0,
+ 1: plateholder_1,
+ 2: plateholder_2,
+ },
+ )
+
+ tip_carrier = TIP_CAR_480_A00(name="tip_carrier_480")
+
+ water_plate = AGenBio_1_troughplate_190000uL_Fl(name="water_plate")
+ dw_plate = BioER_96_wellplate_Vb_2200uL(name="dw_plate")
+ hi_orange = AGenBio_1_troughplate_190000uL_Fl(name="hiOrange")
+
+ tip_rack_1000 = hamilton_96_tiprack_1000uL_filter(name="tiprack_1000ul_filter")
+ tip_rack_50 = hamilton_96_tiprack_50uL_filter(name="tiprack_50ul_filter")
+ tip_rack_300 = hamilton_96_tiprack_300uL_filter(name="tiprack_300ul_filter")
+
+ plate_carrier[0] = dw_plate
+ plate_carrier[1] = water_plate
+ plate_carrier[2] = hi_orange
+
+ tip_carrier[2] = tip_rack_1000
+ tip_carrier[1] = tip_rack_50
+ tip_carrier[0] = tip_rack_300
+
+ lh.deck.assign_child_resource(tip_carrier, rails=13)
+ lh.deck.assign_child_resource(plate_carrier, rails=7)
+
+ try:
+ await lh.setup(skip_autoload=True)
+ context.add_note("setup complete")
+ context.add_note("attempting CoRe 8 pickup from 1000 uL rack slot 2 on TIP_CAR_480_A00")
+
+ await lh.pick_up_tips(
+ tip_rack_1000["A1:H1"],
+ use_channels=list(range(8)),
+ )
+
+ context.add_note("pick_up_tips completed")
+ except Exception:
+ try:
+ faulty = await backend.request_name_of_last_faulty_parameter()
+ context.write_json_artifact("last_faulty_parameter.json", faulty)
+ except Exception as inner_exc:
+ context.add_note(f"could not query last faulty parameter: {inner_exc}")
+ raise
+ finally:
+ await lh.stop()
diff --git a/tools/robot_loop/examples/starlet_firmware_probe.py b/tools/robot_loop/examples/starlet_firmware_probe.py
new file mode 100644
index 00000000000..a7a3fedfe8b
--- /dev/null
+++ b/tools/robot_loop/examples/starlet_firmware_probe.py
@@ -0,0 +1,20 @@
+"""Minimal STARlet probe for the local robot loop runner.
+
+This is intended as a safe first hardware script: setup, capture firmware context, and stop.
+"""
+
+from pylabrobot.liquid_handling import LiquidHandler
+from pylabrobot.liquid_handling.backends.hamilton.STAR_backend import STARBackend
+from pylabrobot.resources import STARLetDeck
+
+
+async def run(context):
+ backend = STARBackend()
+ context.register_backend(backend, label="starlet")
+
+ lh = LiquidHandler(backend=backend, deck=STARLetDeck())
+ try:
+ await lh.setup()
+ context.add_note("setup completed")
+ finally:
+ await lh.stop()
diff --git a/tools/robot_loop/runner.py b/tools/robot_loop/runner.py
new file mode 100644
index 00000000000..dd7ea8a4a57
--- /dev/null
+++ b/tools/robot_loop/runner.py
@@ -0,0 +1,453 @@
+import argparse
+import asyncio
+import contextlib
+import dataclasses
+import datetime
+import importlib.util
+import inspect
+import io
+import json
+import pathlib
+import sys
+import traceback
+import types
+import typing
+import uuid
+
+
+def _utcnow() -> datetime.datetime:
+ return datetime.datetime.now(datetime.timezone.utc)
+
+
+def _to_jsonable(value: typing.Any) -> typing.Any:
+ if dataclasses.is_dataclass(value):
+ return _to_jsonable(dataclasses.asdict(value))
+ if isinstance(value, pathlib.Path):
+ return str(value)
+ if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
+ return value.isoformat()
+ if isinstance(value, dict):
+ return {str(k): _to_jsonable(v) for k, v in value.items()}
+ if isinstance(value, (list, tuple, set)):
+ return [_to_jsonable(v) for v in value]
+ if isinstance(value, (str, int, float, bool)) or value is None:
+ return value
+ return repr(value)
+
+
+@dataclasses.dataclass
+class FirmwareCommandRecord:
+ timestamp: str
+ backend_label: str
+ module: str
+ command: str
+ kwargs: dict
+ response: typing.Any = None
+ error_type: typing.Optional[str] = None
+ error_message: typing.Optional[str] = None
+
+ def to_dict(self) -> dict:
+ return _to_jsonable(dataclasses.asdict(self))
+
+
+@dataclasses.dataclass
+class RobotRunResult:
+ run_id: str
+ status: str
+ script_path: str
+ operation: str
+ started_at: str
+ finished_at: str
+ timeout_seconds: float
+ exception_type: typing.Optional[str] = None
+ exception_message: typing.Optional[str] = None
+ traceback: typing.Optional[str] = None
+ notes: typing.Optional[list] = None
+ metadata: typing.Optional[dict] = None
+ firmware_context: typing.Optional[list] = None
+ artifacts: typing.Optional[dict] = None
+ cleanup_error: typing.Optional[str] = None
+
+ def to_dict(self) -> dict:
+ return _to_jsonable(dataclasses.asdict(self))
+
+
+@dataclasses.dataclass
+class RobotRunJob:
+ script: pathlib.Path
+ result_json: pathlib.Path
+ raw_log: pathlib.Path
+ command_log_jsonl: pathlib.Path
+ artifact_dir: pathlib.Path
+ operation: str = "unspecified"
+ timeout_seconds: float = 300.0
+ cleanup_timeout_seconds: float = 30.0
+ run_id: str = dataclasses.field(default_factory=lambda: uuid.uuid4().hex)
+ metadata: dict = dataclasses.field(default_factory=dict)
+
+
+class _TeeTextIO(io.TextIOBase):
+ def __init__(self, *targets: typing.TextIO):
+ self.targets = targets
+
+ def write(self, data: str) -> int:
+ for target in self.targets:
+ target.write(data)
+ target.flush()
+ return len(data)
+
+ def flush(self) -> None:
+ for target in self.targets:
+ target.flush()
+
+
+class RobotJobContext:
+ def __init__(
+ self,
+ *,
+ run_id: str,
+ script_path: pathlib.Path,
+ artifact_dir: pathlib.Path,
+ command_log_jsonl: pathlib.Path,
+ operation: str,
+ metadata: typing.Optional[dict] = None,
+ ):
+ self.run_id = run_id
+ self.script_path = script_path
+ self.artifact_dir = artifact_dir
+ self.command_log_jsonl = command_log_jsonl
+ self.operation = operation
+ self.metadata = metadata or {}
+ self.notes: list[str] = []
+ self._registered_backends: list[dict] = []
+
+ def add_note(self, message: str) -> None:
+ self.notes.append(message)
+
+ def write_json_artifact(self, relative_path: str, data: typing.Any) -> pathlib.Path:
+ target = self.artifact_dir / relative_path
+ target.parent.mkdir(parents=True, exist_ok=True)
+ target.write_text(json.dumps(_to_jsonable(data), indent=2, sort_keys=True), encoding="utf-8")
+ return target
+
+ def register_backend(
+ self,
+ backend: typing.Any,
+ *,
+ label: typing.Optional[str] = None,
+ capture_commands: bool = True,
+ ) -> typing.Any:
+ backend_label = label or f"{type(backend).__name__}_{len(self._registered_backends)}"
+ registration = {
+ "label": backend_label,
+ "backend": backend,
+ "original_send_command": getattr(backend, "send_command", None),
+ "capture_commands": capture_commands,
+ }
+ self._registered_backends.append(registration)
+
+ if capture_commands and registration["original_send_command"] is not None:
+ original_send_command = registration["original_send_command"]
+
+ async def wrapped_send_command(instance, *args, **kwargs):
+ module = kwargs.get("module")
+ command = kwargs.get("command")
+ if len(args) >= 1:
+ module = args[0]
+ if len(args) >= 2:
+ command = args[1]
+
+ command_kwargs = {
+ key: value
+ for key, value in kwargs.items()
+ if key not in {"module", "command"}
+ }
+
+ record = FirmwareCommandRecord(
+ timestamp=_utcnow().isoformat(),
+ backend_label=backend_label,
+ module=str(module),
+ command=str(command),
+ kwargs=_to_jsonable(command_kwargs),
+ )
+ try:
+ response = await original_send_command(*args, **kwargs)
+ record.response = _to_jsonable(response)
+ return response
+ except Exception as exc:
+ record.error_type = type(exc).__name__
+ record.error_message = str(exc)
+ raise
+ finally:
+ self._append_command_record(record)
+
+ backend.send_command = types.MethodType(wrapped_send_command, backend)
+
+ return backend
+
+ def _append_command_record(self, record: FirmwareCommandRecord) -> None:
+ self.command_log_jsonl.parent.mkdir(parents=True, exist_ok=True)
+ with self.command_log_jsonl.open("a", encoding="utf-8") as handle:
+ handle.write(json.dumps(record.to_dict(), sort_keys=True))
+ handle.write("\n")
+
+ def collect_firmware_context(self) -> list[dict]:
+ contexts = []
+ for registration in self._registered_backends:
+ backend = registration["backend"]
+ head96_information = getattr(backend, "_head96_information", None)
+ contexts.append(
+ {
+ "label": registration["label"],
+ "backend_type": type(backend).__name__,
+ "pip_firmware_version": _to_jsonable(getattr(backend, "_pip_firmware_version", None)),
+ "head96_information": _to_jsonable(head96_information),
+ "machine_conf": _to_jsonable(getattr(backend, "_machine_conf", None)),
+ "extended_conf": _to_jsonable(getattr(backend, "_extended_conf", None)),
+ }
+ )
+ return contexts
+
+ async def best_effort_stop_backends(self) -> None:
+ for registration in reversed(self._registered_backends):
+ backend = registration["backend"]
+ stop_steps = [
+ ("move_all_channels_in_z_safety", None),
+ ("move_core_96_to_safe_position", None),
+ ("park_iswap", None),
+ ("park_autoload", None),
+ ("stop", None),
+ ]
+ for method_name, argument in stop_steps:
+ method = getattr(backend, method_name, None)
+ if method is None:
+ continue
+ try:
+ result = method() if argument is None else method(argument)
+ if inspect.isawaitable(result):
+ await result
+ except Exception as exc: # pragma: no cover - cleanup is intentionally best effort
+ self.add_note(f"cleanup step {method_name} failed on {registration['label']}: {exc}")
+
+ def restore_backend_hooks(self) -> None:
+ for registration in self._registered_backends:
+ original_send_command = registration["original_send_command"]
+ if registration["capture_commands"] and original_send_command is not None:
+ registration["backend"].send_command = original_send_command
+
+
+def _load_script_module(script_path: pathlib.Path):
+ module_name = f"robot_loop_script_{uuid.uuid4().hex}"
+ spec = importlib.util.spec_from_file_location(module_name, script_path)
+ if spec is None or spec.loader is None:
+ raise RuntimeError(f"Unable to import runner script from {script_path}")
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+ return module
+
+
+async def _invoke_callable(func: typing.Callable, context: RobotJobContext):
+ signature = inspect.signature(func)
+ if len(signature.parameters) == 0:
+ result = func()
+ else:
+ result = func(context)
+ if inspect.isawaitable(result):
+ return await result
+ return result
+
+
+async def execute_job(job: RobotRunJob) -> RobotRunResult:
+ job.artifact_dir.mkdir(parents=True, exist_ok=True)
+ job.result_json.parent.mkdir(parents=True, exist_ok=True)
+ job.raw_log.parent.mkdir(parents=True, exist_ok=True)
+ job.command_log_jsonl.parent.mkdir(parents=True, exist_ok=True)
+
+ context = RobotJobContext(
+ run_id=job.run_id,
+ script_path=job.script,
+ artifact_dir=job.artifact_dir,
+ command_log_jsonl=job.command_log_jsonl,
+ operation=job.operation,
+ metadata=job.metadata,
+ )
+ started_at = _utcnow()
+ status = "success"
+ exception_type = None
+ exception_message = None
+ traceback_text = None
+ cleanup_error = None
+
+ with job.raw_log.open("a", encoding="utf-8") as raw_log_handle:
+ tee_stdout = _TeeTextIO(sys.stdout, raw_log_handle)
+ tee_stderr = _TeeTextIO(sys.stderr, raw_log_handle)
+ with contextlib.redirect_stdout(tee_stdout), contextlib.redirect_stderr(tee_stderr):
+ print(f"[robot-loop] run_id={job.run_id} operation={job.operation} script={job.script}")
+ cleanup_callable = None
+
+ try:
+ module = _load_script_module(job.script)
+ run_callable = getattr(module, "run", None)
+ cleanup_callable = getattr(module, "cleanup", None)
+ if run_callable is None:
+ raise RuntimeError(f"Runner script {job.script} must define a callable named 'run'.")
+ await asyncio.wait_for(_invoke_callable(run_callable, context), timeout=job.timeout_seconds)
+ except asyncio.TimeoutError as exc:
+ status = "timeout"
+ exception_type = type(exc).__name__
+ exception_message = f"runner timed out after {job.timeout_seconds} seconds"
+ traceback_text = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
+ except Exception as exc: # pylint: disable=broad-except
+ status = "failure"
+ exception_type = type(exc).__name__
+ exception_message = str(exc)
+ traceback_text = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
+ finally:
+ try:
+ if cleanup_callable is not None:
+ await asyncio.wait_for(
+ _invoke_callable(cleanup_callable, context), timeout=job.cleanup_timeout_seconds
+ )
+ except Exception as exc: # pylint: disable=broad-except
+ cleanup_error = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
+ try:
+ await asyncio.wait_for(
+ context.best_effort_stop_backends(), timeout=job.cleanup_timeout_seconds
+ )
+ except Exception as exc: # pylint: disable=broad-except
+ suffix = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
+ cleanup_error = suffix if cleanup_error is None else cleanup_error + "\n" + suffix
+ finally:
+ context.restore_backend_hooks()
+
+ finished_at = _utcnow()
+ result = RobotRunResult(
+ run_id=job.run_id,
+ status=status,
+ script_path=str(job.script),
+ operation=job.operation,
+ started_at=started_at.isoformat(),
+ finished_at=finished_at.isoformat(),
+ timeout_seconds=job.timeout_seconds,
+ exception_type=exception_type,
+ exception_message=exception_message,
+ traceback=traceback_text,
+ notes=context.notes,
+ metadata=job.metadata,
+ firmware_context=context.collect_firmware_context(),
+ cleanup_error=cleanup_error,
+ artifacts={
+ "artifact_dir": str(job.artifact_dir),
+ "result_json": str(job.result_json),
+ "raw_log": str(job.raw_log),
+ "command_log_jsonl": str(job.command_log_jsonl),
+ },
+ )
+ job.result_json.write_text(json.dumps(result.to_dict(), indent=2, sort_keys=True), encoding="utf-8")
+ return result
+
+
+def load_job(
+ *,
+ script: typing.Optional[str] = None,
+ result_json: typing.Optional[str] = None,
+ raw_log: typing.Optional[str] = None,
+ command_log_jsonl: typing.Optional[str] = None,
+ artifact_dir: typing.Optional[str] = None,
+ operation: str = "unspecified",
+ timeout_seconds: float = 300.0,
+ cleanup_timeout_seconds: float = 30.0,
+ run_id: typing.Optional[str] = None,
+ metadata: typing.Optional[dict] = None,
+ job_json: typing.Optional[str] = None,
+) -> RobotRunJob:
+ payload = {}
+ if job_json is not None:
+ payload = json.loads(pathlib.Path(job_json).read_text(encoding="utf-8"))
+
+ script_path = pathlib.Path(script or payload["script"]).resolve()
+ artifact_root = pathlib.Path(
+ artifact_dir or payload.get("artifact_dir") or script_path.parent / "robot_loop_artifacts"
+ ).resolve()
+ resolved_run_id = run_id or payload.get("run_id") or uuid.uuid4().hex
+
+ result_path = pathlib.Path(
+ result_json or payload.get("result_json") or artifact_root / f"{resolved_run_id}_result.json"
+ ).resolve()
+ raw_log_path = pathlib.Path(
+ raw_log or payload.get("raw_log") or artifact_root / f"{resolved_run_id}_raw.log"
+ ).resolve()
+ command_log_path = pathlib.Path(
+ command_log_jsonl
+ or payload.get("command_log_jsonl")
+ or artifact_root / f"{resolved_run_id}_commands.jsonl"
+ ).resolve()
+
+ combined_metadata = {}
+ combined_metadata.update(payload.get("metadata", {}))
+ if metadata:
+ combined_metadata.update(metadata)
+
+ return RobotRunJob(
+ script=script_path,
+ result_json=result_path,
+ raw_log=raw_log_path,
+ command_log_jsonl=command_log_path,
+ artifact_dir=artifact_root,
+ operation=payload.get("operation", operation),
+ timeout_seconds=float(payload.get("timeout_seconds", timeout_seconds)),
+ cleanup_timeout_seconds=float(
+ payload.get("cleanup_timeout_seconds", cleanup_timeout_seconds)
+ ),
+ run_id=resolved_run_id,
+ metadata=combined_metadata,
+ )
+
+
+def _parse_metadata(args_metadata: typing.Optional[str]) -> dict:
+ if args_metadata is None:
+ return {}
+ return typing.cast(dict, json.loads(args_metadata))
+
+
+def main(argv: typing.Optional[list[str]] = None) -> int:
+ parser = argparse.ArgumentParser(description="Run a local Codex hardware iteration job.")
+ parser.add_argument("--script", help="Path to the generated Python runner script.")
+ parser.add_argument("--job-json", help="Optional path to a JSON job specification.")
+ parser.add_argument("--result-json", help="Path where the structured result JSON should be written.")
+ parser.add_argument("--raw-log", help="Path where the raw stdout/stderr log should be written.")
+ parser.add_argument(
+ "--command-log-jsonl",
+ help="Path where captured backend firmware command records should be written.",
+ )
+ parser.add_argument("--artifact-dir", help="Directory for run artifacts.")
+ parser.add_argument("--operation", default="unspecified", help="Operation category for this run.")
+ parser.add_argument("--timeout", type=float, default=300.0, help="Main run timeout in seconds.")
+ parser.add_argument(
+ "--cleanup-timeout", type=float, default=30.0, help="Cleanup timeout in seconds."
+ )
+ parser.add_argument("--run-id", help="Optional run identifier.")
+ parser.add_argument("--metadata-json", help="Inline JSON metadata to include in the result.")
+
+ args = parser.parse_args(argv)
+ metadata = _parse_metadata(args.metadata_json)
+ job = load_job(
+ script=args.script,
+ job_json=args.job_json,
+ result_json=args.result_json,
+ raw_log=args.raw_log,
+ command_log_jsonl=args.command_log_jsonl,
+ artifact_dir=args.artifact_dir,
+ operation=args.operation,
+ timeout_seconds=args.timeout,
+ cleanup_timeout_seconds=args.cleanup_timeout,
+ run_id=args.run_id,
+ metadata=metadata,
+ )
+ result = asyncio.run(execute_job(job))
+ print(json.dumps(result.to_dict(), indent=2, sort_keys=True))
+ return 0 if result.status == "success" else 1
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/tools/robot_loop/runner_tests.py b/tools/robot_loop/runner_tests.py
new file mode 100644
index 00000000000..65cfd84da17
--- /dev/null
+++ b/tools/robot_loop/runner_tests.py
@@ -0,0 +1,122 @@
+import json
+import pathlib
+import tempfile
+import textwrap
+import unittest
+
+from tools.robot_loop.runner import execute_job, load_job
+
+
+class FakeBackend:
+ def __init__(self):
+ self._pip_firmware_version = "2009-05-01"
+ self._machine_conf = {"channels": 8}
+ self._extended_conf = {"iswap_installed": False}
+ self.cleanup_calls = []
+
+ async def send_command(self, module, command, **kwargs):
+ if command == "ERR":
+ raise RuntimeError("firmware rejected command")
+ return {"module": module, "command": command, "kwargs": kwargs}
+
+ async def move_all_channels_in_z_safety(self):
+ self.cleanup_calls.append("move_all_channels_in_z_safety")
+
+ async def stop(self):
+ self.cleanup_calls.append("stop")
+
+
+class RobotLoopRunnerTests(unittest.IsolatedAsyncioTestCase):
+ def setUp(self):
+ self._tempdir = tempfile.TemporaryDirectory()
+ self.addCleanup(self._tempdir.cleanup)
+ self.tmp_path = pathlib.Path(self._tempdir.name)
+
+ def _write_script(self, directory: pathlib.Path, name: str, body: str) -> pathlib.Path:
+ script_path = directory / name
+ script_path.write_text(textwrap.dedent(body), encoding="utf-8")
+ return script_path
+
+ async def test_execute_job_success_captures_firmware_context_and_commands(self):
+ script = self._write_script(
+ self.tmp_path,
+ "success_script.py",
+ """
+ from tools.robot_loop.runner_tests import FakeBackend
+
+ async def run(context):
+ backend = FakeBackend()
+ context.register_backend(backend, label="fake-star")
+ await backend.send_command("C0", "RF")
+ """,
+ )
+ job = load_job(script=str(script), artifact_dir=str(self.tmp_path), run_id="success")
+ result = await execute_job(job)
+
+ self.assertEqual(result.status, "success")
+ self.assertEqual(result.firmware_context[0]["pip_firmware_version"], "2009-05-01")
+
+ commands = job.command_log_jsonl.read_text(encoding="utf-8").strip().splitlines()
+ self.assertEqual(len(commands), 1)
+ self.assertEqual(json.loads(commands[0])["command"], "RF")
+
+ async def test_execute_job_failure_records_exception(self):
+ script = self._write_script(
+ self.tmp_path,
+ "failure_script.py",
+ """
+ async def run(context):
+ raise ValueError("bad run")
+ """,
+ )
+ job = load_job(script=str(script), artifact_dir=str(self.tmp_path), run_id="failure")
+ result = await execute_job(job)
+
+ self.assertEqual(result.status, "failure")
+ self.assertEqual(result.exception_type, "ValueError")
+ self.assertIn("bad run", result.exception_message)
+
+ async def test_execute_job_timeout_records_timeout_status(self):
+ script = self._write_script(
+ self.tmp_path,
+ "timeout_script.py",
+ """
+ import asyncio
+
+ async def run(context):
+ await asyncio.sleep(1)
+ """,
+ )
+ job = load_job(
+ script=str(script),
+ artifact_dir=str(self.tmp_path),
+ run_id="timeout",
+ timeout_seconds=0.01,
+ )
+ result = await execute_job(job)
+
+ self.assertEqual(result.status, "timeout")
+ self.assertEqual(result.exception_type, "TimeoutError")
+
+ async def test_execute_job_cleanup_runs_on_registered_backend(self):
+ script = self._write_script(
+ self.tmp_path,
+ "cleanup_script.py",
+ """
+ from tools.robot_loop.runner_tests import FakeBackend
+
+ backend = FakeBackend()
+
+ async def run(context):
+ context.register_backend(backend, label="fake-star")
+ await backend.send_command("C0", "ERR")
+ """,
+ )
+ job = load_job(script=str(script), artifact_dir=str(self.tmp_path), run_id="cleanup")
+ result = await execute_job(job)
+
+ self.assertEqual(result.status, "failure")
+ command_record = json.loads(job.command_log_jsonl.read_text(encoding="utf-8").strip())
+ self.assertEqual(command_record["error_type"], "RuntimeError")
+ raw_log = job.raw_log.read_text(encoding="utf-8")
+ self.assertIn("run_id=cleanup", raw_log)