diff --git a/docs/resources/library/vwr.md b/docs/resources/library/vwr.md index 62cae2f87ce..82edec8efbb 100644 --- a/docs/resources/library/vwr.md +++ b/docs/resources/library/vwr.md @@ -7,6 +7,7 @@ Company page: [Wikipedia](https://en.wikipedia.org/wiki/VWR_International) | Description | Image | PLR definition | |--------------------|--------------------|--------------------| | 'VWRReagentReservoirs25mL'
Part no.: 89094
[manufacturer website](https://us.vwr.com/store/product/4694822/vwr-disposable-pipetting-reservoirs)
Polystyrene Reservoirs | ![](img/vwr/VWRReagentReservoirs25mL.jpg) | `VWRReagentReservoirs25mL` | +| 'VWR_1_troughplate_195000uL_Ub'
Part no.: 77575-302
[manufacturer website](https://www.avantorsciences.com/us/en/product/47763965/vwr-multi-channel-polypropylene-reagent-reservoirs?isCatNumSearch=true&searchedCatalogNumber=77575-302)
Polypropylene multi-channel reagent reservoirs | ![](img/vwr/VWR_1_troughplate_195000uL_Ub.jpg) | `VWR_1_troughplate_195000uL_Ub` | ## Plates diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index fbecc6703a2..be9ebbc2fa7 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1441,7 +1441,9 @@ def core96_head_installed(self) -> bool: @property def num_arms(self) -> int: - return 1 if self.extended_conf.left_x_drive.iswap_installed else 0 + has_iswap = self.extended_conf.left_x_drive.iswap_installed + has_core_grippers = self._deck is not None and self._deck.has_resource("core_grippers") + return 1 if (has_iswap or has_core_grippers) else 0 @property def head96_installed(self) -> Optional[bool]: @@ -1672,6 +1674,65 @@ def _parse_firmware_version_datetime(self, fw_version: str) -> datetime.date: raise ValueError(f"Could not parse year from firmware version string: '{fw_version}'") return datetime.date(int(year_match.group(1)), 1, 1) + def _core96_has_old_firmware(self) -> bool: + """Return whether the installed CoRe96 firmware predates 2010 compatibility changes.""" + + head96_information = getattr(self, "_head96_information", None) + return head96_information is not None and head96_information.fw_version.year < 2010 + + def _pip_has_old_firmware(self) -> bool: + """Return whether the installed PIP firmware predates 2010 compatibility changes.""" + + pip_firmware_version = getattr(self, "_pip_firmware_version", None) + return pip_firmware_version is not None and pip_firmware_version.year < 2010 + + def _pip_supports_extended_tip_handling_command_params(self) -> bool: + """Return whether PIP tip-handling commands support newer optional parameters.""" + + return not self._pip_has_old_firmware() + + def _pip_supports_extended_liquid_command_params(self) -> bool: + """Return whether PIP liquid commands support newer optional parameters.""" + + return not self._pip_has_old_firmware() + + def _core96_supports_extended_liquid_command_params(self) -> bool: + """Return whether CoRe96 `EA`/`ED` support the newer optional parameter set.""" + + return not self._core96_has_old_firmware() + + def _normalize_extended_configuration_y_bounds(self) -> None: + """Normalize impossible PIP Y bounds reported by some STAR/STARlet firmware revisions. + + Legacy STARlet firmware has been observed to return the PIP Y bounds reversed in the + extended-configuration response and, on some revisions, to report an implausibly small maximum + Y position. Newer instruments should already report a sane interval, so normalize only when the + parsed values are clearly impossible. + """ + + ext_conf = self.extended_conf + + if ( + ext_conf.pip_maximal_y_position < ext_conf.left_arm_min_y_position + or ext_conf.pip_maximal_y_position < 100.0 + ): + default_bounds = ExtendedConfiguration() + logger.warning( + "Normalizing invalid PIP Y bounds reported by firmware: max_y=%s, min_y=%s, pip_fw=%s", + ext_conf.pip_maximal_y_position, + ext_conf.left_arm_min_y_position, + getattr(self, "_pip_firmware_version", None), + ) + ext_conf.pip_maximal_y_position = default_bounds.pip_maximal_y_position + ext_conf.left_arm_min_y_position = default_bounds.left_arm_min_y_position + + def _pip_y_bounds(self) -> Tuple[float, float]: + """Return normalized PIP Y bounds as (min_y, max_y).""" + + min_y = self.extended_conf.left_arm_min_y_position + max_y = self.extended_conf.pip_maximal_y_position + return (min(min_y, max_y), max(min_y, max_y)) + async def setup( self, skip_instrument_initialization=False, @@ -1696,6 +1757,7 @@ async def setup( self._machine_conf = await self.request_machine_configuration() self._extended_conf = await self.request_extended_configuration() self._head96_information: Optional[Head96Information] = None + self._pip_firmware_version: Optional[datetime.date] = None initialized = await self.request_instrument_initialization_status() @@ -1715,6 +1777,15 @@ async def setup( tip_presences = await self.request_tip_presence() self._num_channels = len(tip_presences) + pip_fw_response = await self.request_firmware_version() + pip_fw_match = re.search(r"rf(?P.+)$", pip_fw_response) + if pip_fw_match is None: + raise ValueError(f"Could not parse PIP firmware version from response: '{pip_fw_response}'") + self._pip_firmware_version = self._parse_firmware_version_datetime( + pip_fw_match.group("fw_version").strip() + ) + self._normalize_extended_configuration_y_bounds() + async def set_up_pip(): if (not initialized or any(tip_presences)) and not skip_pip: await self.initialize_pip() @@ -1841,12 +1912,13 @@ def can_reach_position(self, channel_idx: int, position: Coordinate) -> bool: # frontmost channel can go to y=6, every channel behind it constrains its min Y spacings = self._channels_minimum_y_spacing - min_y_pos = self.extended_conf.left_arm_min_y_position + sum(spacings[channel_idx + 1 :]) + lower_y_bound, upper_y_bound = self._pip_y_bounds() + min_y_pos = lower_y_bound + sum(spacings[channel_idx + 1 :]) if position.y < min_y_pos: return False # backmost channel max Y from config, every channel in front constrains its max Y - max_y_pos = self.extended_conf.pip_maximal_y_position - sum(spacings[:channel_idx]) + max_y_pos = upper_y_bound - sum(spacings[:channel_idx]) if position.y > max_y_pos: return False @@ -3457,7 +3529,7 @@ async def pick_up_tips96( self._check_96_position_legal(pickup_position, skip_z=True) - if tip_pickup_method == "from_rack": + if tip_pickup_method == "from_rack" and self._core96_supports_extended_liquid_command_params(): # the STAR will not automatically move the dispensing drive down if it is still up # so we need to move it down here # see https://github.com/PyLabRobot/pylabrobot/pull/835 @@ -4603,7 +4675,7 @@ async def move_channel_y(self, channel: int, y: float): f"(channel {channel - 1} y-position is {round(y, 2)} mm)" ) else: - max_y_pos = self.extended_conf.pip_maximal_y_position + _, max_y_pos = self._pip_y_bounds() if y > max_y_pos: raise ValueError(f"channel {channel} y-target must be <= {max_y_pos} mm") @@ -4616,10 +4688,9 @@ async def move_channel_y(self, channel: int, y: float): ) else: # STAR machines do not allow channels y < minimum - if y < self.extended_conf.left_arm_min_y_position: - raise ValueError( - f"channel {channel} y-target must be >= {self.extended_conf.left_arm_min_y_position} mm" - ) + min_y_pos, _ = self._pip_y_bounds() + if y < min_y_pos: + raise ValueError(f"channel {channel} y-target must be >= {min_y_pos} mm") await self.position_single_pipetting_channel_in_y_direction( pipetting_channel_index=channel + 1, y_position=round(y * 10) @@ -5837,7 +5908,7 @@ async def initialize_pipetting_channels( assert 0 <= tip_type <= 99, "tip must be between 0 and 99" assert 0 <= discarding_method <= 1, "discarding_method must be between 0 and 1" - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="DI", read_timeout=120, @@ -5848,8 +5919,10 @@ async def initialize_pipetting_channels( te=f"{z_position_at_end_of_a_command:04}", tm=[f"{tm:01}" for tm in tip_pattern], tt=f"{tip_type:02}", - ti=discarding_method, ) + if self._pip_supports_extended_tip_handling_command_params(): + command_kwargs["ti"] = discarding_method + return await self.send_command(**command_kwargs) # -------------- 3.5.2 Tip handling commands using PIP -------------- @@ -5894,7 +5967,7 @@ async def pick_up_tip( "minimum_traverse_height_at_beginning_of_a_command must be between 0 and 3600" ) - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="TP", tip_pattern=tip_pattern, @@ -5906,8 +5979,10 @@ async def pick_up_tip( tp=f"{begin_tip_pick_up_process:04}", tz=f"{end_tip_pick_up_process:04}", th=f"{minimum_traverse_height_at_beginning_of_a_command:04}", - td=pickup_method.value, ) + if self._pip_supports_extended_tip_handling_command_params(): + command_kwargs["td"] = pickup_method.value + return await self.send_command(**command_kwargs) @need_iswap_parked async def discard_tip( @@ -5959,7 +6034,7 @@ async def discard_tip( "z_position_at_end_of_a_command must be between 0 and 3600" ) - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="TR", tip_pattern=tip_pattern, @@ -5971,8 +6046,10 @@ async def discard_tip( tz=end_tip_deposit_process, th=minimum_traverse_height_at_beginning_of_a_command, te=z_position_at_end_of_a_command, - ti=discarding_method.value, ) + if self._pip_supports_extended_tip_handling_command_params(): + command_kwargs["ti"] = discarding_method.value + return await self.send_command(**command_kwargs) # TODO:(command:TW) Tip Pick-up for DC wash procedure @@ -6218,7 +6295,7 @@ async def aspirate_pip( ) assert all(0 <= x <= 3600 for x in cup_upper_edge), "cup_upper_edge must be between 0 and 3600" - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="AS", tip_pattern=tip_pattern, @@ -6232,7 +6309,10 @@ async def aspirate_pip( lp=[f"{lp:04}" for lp in lld_search_height], ch=[f"{ch:03}" for ch in clot_detection_height], zl=[f"{zl:04}" for zl in liquid_surface_no_lld], - po=[f"{po:04}" for po in pull_out_distance_transport_air], + ) + if self._pip_supports_extended_liquid_command_params(): + command_kwargs["po"] = [f"{po:04}" for po in pull_out_distance_transport_air] + command_kwargs.update( zu=[f"{zu:04}" for zu in second_section_height], zr=[f"{zr:05}" for zr in second_section_ratio], zx=[f"{zx:04}" for zx in minimum_height], @@ -6256,16 +6336,21 @@ async def aspirate_pip( mp=[f"{mp:03}" for mp in mix_position_from_liquid_surface], ms=[f"{ms:04}" for ms in mix_speed], mh=[f"{mh:04}" for mh in mix_surface_following_distance], - gi=[f"{gi:03}" for gi in limit_curve_index], - gj=tadm_algorithm, - gk=recording_mode, - lk=[1 if lk else 0 for lk in use_2nd_section_aspiration], - ik=[f"{ik:04}" for ik in retract_height_over_2nd_section_to_empty_tip], - sd=[f"{sd:04}" for sd in dispensation_speed_during_emptying_tip], - se=[f"{se:04}" for se in dosing_drive_speed_during_2nd_section_search], - sz=[f"{sz:04}" for sz in z_drive_speed_during_2nd_section_search], - io=[f"{io:04}" for io in cup_upper_edge], ) + if self._pip_supports_extended_liquid_command_params(): + command_kwargs["gi"] = [f"{gi:03}" for gi in limit_curve_index] + command_kwargs["gj"] = tadm_algorithm + command_kwargs["gk"] = recording_mode + if self._pip_supports_extended_liquid_command_params(): + command_kwargs.update( + lk=[1 if lk else 0 for lk in use_2nd_section_aspiration], + ik=[f"{ik:04}" for ik in retract_height_over_2nd_section_to_empty_tip], + sd=[f"{sd:04}" for sd in dispensation_speed_during_emptying_tip], + se=[f"{se:04}" for se in dosing_drive_speed_during_2nd_section_search], + sz=[f"{sz:04}" for sz in z_drive_speed_during_2nd_section_search], + io=[f"{io:04}" for io in cup_upper_edge], + ) + return await self.send_command(**command_kwargs) @need_iswap_parked async def dispense_pip( @@ -6450,7 +6535,7 @@ async def dispense_pip( ) assert 0 <= recording_mode <= 2, "recording_mode must be between 0 and 2" - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="DS", tip_pattern=tip_pattern, @@ -6462,7 +6547,10 @@ async def dispense_pip( zx=[f"{zx:04}" for zx in minimum_height], lp=[f"{lp:04}" for lp in lld_search_height], zl=[f"{zl:04}" for zl in liquid_surface_no_lld], - po=[f"{po:04}" for po in pull_out_distance_transport_air], + ) + if self._pip_supports_extended_liquid_command_params(): + command_kwargs["po"] = [f"{po:04}" for po in pull_out_distance_transport_air] + command_kwargs.update( ip=[f"{ip:04}" for ip in immersion_depth], it=[f"{it:01}" for it in immersion_depth_direction], fp=[f"{fp:04}" for fp in surface_following_distance], @@ -6488,10 +6576,12 @@ async def dispense_pip( mp=[f"{mp:03}" for mp in mix_position_from_liquid_surface], ms=[f"{ms:04}" for ms in mix_speed], mh=[f"{mh:04}" for mh in mix_surface_following_distance], - gi=[f"{gi:03}" for gi in limit_curve_index], - gj=tadm_algorithm, # - gk=recording_mode, # ) + if self._pip_supports_extended_liquid_command_params(): + command_kwargs["gi"] = [f"{gi:03}" for gi in limit_curve_index] + command_kwargs["gj"] = tadm_algorithm + command_kwargs["gk"] = recording_mode + return await self.send_command(**command_kwargs) # TODO:(command:DA) Simultaneous aspiration & dispensation of liquid @@ -8050,7 +8140,7 @@ async def aspirate_core_96( channel_pattern_bin_str = reversed(["1" if x else "0" for x in channel_pattern]) channel_pattern_hex = hex(int("".join(channel_pattern_bin_str), 2)).upper()[2:] - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="EA", read_timeout=max(300, self.read_timeout), @@ -8062,32 +8152,36 @@ async def aspirate_core_96( ze=f"{min_z_endpos:04}", lz=f"{lld_search_height:04}", zt=f"{liquid_surface_no_lld:04}", - pp=f"{pull_out_distance_transport_air:04}", - zm=f"{minimum_height:04}", - zv=f"{second_section_height:04}", - zq=f"{second_section_ratio:05}", - iw=f"{immersion_depth:03}", - ix=immersion_depth_direction, - fh=f"{surface_following_distance:03}", - af=f"{aspiration_volumes:05}", - ag=f"{aspiration_speed:04}", - vt=f"{transport_air_volume:03}", - bv=f"{blow_out_air_volume:05}", - wv=f"{pre_wetting_volume:05}", - cm=lld_mode, - cs=gamma_lld_sensitivity, - bs=f"{swap_speed:04}", - wh=f"{settling_time:02}", - hv=f"{mix_volume:05}", - hc=f"{mix_cycles:02}", - hp=f"{mix_position_from_liquid_surface:03}", - mj=f"{mix_surface_following_distance:03}", - hs=f"{speed_of_mix:04}", - cw=channel_pattern_hex, - cr=f"{limit_curve_index:03}", - cj=tadm_algorithm, - cx=recording_mode, ) + if self._core96_supports_extended_liquid_command_params(): + command_kwargs["pp"] = f"{pull_out_distance_transport_air:04}" + command_kwargs["zm"] = f"{minimum_height:04}" + command_kwargs["zv"] = f"{second_section_height:04}" + command_kwargs["zq"] = f"{second_section_ratio:05}" + command_kwargs["iw"] = f"{immersion_depth:03}" + command_kwargs["ix"] = immersion_depth_direction + command_kwargs["fh"] = f"{surface_following_distance:03}" + command_kwargs["af"] = f"{aspiration_volumes:05}" + command_kwargs["ag"] = f"{aspiration_speed:04}" + command_kwargs["vt"] = f"{transport_air_volume:03}" + command_kwargs["bv"] = f"{blow_out_air_volume:05}" + command_kwargs["wv"] = f"{pre_wetting_volume:05}" + command_kwargs["cm"] = lld_mode + command_kwargs["cs"] = gamma_lld_sensitivity + command_kwargs["bs"] = f"{swap_speed:04}" + command_kwargs["wh"] = f"{settling_time:02}" + command_kwargs["hv"] = f"{mix_volume:05}" + command_kwargs["hc"] = f"{mix_cycles:02}" + command_kwargs["hp"] = f"{mix_position_from_liquid_surface:03}" + command_kwargs["mj"] = f"{mix_surface_following_distance:03}" + command_kwargs["hs"] = f"{speed_of_mix:04}" + if self._core96_supports_extended_liquid_command_params(): + command_kwargs["cw"] = channel_pattern_hex + command_kwargs["cr"] = f"{limit_curve_index:03}" + command_kwargs["cj"] = tadm_algorithm + command_kwargs["cx"] = recording_mode + + return await self.send_command(**command_kwargs) @need_iswap_parked @_requires_head96 @@ -8325,7 +8419,7 @@ async def dispense_core_96( channel_pattern_bin_str = reversed(["1" if x else "0" for x in channel_pattern]) channel_pattern_hex = hex(int("".join(channel_pattern_bin_str), 2)).upper()[2:] - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="ED", read_timeout=max(300, self.read_timeout), @@ -8338,33 +8432,37 @@ async def dispense_core_96( zq=f"{second_section_ratio:05}", lz=f"{lld_search_height:04}", zt=f"{liquid_surface_no_lld:04}", - pp=f"{pull_out_distance_transport_air:04}", - iw=f"{immersion_depth:03}", - ix=immersion_depth_direction, - fh=f"{surface_following_distance:03}", - zh=f"{minimum_traverse_height_at_beginning_of_a_command:04}", - ze=f"{min_z_endpos:04}", - df=f"{dispense_volume:05}", - dg=f"{dispense_speed:04}", - es=f"{cut_off_speed:04}", - ev=f"{stop_back_volume:03}", - vt=f"{transport_air_volume:03}", - bv=f"{blow_out_air_volume:05}", - cm=lld_mode, - cs=gamma_lld_sensitivity, - ej=f"{side_touch_off_distance:02}", - bs=f"{swap_speed:04}", - wh=f"{settling_time:02}", - hv=f"{mixing_volume:05}", - hc=f"{mixing_cycles:02}", - hp=f"{mix_position_from_liquid_surface:03}", - mj=f"{mix_surface_following_distance:03}", - hs=f"{speed_of_mixing:04}", - cw=channel_pattern_hex, - cr=f"{limit_curve_index:03}", - cj=tadm_algorithm, - cx=recording_mode, ) + if self._core96_supports_extended_liquid_command_params(): + command_kwargs["pp"] = f"{pull_out_distance_transport_air:04}" + command_kwargs["iw"] = f"{immersion_depth:03}" + command_kwargs["ix"] = immersion_depth_direction + command_kwargs["fh"] = f"{surface_following_distance:03}" + command_kwargs["zh"] = f"{minimum_traverse_height_at_beginning_of_a_command:04}" + command_kwargs["ze"] = f"{min_z_endpos:04}" + command_kwargs["df"] = f"{dispense_volume:05}" + command_kwargs["dg"] = f"{dispense_speed:04}" + command_kwargs["es"] = f"{cut_off_speed:04}" + command_kwargs["ev"] = f"{stop_back_volume:03}" + command_kwargs["vt"] = f"{transport_air_volume:03}" + command_kwargs["bv"] = f"{blow_out_air_volume:05}" + command_kwargs["cm"] = lld_mode + command_kwargs["cs"] = gamma_lld_sensitivity + command_kwargs["ej"] = f"{side_touch_off_distance:02}" + command_kwargs["bs"] = f"{swap_speed:04}" + command_kwargs["wh"] = f"{settling_time:02}" + command_kwargs["hv"] = f"{mixing_volume:05}" + command_kwargs["hc"] = f"{mixing_cycles:02}" + command_kwargs["hp"] = f"{mix_position_from_liquid_surface:03}" + command_kwargs["mj"] = f"{mix_surface_following_distance:03}" + command_kwargs["hs"] = f"{speed_of_mixing:04}" + if self._core96_supports_extended_liquid_command_params(): + command_kwargs["cw"] = channel_pattern_hex + command_kwargs["cr"] = f"{limit_curve_index:03}" + command_kwargs["cj"] = tadm_algorithm + command_kwargs["cx"] = recording_mode + + return await self.send_command(**command_kwargs) # -------------- 3.10.4 Adjustment & movement commands -------------- @@ -10696,13 +10794,13 @@ async def clld_probe_y_position_using_channel( adj_upper_y = await self.request_y_pos_channel_n(channel_idx - 1) max_safe_upper_y_pos = adj_upper_y - self._min_spacing_between(channel_idx, channel_idx - 1) else: - max_safe_upper_y_pos = self.extended_conf.pip_maximal_y_position + _, max_safe_upper_y_pos = self._pip_y_bounds() if channel_idx < (self.num_channels - 1): adj_lower_y = await self.request_y_pos_channel_n(channel_idx + 1) max_safe_lower_y_pos = adj_lower_y + self._min_spacing_between(channel_idx, channel_idx + 1) else: - max_safe_lower_y_pos = self.extended_conf.left_arm_min_y_position + max_safe_lower_y_pos, _ = self._pip_y_bounds() # Enable safe start and end positions if start_pos_search: @@ -10783,7 +10881,7 @@ async def clld_probe_y_position_using_channel( channel_idx, channel_idx + 1 ) else: - min_y = self.extended_conf.left_arm_min_y_position + min_y, _ = self._pip_y_bounds() max_safe_dist = detected_material_y_pos - min_y move_target = detected_material_y_pos - min(post_detection_dist, max_safe_dist) @@ -10794,7 +10892,7 @@ async def clld_probe_y_position_using_channel( channel_idx, channel_idx - 1 ) else: - max_y = self.extended_conf.pip_maximal_y_position + _, max_y = self._pip_y_bounds() max_safe_dist = max_y - detected_material_y_pos move_target = detected_material_y_pos + min(post_detection_dist, max_safe_dist) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py index b478ff7b637..c23f8867831 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py @@ -1,7 +1,9 @@ # mypy: disable-error-code="attr-defined,method-assign" +import datetime import unittest import unittest.mock +from dataclasses import replace from typing import Literal, cast from pylabrobot.liquid_handling import LiquidHandler @@ -34,6 +36,7 @@ CommandSyntaxError, HamiltonNoTipError, HardwareError, + Head96Information, STARBackend, STARFirmwareError, UnknownHamiltonError, @@ -168,6 +171,36 @@ async def test_send_command_plaintext_response(self): await self.star.send_command("C0", command="QM", fmt="id####") +class TestSTARConfigurationNormalization(unittest.TestCase): + def test_inverted_y_bounds_are_swapped(self): + star = STARBackend() + star._pip_firmware_version = datetime.date(2024, 1, 1) + star._extended_conf = replace( + _DEFAULT_EXTENDED_CONFIGURATION, + pip_maximal_y_position=6.0, + left_arm_min_y_position=22.0, + ) + + star._normalize_extended_configuration_y_bounds() + + self.assertEqual(star.extended_conf.pip_maximal_y_position, 606.5) + self.assertEqual(star.extended_conf.left_arm_min_y_position, 6.0) + + def test_valid_y_bounds_are_kept(self): + star = STARBackend() + star._pip_firmware_version = datetime.date(2009, 1, 1) + star._extended_conf = replace( + _DEFAULT_EXTENDED_CONFIGURATION, + pip_maximal_y_position=606.5, + left_arm_min_y_position=6.0, + ) + + star._normalize_extended_configuration_y_bounds() + + self.assertEqual(star.extended_conf.pip_maximal_y_position, 606.5) + self.assertEqual(star.extended_conf.left_arm_min_y_position, 6.0) + + class STARCommandCatcher(STARBackend): """Mock backend for star that catches commands and saves them instead of sending them to the machine.""" @@ -270,6 +303,18 @@ def __init__(self, name: str): set_tip_tracking(enabled=False) + def _set_core96_firmware_year(self, year: int) -> None: + self.STAR._head96_information = Head96Information( + fw_version=datetime.date(year, 1, 1), + supports_clot_monitoring_clld=False, + stop_disc_type="core_i", + instrument_type="legacy", + head_type="96 head II", + ) + + def _set_pip_firmware_year(self, year: int) -> None: + self.STAR._pip_firmware_version = datetime.date(year, 1, 1) + async def test_core_read_barcode_success(self): """core_read_barcode_of_picked_up_resource should send ZB and return a Barcode.""" @@ -688,6 +733,24 @@ async def test_core_96_tip_pickup(self): ] ) + async def test_core_96_tip_pickup_skips_dispensing_drive_move_on_old_firmware(self): + self._set_core96_firmware_year(2009) + + await self.lh.pick_up_tips96(self.tip_rack) + + self.STAR._write_and_read_command.assert_has_calls( + [ + _any_write_and_read_command_call("C0TTid0001tt01tf1tl0519tv03600tg2tu0"), + _any_write_and_read_command_call("C0EPid0002xs01179xd0yh2418tt01wu0za2164zh2450ze2450"), + ] + ) + self.assertFalse( + any( + "H0DQ" in call.kwargs["cmd"] + for call in self.STAR._write_and_read_command.call_args_list # type: ignore + ) + ) + async def test_tip_tracking_pick_up96(self): set_tip_tracking(enabled=True) await self.lh.pick_up_tips96(self.tip_rack) @@ -731,6 +794,24 @@ async def test_core_96_aspirate(self): ] ) + async def test_core_96_aspirate_old_firmware_omits_extended_params(self): + self._set_core96_firmware_year(2009) + + await self.lh.pick_up_tips96(self.tip_rack2) # pick up high volume tips + self.STAR._write_and_read_command.reset_mock() + + assert self.plate.lid is not None + self.plate.lid.unassign() + await self.lh.aspirate96(self.plate, volume=100, blow_out=True) + + self.STAR._write_and_read_command.assert_has_calls( + [ + _any_write_and_read_command_call( + "C0EAid0003aa0xs02983xd0yh1457zh2450ze2450lz1999zt1866zm1866zv0032zq06180iw000ix0fh000af01083ag2500vt050bv00000wv00050cm0cs1bs0020wh10hv00000hc00hp000mj000hs1200" + ), + ] + ) + async def test_core_96_dispense(self): await self.lh.pick_up_tips96(self.tip_rack2) # pick up high volume tips if self.plate.lid is not None: @@ -750,6 +831,26 @@ async def test_core_96_dispense(self): ] ) + async def test_core_96_dispense_old_firmware_omits_extended_params(self): + self._set_core96_firmware_year(2009) + + await self.lh.pick_up_tips96(self.tip_rack2) # pick up high volume tips + if self.plate.lid is not None: + self.plate.lid.unassign() + await self.lh.aspirate96(self.plate, 100, blow_out=True) + self.STAR._write_and_read_command.reset_mock() + + with no_volume_tracking(): + await self.lh.dispense96(self.plate, 100, blow_out=True) + + self.STAR._write_and_read_command.assert_has_calls( + [ + _any_write_and_read_command_call( + "C0EDid0004da3xs02983xd0yh1457zm1866zv0032zq06180lz1999zt1866iw000ix0fh000zh2450ze2450df01083dg1200es0050ev000vt050bv00000cm0cs1ej00bs0020wh00hv00000hc00hp000mj000hs1200" + ), + ] + ) + async def test_core_96_dispense_quadrant(self): """Test that each quadrant of a 384-well plate produces the correct firmware command. @@ -1015,6 +1116,160 @@ async def test_discard_tips(self): ] ) + async def test_pick_up_tips_omits_pickup_method_for_old_pip_firmware(self): + self._set_pip_firmware_year(2009) + + await self.lh.pick_up_tips(self.tip_rack["A1:H1"]) + + sent = self.STAR._write_and_read_command.call_args_list[-1].kwargs["cmd"] + self.assertTrue(sent.startswith("C0TP")) + self.assertNotIn("td", sent) + + async def test_discard_tips_omits_discarding_method_for_old_pip_firmware(self): + self._set_pip_firmware_year(2009) + await self.lh.pick_up_tips(self.tip_rack["A1:H1"]) + self.STAR._write_and_read_command.side_effect = [ + "C0TRid0003kz000 000 000 000 000 000 000 000vz000 000 000 000 000 000 000 000" + ] + self.STAR._write_and_read_command.reset_mock() + + await self.lh.discard_tips() + + sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"] + self.assertTrue(sent.startswith("C0TR")) + self.assertNotIn("ti", sent) + + async def test_initialize_pipetting_channels_omits_discarding_method_for_old_pip_firmware(self): + self._set_pip_firmware_year(2009) + + await self.STAR.initialize_pipetting_channels( + x_positions=[8000], + y_positions=[3427, 3337, 3247, 3157, 3067, 2977, 2887, 2797], + begin_of_tip_deposit_process=2450, + end_of_tip_deposit_process=1870, + z_position_at_end_of_a_command=2450, + tip_pattern=[True] * 8, + tip_type=1, + discarding_method=0, + ) + + sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"] + self.assertTrue(sent.startswith("C0DI")) + self.assertNotIn("ti", sent) + + async def test_aspirate_pip_omits_tadm_fields_for_old_pip_firmware(self): + self._set_pip_firmware_year(2009) + + await self.STAR.aspirate_pip( + aspiration_type=[0], + tip_pattern=[True], + x_positions=[8000], + y_positions=[3427], + aspiration_volumes=[100], + limit_curve_index=[0], + tadm_algorithm=False, + recording_mode=0, + ) + + sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"] + self.assertTrue(sent.startswith("C0AS")) + self.assertNotIn("po", sent) + self.assertNotIn("gi", sent) + self.assertNotIn("gj", sent) + self.assertNotIn("gk", sent) + self.assertNotIn("lk", sent) + self.assertNotIn("ik", sent) + self.assertNotIn("sd", sent) + self.assertNotIn("se", sent) + self.assertNotIn("sz", sent) + self.assertNotIn("io", sent) + + async def test_dispense_pip_omits_tadm_fields_for_old_pip_firmware(self): + self._set_pip_firmware_year(2009) + + await self.STAR.dispense_pip( + tip_pattern=[True], + dispensing_mode=[0], + x_positions=[8000], + y_positions=[3427], + dispense_volumes=[100], + limit_curve_index=[0], + tadm_algorithm=False, + recording_mode=0, + ) + + sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"] + self.assertTrue(sent.startswith("C0DS")) + self.assertNotIn("po", sent) + self.assertNotIn("gi", sent) + self.assertNotIn("gj", sent) + self.assertNotIn("gk", sent) + + async def test_aspirate_pip_includes_po_for_modern_pip_firmware(self): + self._set_pip_firmware_year(2024) + + await self.STAR.aspirate_pip( + aspiration_type=[0], + tip_pattern=[True], + x_positions=[8000], + y_positions=[3427], + aspiration_volumes=[100], + pull_out_distance_transport_air=[100], + ) + + sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"] + self.assertTrue(sent.startswith("C0AS")) + self.assertIn("po0100", sent) + + async def test_dispense_pip_includes_po_for_modern_pip_firmware(self): + self._set_pip_firmware_year(2024) + + await self.STAR.dispense_pip( + tip_pattern=[True], + dispensing_mode=[0], + x_positions=[8000], + y_positions=[3427], + dispense_volumes=[100], + pull_out_distance_transport_air=[100], + ) + + sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"] + self.assertTrue(sent.startswith("C0DS")) + self.assertIn("po0100", sent) + + async def test_ops_to_fw_positions_allows_legacy_sub_9mm_spacing(self): + self._set_pip_firmware_year(2009) + + op1 = Pickup( + resource=self.tip_rack["A1"][0], + offset=Coordinate.zero(), + tip=self.tip_rack["A1"][0].get_tip(), + ) + op2 = Pickup( + resource=self.tip_rack["A1"][0], + offset=Coordinate(0, -7.9, 0), + tip=self.tip_rack["A1"][0].get_tip(), + ) + + self.STAR._ops_to_fw_positions((op1, op2), use_channels=[0, 1]) + + async def test_ops_to_fw_positions_rejects_modern_sub_9mm_spacing(self): + self._set_pip_firmware_year(2024) + + op1 = Pickup( + resource=self.tip_rack["A1"][0], + offset=Coordinate.zero(), + tip=self.tip_rack["A1"][0].get_tip(), + ) + op2 = Pickup( + resource=self.tip_rack["A1"][0], + offset=Coordinate(0, -7.9, 0), + tip=self.tip_rack["A1"][0].get_tip(), + ) + + with self.assertRaisesRegex(ValueError, "Minimum distance between two y positions is <9mm"): + self.STAR._ops_to_fw_positions((op1, op2), use_channels=[0, 1]) + async def test_portrait_tip_rack_handling(self): deck = STARLetDeck() lh = LiquidHandler(self.STAR, deck=deck) @@ -1574,6 +1829,23 @@ async def test_can_reach_4ch_18mm_rejects_back_channel_too_far_back(self): backend._channels_minimum_y_spacing = [18.0] * 4 self.assertFalse(backend.can_reach_position(3, Coordinate(100, 574, 100))) + async def test_can_reach_position_normalizes_inverted_y_bounds(self): + """Inverted raw Y limits should not make ordinary 8-channel STARlet positions unreachable.""" + + backend = STARBackend() + backend._num_channels = 8 + backend._channels_minimum_y_spacing = [8.98] * 8 + backend._extended_conf = replace( + _DEFAULT_EXTENDED_CONFIGURATION, + pip_maximal_y_position=6.0, + left_arm_min_y_position=22.0, + ) + + backend._normalize_extended_configuration_y_bounds() + + self.assertTrue(backend.can_reach_position(0, Coordinate(100, 337.8, 100))) + self.assertTrue(backend.can_reach_position(7, Coordinate(100, 274.8, 100))) + # -- position_channels_in_y_direction: validation rejects tight positions ------- def _make_star_backend(self, num_channels, spacings): diff --git a/pylabrobot/liquid_handling/backends/hamilton/base.py b/pylabrobot/liquid_handling/backends/hamilton/base.py index 73ff83be6f7..dfa7657d898 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/base.py +++ b/pylabrobot/liquid_handling/backends/hamilton/base.py @@ -416,21 +416,25 @@ def _ops_to_fw_positions( y_pos = ops[i].resource.get_location_wrt(self.deck, x="c", y="c", z="b").y + ops[i].offset.y y_positions.append(round(y_pos * 10)) + pip_has_old_firmware = getattr(self, "_pip_has_old_firmware", None) + enforce_min_y_spacing = not (callable(pip_has_old_firmware) and pip_has_old_firmware()) + # check that the minimum d between any two y positions is >9mm # O(n^2) search is not great but this is most readable, and the max size is 16, so it's fine. - for channel_idx1, (x1, y1) in enumerate(zip(x_positions, y_positions)): - for channel_idx2, (x2, y2) in enumerate(zip(x_positions, y_positions)): - if channel_idx1 == channel_idx2: - continue - if not channels_involved[channel_idx1] or not channels_involved[channel_idx2]: - continue - if x1 != x2: # channels not on the same column -> will be two operations on the machine - continue - if y1 != y2 and abs(y1 - y2) < 90: - raise ValueError( - f"Minimum distance between two y positions is <9mm: {y1}, {y2}" - f" (channel {channel_idx1} and {channel_idx2})" - ) + if enforce_min_y_spacing: + for channel_idx1, (x1, y1) in enumerate(zip(x_positions, y_positions)): + for channel_idx2, (x2, y2) in enumerate(zip(x_positions, y_positions)): + if channel_idx1 == channel_idx2: + continue + if not channels_involved[channel_idx1] or not channels_involved[channel_idx2]: + continue + if x1 != x2: # channels not on the same column -> will be two operations on the machine + continue + if y1 != y2 and abs(y1 - y2) < 90: + raise ValueError( + f"Minimum distance between two y positions is <9mm: {y1}, {y2}" + f" (channel {channel_idx1} and {channel_idx2})" + ) if len(ops) > self.num_channels: raise ValueError(f"Too many channels specified: {len(ops)} > {self.num_channels}") diff --git a/pylabrobot/liquid_handling/liquid_handler.py b/pylabrobot/liquid_handling/liquid_handler.py index ef194c5a3a8..fbe74f0873b 100644 --- a/pylabrobot/liquid_handling/liquid_handler.py +++ b/pylabrobot/liquid_handling/liquid_handler.py @@ -354,6 +354,11 @@ def _compute_spread_offsets( spread: str, ) -> List[Coordinate]: """Compute channel spread offsets for a single-resource multi-channel operation.""" + pip_has_old_firmware = getattr(self.backend, "_pip_has_old_firmware", None) + if spread != "custom" and callable(pip_has_old_firmware) and pip_has_old_firmware(): + centers = list(reversed(resource.centers(yn=len(use_channels), zn=0))) + return [c - resource.center() for c in centers] + return compute_channel_offsets( resource=resource, num_channels=len(use_channels), @@ -836,7 +841,7 @@ def _check_containers(self, resources: Sequence[Resource]): @need_setup_finished async def aspirate( self, - resources: Sequence[Container], + resources: Union[Container, Sequence[Container]], vols: List[float], use_channels: Optional[List[int]] = None, flow_rates: Optional[List[Optional[float]]] = None, @@ -901,6 +906,9 @@ async def aspirate( ValueError: If all channels are `None`. """ + if isinstance(resources, Container): + resources = [resources] + self._log_command( "aspirate", resources=resources, @@ -946,6 +954,8 @@ async def aspirate( ("liquid_height", liquid_height), ("blow_out_air_volume", blow_out_air_volume), ]: + if n == "resources" and len(p) == 1: + continue if len(p) != len(use_channels): raise ValueError( f"Length of {n} must match length of use_channels: {len(p)} != {len(use_channels)}" @@ -955,13 +965,18 @@ async def aspirate( # want to space the channels evenly across the resource. Note that offsets are relative to the # center of the resource. if len(set(resources)) == 1: + original_num_resources = len(resources) resource = resources[0] resources = [resource] * len(use_channels) + pip_has_old_firmware = getattr(self.backend, "_pip_has_old_firmware", None) + legacy_repeated_resource_list = ( + callable(pip_has_old_firmware) and pip_has_old_firmware() and original_num_resources > 1 + ) + if not legacy_repeated_resource_list: + center_offsets = self._compute_spread_offsets(resource, use_channels, spread) - center_offsets = self._compute_spread_offsets(resource, use_channels, spread) - - # add user defined offsets to the computed centers - offsets = [c + o for c, o in zip(center_offsets, offsets)] + # add user defined offsets to the computed centers + offsets = [c + o for c, o in zip(center_offsets, offsets)] # create operations aspirations = [ @@ -1029,7 +1044,7 @@ async def aspirate( @need_setup_finished async def dispense( self, - resources: Sequence[Container], + resources: Union[Container, Sequence[Container]], vols: List[float], use_channels: Optional[List[int]] = None, flow_rates: Optional[List[Optional[float]]] = None, @@ -1092,6 +1107,9 @@ async def dispense( ValueError: If all channels are `None`. """ + if isinstance(resources, Container): + resources = [resources] + self._log_command( "dispense", resources=resources, @@ -1128,13 +1146,18 @@ async def dispense( # want to space the channels evenly across the resource. Note that offsets are relative to the # center of the resource. if len(set(resources)) == 1: + original_num_resources = len(resources) resource = resources[0] resources = [resource] * len(use_channels) + pip_has_old_firmware = getattr(self.backend, "_pip_has_old_firmware", None) + legacy_repeated_resource_list = ( + callable(pip_has_old_firmware) and pip_has_old_firmware() and original_num_resources > 1 + ) + if not legacy_repeated_resource_list: + center_offsets = self._compute_spread_offsets(resource, use_channels, spread) - center_offsets = self._compute_spread_offsets(resource, use_channels, spread) - - # add user defined offsets to the computed centers - offsets = [c + o for c, o in zip(center_offsets, offsets)] + # add user defined offsets to the computed centers + offsets = [c + o for c, o in zip(center_offsets, offsets)] tips = [self.head[channel].get_tip() for channel in use_channels] @@ -1159,6 +1182,8 @@ async def dispense( ("liquid_height", liquid_height), ("blow_out_air_volume", blow_out_air_volume), ]: + if n == "resources" and len(p) == 1: + continue if len(p) != len(use_channels): raise ValueError( f"Length of {n} must match length of use_channels: {len(p)} != {len(use_channels)}" diff --git a/pylabrobot/liquid_handling/liquid_handler_tests.py b/pylabrobot/liquid_handling/liquid_handler_tests.py index d27eba719e2..e86ef030d27 100644 --- a/pylabrobot/liquid_handling/liquid_handler_tests.py +++ b/pylabrobot/liquid_handling/liquid_handler_tests.py @@ -29,6 +29,7 @@ ResourceNotFoundError, ResourceStack, TipRack, + VWR_1_trough_195000uL_Ub, nest_1_troughplate_195000uL_Vb, no_tip_tracking, set_tip_tracking, @@ -48,7 +49,7 @@ from pylabrobot.resources.volume_tracker import ( set_volume_tracking, ) -from pylabrobot.resources.well import Well +from pylabrobot.resources.well import CrossSectionType, Well, WellBottomType from pylabrobot.serializer import serialize from .liquid_handler import LiquidHandler @@ -1328,3 +1329,68 @@ async def test_no_go_zones_tight_vs_wide(self): wide_gap_lower = abs(wide_offsets[1] - wide_offsets[0]) tight_gap_lower = abs(tight_offsets[1] - tight_offsets[0]) self.assertGreater(wide_gap_lower, tight_gap_lower) + + async def test_old_pip_firmware_preserves_repeated_resource_zero_offsets(self): + """Legacy PIP firmware should keep the old repeated-resource zero-offset behavior.""" + self.backend._pip_has_old_firmware = lambda: True + + small_well = Well( + name="small_well", + size_x=107.2, + size_y=70.9, + size_z=24.76, + material_z_thickness=1.0, + bottom_type=WellBottomType.FLAT, + cross_section_type=CrossSectionType.RECTANGLE, + ) + self.deck.assign_child_resource(small_well, location=Coordinate(200, 100, 0)) + small_well.tracker.set_volume(50_000) + + tips = [self.tip_rack.get_item(f"{chr(65 + i)}1").get_tip() for i in range(8)] + self.lh.update_head_state({i: t for i, t in enumerate(tips)}) + + await self.lh.aspirate([small_well] * 8, vols=[100] * 8, use_channels=list(range(8))) + + ops = self.backend.aspirate.call_args.kwargs["ops"] + offsets = [op.offset.y for op in ops] + self.assertEqual(offsets, [0.0] * 8) + + async def test_old_pip_firmware_spreads_single_trough_resource(self): + """Legacy PIP firmware should still spread channels across a single trough container.""" + self.backend._pip_has_old_firmware = lambda: True + + trough = VWR_1_trough_195000uL_Ub(name="vwr_trough") + self.deck.assign_child_resource(trough, location=Coordinate(200, 100, 0)) + trough.tracker.set_volume(50_000) + + tips = [self.tip_rack.get_item(f"{chr(65 + i)}1").get_tip() for i in range(8)] + self.lh.update_head_state({i: t for i, t in enumerate(tips)}) + + await self.lh.aspirate(trough, vols=[100] * 8, use_channels=list(range(8))) + + ops = self.backend.aspirate.call_args.kwargs["ops"] + offsets = [op.offset.y for op in ops] + self.assertNotEqual(offsets, [0.0] * 8) + self.assertEqual(offsets, sorted(offsets, reverse=True)) + + async def test_modern_firmware_still_rejects_too_small_single_resource(self): + """Modern spacing logic should still reject resources that cannot fit the channels.""" + self.backend._pip_has_old_firmware = lambda: False + + small_well = Well( + name="small_well", + size_x=107.2, + size_y=70.9, + size_z=24.76, + material_z_thickness=1.0, + bottom_type=WellBottomType.FLAT, + cross_section_type=CrossSectionType.RECTANGLE, + ) + self.deck.assign_child_resource(small_well, location=Coordinate(200, 100, 0)) + small_well.tracker.set_volume(50_000) + + tips = [self.tip_rack.get_item(f"{chr(65 + i)}1").get_tip() for i in range(8)] + self.lh.update_head_state({i: t for i, t in enumerate(tips)}) + + with self.assertRaisesRegex(ValueError, "Resource is too small to space channels."): + await self.lh.aspirate([small_well] * 8, vols=[100] * 8, use_channels=list(range(8))) diff --git a/pylabrobot/resources/vwr/__init__.py b/pylabrobot/resources/vwr/__init__.py index fa3552c9330..d1f72871cd4 100644 --- a/pylabrobot/resources/vwr/__init__.py +++ b/pylabrobot/resources/vwr/__init__.py @@ -1,2 +1,2 @@ from .plates import VWR_1_troughplate_195000uL_Ub, VWR_96_wellplate_2mL_Vb -from .troughs import VWRReagentReservoirs25mL +from .troughs import VWR_1_trough_195000uL_Ub, VWRReagentReservoirs25mL diff --git a/pylabrobot/resources/vwr/troughs.py b/pylabrobot/resources/vwr/troughs.py index 252c9cc4348..379685f6067 100644 --- a/pylabrobot/resources/vwr/troughs.py +++ b/pylabrobot/resources/vwr/troughs.py @@ -1,4 +1,36 @@ -from pylabrobot.resources.trough import Trough +from pylabrobot.resources.height_volume_functions import ( + compute_height_from_volume_rectangle, + compute_volume_from_height_rectangle, +) +from pylabrobot.resources.trough import Trough, TroughBottomType + + +def VWR_1_trough_195000uL_Ub(name: str) -> Trough: + """VWR NA Cat. No. 77575-302""" + + inner_width = 127.76 - (14.38 - 8.9 / 2) * 2 + inner_length = 85.48 - (11.24 - 8.9 / 2) * 2 + + return Trough( + name=name, + size_x=127.76, # from spec + size_y=85.48, # from spec + size_z=31.4, # from spec + material_z_thickness=3.55, # from spec + max_volume=195000, # from spec 195 mL + model=VWR_1_trough_195000uL_Ub.__name__, + bottom_type=TroughBottomType.U, + compute_height_from_volume=lambda liquid_volume: compute_height_from_volume_rectangle( + liquid_volume, + inner_length, + inner_width, + ), + compute_volume_from_height=lambda liquid_height: compute_volume_from_height_rectangle( + liquid_height, + inner_length, + inner_width, + ), + ) def VWRReagentReservoirs25mL(name: str) -> Trough: diff --git a/tools/README.md b/tools/README.md index f9121ca503a..ba2db9e6be9 100644 --- a/tools/README.md +++ b/tools/README.md @@ -2,3 +2,5 @@ - `make_fw`: script for converting commands from the firmware documents into Python methods. - `make_resources`: scripts to create PyLabRobot methods for various resources. +- `robot_loop`: local Codex hardware runner that executes generated scripts, captures structured + JSON results, and preserves raw logs plus firmware command traces. diff --git a/tools/__init__.py b/tools/__init__.py new file mode 100644 index 00000000000..5f0ec862612 --- /dev/null +++ b/tools/__init__.py @@ -0,0 +1 @@ +"""Local tooling modules for PyLabRobot development and hardware workflows.""" diff --git a/tools/robot_loop/__init__.py b/tools/robot_loop/__init__.py new file mode 100644 index 00000000000..294f0583e72 --- /dev/null +++ b/tools/robot_loop/__init__.py @@ -0,0 +1 @@ +"""Helpers for Codex-driven local hardware iteration loops.""" diff --git a/tools/robot_loop/examples/starlet_core8_pick_up_1000ul_tips.py b/tools/robot_loop/examples/starlet_core8_pick_up_1000ul_tips.py new file mode 100644 index 00000000000..18ba246b7ee --- /dev/null +++ b/tools/robot_loop/examples/starlet_core8_pick_up_1000ul_tips.py @@ -0,0 +1,93 @@ +"""Notebook-matched STARlet CoRe 8 tip pickup example for the robot loop runner. + +This mirrors the setup in +/home/harleyk/Documents/GrindBio_Doris_Scripts/head_96w_pick_up_tips.ipynb +for a single hardware action: + +- initialize the STARlet backend +- place the same carriers and labware on the deck +- pick up 1000 uL filtered tips on CoRe 8 channels 0-7 from A1:H1 + +Run with: + +python -m tools.robot_loop.runner \ + --script tools/robot_loop/examples/starlet_core8_pick_up_1000ul_tips.py \ + --artifact-dir /tmp/starlet-runner-artifacts \ + --operation tips \ + --run-id core8_pickup_1000ul_a1_h1 +""" + +from pylabrobot.liquid_handling import LiquidHandler +from pylabrobot.liquid_handling.backends import STARBackend +from pylabrobot.resources.agenbio import AGenBio_1_troughplate_190000uL_Fl +from pylabrobot.resources.bioer import BioER_96_wellplate_Vb_2200uL +from pylabrobot.resources.hamilton import ( + MFX_CAR_L5_base, + STARLetDeck, + TIP_CAR_480_A00, + hamilton_96_tiprack_1000uL_filter, + hamilton_96_tiprack_300uL_filter, + hamilton_96_tiprack_50uL_filter, +) +from pylabrobot.resources.hamilton.mfx_modules import Hamilton_MFX_plateholder_DWP_metal_tapped + + +async def run(context): + backend = STARBackend() + context.register_backend(backend, label="starlet") + + lh = LiquidHandler(backend=backend, deck=STARLetDeck()) + + plateholder_0 = Hamilton_MFX_plateholder_DWP_metal_tapped(name="plateholder_0") + plateholder_1 = Hamilton_MFX_plateholder_DWP_metal_tapped(name="plateholder_1") + plateholder_2 = Hamilton_MFX_plateholder_DWP_metal_tapped(name="plateholder_2") + plate_carrier = MFX_CAR_L5_base( + name="plate_modules", + modules={ + 0: plateholder_0, + 1: plateholder_1, + 2: plateholder_2, + }, + ) + + tip_carrier = TIP_CAR_480_A00(name="tip_carrier_480") + + water_plate = AGenBio_1_troughplate_190000uL_Fl(name="water_plate") + dw_plate = BioER_96_wellplate_Vb_2200uL(name="dw_plate") + hi_orange = AGenBio_1_troughplate_190000uL_Fl(name="hiOrange") + + tip_rack_1000 = hamilton_96_tiprack_1000uL_filter(name="tiprack_1000ul_filter") + tip_rack_50 = hamilton_96_tiprack_50uL_filter(name="tiprack_50ul_filter") + tip_rack_300 = hamilton_96_tiprack_300uL_filter(name="tiprack_300ul_filter") + + plate_carrier[0] = dw_plate + plate_carrier[1] = water_plate + plate_carrier[2] = hi_orange + + tip_carrier[2] = tip_rack_1000 + tip_carrier[1] = tip_rack_50 + tip_carrier[0] = tip_rack_300 + + lh.deck.assign_child_resource(tip_carrier, rails=13) + lh.deck.assign_child_resource(plate_carrier, rails=7) + + try: + await lh.setup(skip_autoload=True) + context.add_note("setup complete") + context.add_note("attempting CoRe 8 pickup from 1000 uL rack slot 2 on TIP_CAR_480_A00") + + await lh.pick_up_tips( + tip_rack_1000["A1:H1"], + use_channels=list(range(8)), + ) + + context.add_note("pick_up_tips completed") + except Exception: + try: + faulty = await backend.request_name_of_last_faulty_parameter() + context.write_json_artifact("last_faulty_parameter.json", faulty) + except Exception as inner_exc: + context.add_note(f"could not query last faulty parameter: {inner_exc}") + raise + finally: + await lh.stop() diff --git a/tools/robot_loop/examples/starlet_firmware_probe.py b/tools/robot_loop/examples/starlet_firmware_probe.py new file mode 100644 index 00000000000..a7a3fedfe8b --- /dev/null +++ b/tools/robot_loop/examples/starlet_firmware_probe.py @@ -0,0 +1,20 @@ +"""Minimal STARlet probe for the local robot loop runner. + +This is intended as a safe first hardware script: setup, capture firmware context, and stop. +""" + +from pylabrobot.liquid_handling import LiquidHandler +from pylabrobot.liquid_handling.backends.hamilton.STAR_backend import STARBackend +from pylabrobot.resources import STARLetDeck + + +async def run(context): + backend = STARBackend() + context.register_backend(backend, label="starlet") + + lh = LiquidHandler(backend=backend, deck=STARLetDeck()) + try: + await lh.setup() + context.add_note("setup completed") + finally: + await lh.stop() diff --git a/tools/robot_loop/runner.py b/tools/robot_loop/runner.py new file mode 100644 index 00000000000..dd7ea8a4a57 --- /dev/null +++ b/tools/robot_loop/runner.py @@ -0,0 +1,453 @@ +import argparse +import asyncio +import contextlib +import dataclasses +import datetime +import importlib.util +import inspect +import io +import json +import pathlib +import sys +import traceback +import types +import typing +import uuid + + +def _utcnow() -> datetime.datetime: + return datetime.datetime.now(datetime.timezone.utc) + + +def _to_jsonable(value: typing.Any) -> typing.Any: + if dataclasses.is_dataclass(value): + return _to_jsonable(dataclasses.asdict(value)) + if isinstance(value, pathlib.Path): + return str(value) + if isinstance(value, (datetime.datetime, datetime.date, datetime.time)): + return value.isoformat() + if isinstance(value, dict): + return {str(k): _to_jsonable(v) for k, v in value.items()} + if isinstance(value, (list, tuple, set)): + return [_to_jsonable(v) for v in value] + if isinstance(value, (str, int, float, bool)) or value is None: + return value + return repr(value) + + +@dataclasses.dataclass +class FirmwareCommandRecord: + timestamp: str + backend_label: str + module: str + command: str + kwargs: dict + response: typing.Any = None + error_type: typing.Optional[str] = None + error_message: typing.Optional[str] = None + + def to_dict(self) -> dict: + return _to_jsonable(dataclasses.asdict(self)) + + +@dataclasses.dataclass +class RobotRunResult: + run_id: str + status: str + script_path: str + operation: str + started_at: str + finished_at: str + timeout_seconds: float + exception_type: typing.Optional[str] = None + exception_message: typing.Optional[str] = None + traceback: typing.Optional[str] = None + notes: typing.Optional[list] = None + metadata: typing.Optional[dict] = None + firmware_context: typing.Optional[list] = None + artifacts: typing.Optional[dict] = None + cleanup_error: typing.Optional[str] = None + + def to_dict(self) -> dict: + return _to_jsonable(dataclasses.asdict(self)) + + +@dataclasses.dataclass +class RobotRunJob: + script: pathlib.Path + result_json: pathlib.Path + raw_log: pathlib.Path + command_log_jsonl: pathlib.Path + artifact_dir: pathlib.Path + operation: str = "unspecified" + timeout_seconds: float = 300.0 + cleanup_timeout_seconds: float = 30.0 + run_id: str = dataclasses.field(default_factory=lambda: uuid.uuid4().hex) + metadata: dict = dataclasses.field(default_factory=dict) + + +class _TeeTextIO(io.TextIOBase): + def __init__(self, *targets: typing.TextIO): + self.targets = targets + + def write(self, data: str) -> int: + for target in self.targets: + target.write(data) + target.flush() + return len(data) + + def flush(self) -> None: + for target in self.targets: + target.flush() + + +class RobotJobContext: + def __init__( + self, + *, + run_id: str, + script_path: pathlib.Path, + artifact_dir: pathlib.Path, + command_log_jsonl: pathlib.Path, + operation: str, + metadata: typing.Optional[dict] = None, + ): + self.run_id = run_id + self.script_path = script_path + self.artifact_dir = artifact_dir + self.command_log_jsonl = command_log_jsonl + self.operation = operation + self.metadata = metadata or {} + self.notes: list[str] = [] + self._registered_backends: list[dict] = [] + + def add_note(self, message: str) -> None: + self.notes.append(message) + + def write_json_artifact(self, relative_path: str, data: typing.Any) -> pathlib.Path: + target = self.artifact_dir / relative_path + target.parent.mkdir(parents=True, exist_ok=True) + target.write_text(json.dumps(_to_jsonable(data), indent=2, sort_keys=True), encoding="utf-8") + return target + + def register_backend( + self, + backend: typing.Any, + *, + label: typing.Optional[str] = None, + capture_commands: bool = True, + ) -> typing.Any: + backend_label = label or f"{type(backend).__name__}_{len(self._registered_backends)}" + registration = { + "label": backend_label, + "backend": backend, + "original_send_command": getattr(backend, "send_command", None), + "capture_commands": capture_commands, + } + self._registered_backends.append(registration) + + if capture_commands and registration["original_send_command"] is not None: + original_send_command = registration["original_send_command"] + + async def wrapped_send_command(instance, *args, **kwargs): + module = kwargs.get("module") + command = kwargs.get("command") + if len(args) >= 1: + module = args[0] + if len(args) >= 2: + command = args[1] + + command_kwargs = { + key: value + for key, value in kwargs.items() + if key not in {"module", "command"} + } + + record = FirmwareCommandRecord( + timestamp=_utcnow().isoformat(), + backend_label=backend_label, + module=str(module), + command=str(command), + kwargs=_to_jsonable(command_kwargs), + ) + try: + response = await original_send_command(*args, **kwargs) + record.response = _to_jsonable(response) + return response + except Exception as exc: + record.error_type = type(exc).__name__ + record.error_message = str(exc) + raise + finally: + self._append_command_record(record) + + backend.send_command = types.MethodType(wrapped_send_command, backend) + + return backend + + def _append_command_record(self, record: FirmwareCommandRecord) -> None: + self.command_log_jsonl.parent.mkdir(parents=True, exist_ok=True) + with self.command_log_jsonl.open("a", encoding="utf-8") as handle: + handle.write(json.dumps(record.to_dict(), sort_keys=True)) + handle.write("\n") + + def collect_firmware_context(self) -> list[dict]: + contexts = [] + for registration in self._registered_backends: + backend = registration["backend"] + head96_information = getattr(backend, "_head96_information", None) + contexts.append( + { + "label": registration["label"], + "backend_type": type(backend).__name__, + "pip_firmware_version": _to_jsonable(getattr(backend, "_pip_firmware_version", None)), + "head96_information": _to_jsonable(head96_information), + "machine_conf": _to_jsonable(getattr(backend, "_machine_conf", None)), + "extended_conf": _to_jsonable(getattr(backend, "_extended_conf", None)), + } + ) + return contexts + + async def best_effort_stop_backends(self) -> None: + for registration in reversed(self._registered_backends): + backend = registration["backend"] + stop_steps = [ + ("move_all_channels_in_z_safety", None), + ("move_core_96_to_safe_position", None), + ("park_iswap", None), + ("park_autoload", None), + ("stop", None), + ] + for method_name, argument in stop_steps: + method = getattr(backend, method_name, None) + if method is None: + continue + try: + result = method() if argument is None else method(argument) + if inspect.isawaitable(result): + await result + except Exception as exc: # pragma: no cover - cleanup is intentionally best effort + self.add_note(f"cleanup step {method_name} failed on {registration['label']}: {exc}") + + def restore_backend_hooks(self) -> None: + for registration in self._registered_backends: + original_send_command = registration["original_send_command"] + if registration["capture_commands"] and original_send_command is not None: + registration["backend"].send_command = original_send_command + + +def _load_script_module(script_path: pathlib.Path): + module_name = f"robot_loop_script_{uuid.uuid4().hex}" + spec = importlib.util.spec_from_file_location(module_name, script_path) + if spec is None or spec.loader is None: + raise RuntimeError(f"Unable to import runner script from {script_path}") + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +async def _invoke_callable(func: typing.Callable, context: RobotJobContext): + signature = inspect.signature(func) + if len(signature.parameters) == 0: + result = func() + else: + result = func(context) + if inspect.isawaitable(result): + return await result + return result + + +async def execute_job(job: RobotRunJob) -> RobotRunResult: + job.artifact_dir.mkdir(parents=True, exist_ok=True) + job.result_json.parent.mkdir(parents=True, exist_ok=True) + job.raw_log.parent.mkdir(parents=True, exist_ok=True) + job.command_log_jsonl.parent.mkdir(parents=True, exist_ok=True) + + context = RobotJobContext( + run_id=job.run_id, + script_path=job.script, + artifact_dir=job.artifact_dir, + command_log_jsonl=job.command_log_jsonl, + operation=job.operation, + metadata=job.metadata, + ) + started_at = _utcnow() + status = "success" + exception_type = None + exception_message = None + traceback_text = None + cleanup_error = None + + with job.raw_log.open("a", encoding="utf-8") as raw_log_handle: + tee_stdout = _TeeTextIO(sys.stdout, raw_log_handle) + tee_stderr = _TeeTextIO(sys.stderr, raw_log_handle) + with contextlib.redirect_stdout(tee_stdout), contextlib.redirect_stderr(tee_stderr): + print(f"[robot-loop] run_id={job.run_id} operation={job.operation} script={job.script}") + cleanup_callable = None + + try: + module = _load_script_module(job.script) + run_callable = getattr(module, "run", None) + cleanup_callable = getattr(module, "cleanup", None) + if run_callable is None: + raise RuntimeError(f"Runner script {job.script} must define a callable named 'run'.") + await asyncio.wait_for(_invoke_callable(run_callable, context), timeout=job.timeout_seconds) + except asyncio.TimeoutError as exc: + status = "timeout" + exception_type = type(exc).__name__ + exception_message = f"runner timed out after {job.timeout_seconds} seconds" + traceback_text = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)) + except Exception as exc: # pylint: disable=broad-except + status = "failure" + exception_type = type(exc).__name__ + exception_message = str(exc) + traceback_text = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)) + finally: + try: + if cleanup_callable is not None: + await asyncio.wait_for( + _invoke_callable(cleanup_callable, context), timeout=job.cleanup_timeout_seconds + ) + except Exception as exc: # pylint: disable=broad-except + cleanup_error = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)) + try: + await asyncio.wait_for( + context.best_effort_stop_backends(), timeout=job.cleanup_timeout_seconds + ) + except Exception as exc: # pylint: disable=broad-except + suffix = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)) + cleanup_error = suffix if cleanup_error is None else cleanup_error + "\n" + suffix + finally: + context.restore_backend_hooks() + + finished_at = _utcnow() + result = RobotRunResult( + run_id=job.run_id, + status=status, + script_path=str(job.script), + operation=job.operation, + started_at=started_at.isoformat(), + finished_at=finished_at.isoformat(), + timeout_seconds=job.timeout_seconds, + exception_type=exception_type, + exception_message=exception_message, + traceback=traceback_text, + notes=context.notes, + metadata=job.metadata, + firmware_context=context.collect_firmware_context(), + cleanup_error=cleanup_error, + artifacts={ + "artifact_dir": str(job.artifact_dir), + "result_json": str(job.result_json), + "raw_log": str(job.raw_log), + "command_log_jsonl": str(job.command_log_jsonl), + }, + ) + job.result_json.write_text(json.dumps(result.to_dict(), indent=2, sort_keys=True), encoding="utf-8") + return result + + +def load_job( + *, + script: typing.Optional[str] = None, + result_json: typing.Optional[str] = None, + raw_log: typing.Optional[str] = None, + command_log_jsonl: typing.Optional[str] = None, + artifact_dir: typing.Optional[str] = None, + operation: str = "unspecified", + timeout_seconds: float = 300.0, + cleanup_timeout_seconds: float = 30.0, + run_id: typing.Optional[str] = None, + metadata: typing.Optional[dict] = None, + job_json: typing.Optional[str] = None, +) -> RobotRunJob: + payload = {} + if job_json is not None: + payload = json.loads(pathlib.Path(job_json).read_text(encoding="utf-8")) + + script_path = pathlib.Path(script or payload["script"]).resolve() + artifact_root = pathlib.Path( + artifact_dir or payload.get("artifact_dir") or script_path.parent / "robot_loop_artifacts" + ).resolve() + resolved_run_id = run_id or payload.get("run_id") or uuid.uuid4().hex + + result_path = pathlib.Path( + result_json or payload.get("result_json") or artifact_root / f"{resolved_run_id}_result.json" + ).resolve() + raw_log_path = pathlib.Path( + raw_log or payload.get("raw_log") or artifact_root / f"{resolved_run_id}_raw.log" + ).resolve() + command_log_path = pathlib.Path( + command_log_jsonl + or payload.get("command_log_jsonl") + or artifact_root / f"{resolved_run_id}_commands.jsonl" + ).resolve() + + combined_metadata = {} + combined_metadata.update(payload.get("metadata", {})) + if metadata: + combined_metadata.update(metadata) + + return RobotRunJob( + script=script_path, + result_json=result_path, + raw_log=raw_log_path, + command_log_jsonl=command_log_path, + artifact_dir=artifact_root, + operation=payload.get("operation", operation), + timeout_seconds=float(payload.get("timeout_seconds", timeout_seconds)), + cleanup_timeout_seconds=float( + payload.get("cleanup_timeout_seconds", cleanup_timeout_seconds) + ), + run_id=resolved_run_id, + metadata=combined_metadata, + ) + + +def _parse_metadata(args_metadata: typing.Optional[str]) -> dict: + if args_metadata is None: + return {} + return typing.cast(dict, json.loads(args_metadata)) + + +def main(argv: typing.Optional[list[str]] = None) -> int: + parser = argparse.ArgumentParser(description="Run a local Codex hardware iteration job.") + parser.add_argument("--script", help="Path to the generated Python runner script.") + parser.add_argument("--job-json", help="Optional path to a JSON job specification.") + parser.add_argument("--result-json", help="Path where the structured result JSON should be written.") + parser.add_argument("--raw-log", help="Path where the raw stdout/stderr log should be written.") + parser.add_argument( + "--command-log-jsonl", + help="Path where captured backend firmware command records should be written.", + ) + parser.add_argument("--artifact-dir", help="Directory for run artifacts.") + parser.add_argument("--operation", default="unspecified", help="Operation category for this run.") + parser.add_argument("--timeout", type=float, default=300.0, help="Main run timeout in seconds.") + parser.add_argument( + "--cleanup-timeout", type=float, default=30.0, help="Cleanup timeout in seconds." + ) + parser.add_argument("--run-id", help="Optional run identifier.") + parser.add_argument("--metadata-json", help="Inline JSON metadata to include in the result.") + + args = parser.parse_args(argv) + metadata = _parse_metadata(args.metadata_json) + job = load_job( + script=args.script, + job_json=args.job_json, + result_json=args.result_json, + raw_log=args.raw_log, + command_log_jsonl=args.command_log_jsonl, + artifact_dir=args.artifact_dir, + operation=args.operation, + timeout_seconds=args.timeout, + cleanup_timeout_seconds=args.cleanup_timeout, + run_id=args.run_id, + metadata=metadata, + ) + result = asyncio.run(execute_job(job)) + print(json.dumps(result.to_dict(), indent=2, sort_keys=True)) + return 0 if result.status == "success" else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/robot_loop/runner_tests.py b/tools/robot_loop/runner_tests.py new file mode 100644 index 00000000000..65cfd84da17 --- /dev/null +++ b/tools/robot_loop/runner_tests.py @@ -0,0 +1,122 @@ +import json +import pathlib +import tempfile +import textwrap +import unittest + +from tools.robot_loop.runner import execute_job, load_job + + +class FakeBackend: + def __init__(self): + self._pip_firmware_version = "2009-05-01" + self._machine_conf = {"channels": 8} + self._extended_conf = {"iswap_installed": False} + self.cleanup_calls = [] + + async def send_command(self, module, command, **kwargs): + if command == "ERR": + raise RuntimeError("firmware rejected command") + return {"module": module, "command": command, "kwargs": kwargs} + + async def move_all_channels_in_z_safety(self): + self.cleanup_calls.append("move_all_channels_in_z_safety") + + async def stop(self): + self.cleanup_calls.append("stop") + + +class RobotLoopRunnerTests(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self._tempdir = tempfile.TemporaryDirectory() + self.addCleanup(self._tempdir.cleanup) + self.tmp_path = pathlib.Path(self._tempdir.name) + + def _write_script(self, directory: pathlib.Path, name: str, body: str) -> pathlib.Path: + script_path = directory / name + script_path.write_text(textwrap.dedent(body), encoding="utf-8") + return script_path + + async def test_execute_job_success_captures_firmware_context_and_commands(self): + script = self._write_script( + self.tmp_path, + "success_script.py", + """ + from tools.robot_loop.runner_tests import FakeBackend + + async def run(context): + backend = FakeBackend() + context.register_backend(backend, label="fake-star") + await backend.send_command("C0", "RF") + """, + ) + job = load_job(script=str(script), artifact_dir=str(self.tmp_path), run_id="success") + result = await execute_job(job) + + self.assertEqual(result.status, "success") + self.assertEqual(result.firmware_context[0]["pip_firmware_version"], "2009-05-01") + + commands = job.command_log_jsonl.read_text(encoding="utf-8").strip().splitlines() + self.assertEqual(len(commands), 1) + self.assertEqual(json.loads(commands[0])["command"], "RF") + + async def test_execute_job_failure_records_exception(self): + script = self._write_script( + self.tmp_path, + "failure_script.py", + """ + async def run(context): + raise ValueError("bad run") + """, + ) + job = load_job(script=str(script), artifact_dir=str(self.tmp_path), run_id="failure") + result = await execute_job(job) + + self.assertEqual(result.status, "failure") + self.assertEqual(result.exception_type, "ValueError") + self.assertIn("bad run", result.exception_message) + + async def test_execute_job_timeout_records_timeout_status(self): + script = self._write_script( + self.tmp_path, + "timeout_script.py", + """ + import asyncio + + async def run(context): + await asyncio.sleep(1) + """, + ) + job = load_job( + script=str(script), + artifact_dir=str(self.tmp_path), + run_id="timeout", + timeout_seconds=0.01, + ) + result = await execute_job(job) + + self.assertEqual(result.status, "timeout") + self.assertEqual(result.exception_type, "TimeoutError") + + async def test_execute_job_cleanup_runs_on_registered_backend(self): + script = self._write_script( + self.tmp_path, + "cleanup_script.py", + """ + from tools.robot_loop.runner_tests import FakeBackend + + backend = FakeBackend() + + async def run(context): + context.register_backend(backend, label="fake-star") + await backend.send_command("C0", "ERR") + """, + ) + job = load_job(script=str(script), artifact_dir=str(self.tmp_path), run_id="cleanup") + result = await execute_job(job) + + self.assertEqual(result.status, "failure") + command_record = json.loads(job.command_log_jsonl.read_text(encoding="utf-8").strip()) + self.assertEqual(command_record["error_type"], "RuntimeError") + raw_log = job.raw_log.read_text(encoding="utf-8") + self.assertIn("run_id=cleanup", raw_log)