From 2d4fa2cbb4db0031e91d6da786e5f0e132530b13 Mon Sep 17 00:00:00 2001 From: Harley King Date: Fri, 24 Apr 2026 11:44:23 -0400 Subject: [PATCH 1/3] minimal changes to STAR_backend to allow legacy firmware operation grippers work CORE8 successful, but resource min spacing error in troughplates --- .../backends/hamilton/STAR_backend.py | 155 ++++++++++++----- .../backends/hamilton/STAR_tests.py | 159 ++++++++++++++++++ 2 files changed, 276 insertions(+), 38 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 1545b399cb2..d953d7ce88f 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -1367,6 +1367,7 @@ def __init__( self._iswap_version: Optional[str] = None # loaded lazily self._pip_channel_information: Optional[List[PipChannelInformation]] = None + self._pip_firmware_version: Optional[datetime.date] = None self._default_1d_symbology: Barcode1DSymbology = "Code 128 (Subset B and C)" @@ -1678,6 +1679,52 @@ def _parse_firmware_version_datetime(self, fw_version: str) -> datetime.date: raise ValueError(f"Could not parse year from firmware version string: '{fw_version}'") return datetime.date(int(year_match.group(1)), 1, 1) + def _pip_has_old_firmware(self) -> bool: + """Return whether the installed PIP firmware predates 2010 compatibility changes.""" + pip_firmware_version = getattr(self, "_pip_firmware_version", None) + return pip_firmware_version is not None and pip_firmware_version.year < 2010 + + def _core96_has_old_firmware(self) -> bool: + """Return whether the installed CoRe96 firmware predates 2010 compatibility changes.""" + head96_information = getattr(self, "_head96_information", None) + return head96_information is not None and head96_information.fw_version.year < 2010 + + def _pip_supports_extended_tip_handling_command_params(self) -> bool: + """Return whether PIP tip-handling commands support newer optional parameters.""" + return not self._pip_has_old_firmware() + + def _pip_supports_extended_liquid_command_params(self) -> bool: + """Return whether PIP liquid commands support newer optional parameters.""" + return not self._pip_has_old_firmware() + + def _core96_supports_extended_liquid_command_params(self) -> bool: + """Return whether CoRe96 `EA`/`ED` support the newer optional parameter set.""" + return not self._core96_has_old_firmware() + + def _normalize_extended_configuration_y_bounds(self) -> None: + """Normalize impossible PIP Y bounds reported by some legacy STAR/STARlet firmware.""" + ext_conf = self.extended_conf + + if ( + ext_conf.pip_maximal_y_position < ext_conf.left_arm_min_y_position + or ext_conf.pip_maximal_y_position < 100.0 + ): + default_bounds = ExtendedConfiguration() + logger.warning( + "Normalizing invalid PIP Y bounds reported by firmware: max_y=%s, min_y=%s, pip_fw=%s", + ext_conf.pip_maximal_y_position, + ext_conf.left_arm_min_y_position, + self._pip_firmware_version, + ) + ext_conf.pip_maximal_y_position = default_bounds.pip_maximal_y_position + ext_conf.left_arm_min_y_position = default_bounds.left_arm_min_y_position + + def _pip_y_bounds(self) -> Tuple[float, float]: + """Return PIP Y bounds as a normalized ``(min_y, max_y)`` tuple.""" + min_y = self.extended_conf.left_arm_min_y_position + max_y = self.extended_conf.pip_maximal_y_position + return min(min_y, max_y), max(min_y, max_y) + async def setup( self, skip_instrument_initialization=False, @@ -1702,6 +1749,7 @@ async def setup( self._machine_conf = await self.request_machine_configuration() self._extended_conf = await self.request_extended_configuration() self._head96_information: Optional[Head96Information] = None + self._pip_firmware_version = None initialized = await self.request_instrument_initialization_status() @@ -1721,6 +1769,15 @@ async def setup( tip_presences = await self.request_tip_presence() self._num_channels = len(tip_presences) + pip_fw_response = await self.request_firmware_version() + pip_fw_match = re.search(r"rf(?P.+)$", pip_fw_response) + if pip_fw_match is None: + raise ValueError(f"Could not parse PIP firmware version from response: '{pip_fw_response}'") + self._pip_firmware_version = self._parse_firmware_version_datetime( + pip_fw_match.group("fw_version").strip() + ) + self._normalize_extended_configuration_y_bounds() + async def set_up_pip(): if (not initialized or any(tip_presences)) and not skip_pip: await self.initialize_pip() @@ -1852,12 +1909,13 @@ def can_reach_position(self, channel_idx: int, position: Coordinate) -> bool: # frontmost channel can go to y=6, every channel behind it constrains its min Y spacings = self._channels_minimum_y_spacing - min_y_pos = self.extended_conf.left_arm_min_y_position + sum(spacings[channel_idx + 1 :]) + lower_y_bound, upper_y_bound = self._pip_y_bounds() + min_y_pos = lower_y_bound + sum(spacings[channel_idx + 1 :]) if position.y < min_y_pos: return False # backmost channel max Y from config, every channel in front constrains its max Y - max_y_pos = self.extended_conf.pip_maximal_y_position - sum(spacings[:channel_idx]) + max_y_pos = upper_y_bound - sum(spacings[:channel_idx]) if position.y > max_y_pos: return False @@ -4614,7 +4672,7 @@ async def move_channel_y(self, channel: int, y: float): f"(channel {channel - 1} y-position is {round(y, 2)} mm)" ) else: - max_y_pos = self.extended_conf.pip_maximal_y_position + _, max_y_pos = self._pip_y_bounds() if y > max_y_pos: raise ValueError(f"channel {channel} y-target must be <= {max_y_pos} mm") @@ -4627,10 +4685,9 @@ async def move_channel_y(self, channel: int, y: float): ) else: # STAR machines do not allow channels y < minimum - if y < self.extended_conf.left_arm_min_y_position: - raise ValueError( - f"channel {channel} y-target must be >= {self.extended_conf.left_arm_min_y_position} mm" - ) + min_y_pos, _ = self._pip_y_bounds() + if y < min_y_pos: + raise ValueError(f"channel {channel} y-target must be >= {min_y_pos} mm") await self.position_single_pipetting_channel_in_y_direction( pipetting_channel_index=channel + 1, y_position=round(y * 10) @@ -5848,7 +5905,7 @@ async def initialize_pipetting_channels( assert 0 <= tip_type <= 99, "tip must be between 0 and 99" assert 0 <= discarding_method <= 1, "discarding_method must be between 0 and 1" - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="DI", read_timeout=120, @@ -5859,8 +5916,10 @@ async def initialize_pipetting_channels( te=f"{z_position_at_end_of_a_command:04}", tm=[f"{tm:01}" for tm in tip_pattern], tt=f"{tip_type:02}", - ti=discarding_method, ) + if self._pip_supports_extended_tip_handling_command_params(): + command_kwargs["ti"] = discarding_method + return await self.send_command(**command_kwargs) # -------------- 3.5.2 Tip handling commands using PIP -------------- @@ -5905,7 +5964,7 @@ async def pick_up_tip( "minimum_traverse_height_at_beginning_of_a_command must be between 0 and 3600" ) - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="TP", tip_pattern=tip_pattern, @@ -5917,8 +5976,10 @@ async def pick_up_tip( tp=f"{begin_tip_pick_up_process:04}", tz=f"{end_tip_pick_up_process:04}", th=f"{minimum_traverse_height_at_beginning_of_a_command:04}", - td=pickup_method.value, ) + if self._pip_supports_extended_tip_handling_command_params(): + command_kwargs["td"] = pickup_method.value + return await self.send_command(**command_kwargs) @need_iswap_parked async def discard_tip( @@ -5970,7 +6031,7 @@ async def discard_tip( "z_position_at_end_of_a_command must be between 0 and 3600" ) - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="TR", tip_pattern=tip_pattern, @@ -5982,8 +6043,10 @@ async def discard_tip( tz=end_tip_deposit_process, th=minimum_traverse_height_at_beginning_of_a_command, te=z_position_at_end_of_a_command, - ti=discarding_method.value, ) + if self._pip_supports_extended_tip_handling_command_params(): + command_kwargs["ti"] = discarding_method.value + return await self.send_command(**command_kwargs) # TODO:(command:TW) Tip Pick-up for DC wash procedure @@ -6229,7 +6292,7 @@ async def aspirate_pip( ) assert all(0 <= x <= 3600 for x in cup_upper_edge), "cup_upper_edge must be between 0 and 3600" - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="AS", tip_pattern=tip_pattern, @@ -6267,9 +6330,12 @@ async def aspirate_pip( mp=[f"{mp:03}" for mp in mix_position_from_liquid_surface], ms=[f"{ms:04}" for ms in mix_speed], mh=[f"{mh:04}" for mh in mix_surface_following_distance], - gi=[f"{gi:03}" for gi in limit_curve_index], - gj=tadm_algorithm, - gk=recording_mode, + ) + if self._pip_supports_extended_liquid_command_params(): + command_kwargs["gi"] = [f"{gi:03}" for gi in limit_curve_index] + command_kwargs["gj"] = tadm_algorithm + command_kwargs["gk"] = recording_mode + command_kwargs.update( lk=[1 if lk else 0 for lk in use_2nd_section_aspiration], ik=[f"{ik:04}" for ik in retract_height_over_2nd_section_to_empty_tip], sd=[f"{sd:04}" for sd in dispensation_speed_during_emptying_tip], @@ -6277,6 +6343,7 @@ async def aspirate_pip( sz=[f"{sz:04}" for sz in z_drive_speed_during_2nd_section_search], io=[f"{io:04}" for io in cup_upper_edge], ) + return await self.send_command(**command_kwargs) @need_iswap_parked async def dispense_pip( @@ -6461,7 +6528,7 @@ async def dispense_pip( ) assert 0 <= recording_mode <= 2, "recording_mode must be between 0 and 2" - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="DS", tip_pattern=tip_pattern, @@ -6499,10 +6566,12 @@ async def dispense_pip( mp=[f"{mp:03}" for mp in mix_position_from_liquid_surface], ms=[f"{ms:04}" for ms in mix_speed], mh=[f"{mh:04}" for mh in mix_surface_following_distance], - gi=[f"{gi:03}" for gi in limit_curve_index], - gj=tadm_algorithm, # - gk=recording_mode, # ) + if self._pip_supports_extended_liquid_command_params(): + command_kwargs["gi"] = [f"{gi:03}" for gi in limit_curve_index] + command_kwargs["gj"] = tadm_algorithm + command_kwargs["gk"] = recording_mode + return await self.send_command(**command_kwargs) # TODO:(command:DA) Simultaneous aspiration & dispensation of liquid @@ -8061,7 +8130,7 @@ async def aspirate_core_96( channel_pattern_bin_str = reversed(["1" if x else "0" for x in channel_pattern]) channel_pattern_hex = hex(int("".join(channel_pattern_bin_str), 2)).upper()[2:] - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="EA", read_timeout=max(300, self.read_timeout), @@ -8073,7 +8142,10 @@ async def aspirate_core_96( ze=f"{min_z_endpos:04}", lz=f"{lld_search_height:04}", zt=f"{liquid_surface_no_lld:04}", - pp=f"{pull_out_distance_transport_air:04}", + ) + if self._core96_supports_extended_liquid_command_params(): + command_kwargs["pp"] = f"{pull_out_distance_transport_air:04}" + command_kwargs.update( zm=f"{minimum_height:04}", zv=f"{second_section_height:04}", zq=f"{second_section_ratio:05}", @@ -8094,11 +8166,13 @@ async def aspirate_core_96( hp=f"{mix_position_from_liquid_surface:03}", mj=f"{mix_surface_following_distance:03}", hs=f"{speed_of_mix:04}", - cw=channel_pattern_hex, - cr=f"{limit_curve_index:03}", - cj=tadm_algorithm, - cx=recording_mode, ) + if self._core96_supports_extended_liquid_command_params(): + command_kwargs["cw"] = channel_pattern_hex + command_kwargs["cr"] = f"{limit_curve_index:03}" + command_kwargs["cj"] = tadm_algorithm + command_kwargs["cx"] = recording_mode + return await self.send_command(**command_kwargs) @need_iswap_parked @_requires_head96 @@ -8336,7 +8410,7 @@ async def dispense_core_96( channel_pattern_bin_str = reversed(["1" if x else "0" for x in channel_pattern]) channel_pattern_hex = hex(int("".join(channel_pattern_bin_str), 2)).upper()[2:] - return await self.send_command( + command_kwargs: Dict[str, Any] = dict( module="C0", command="ED", read_timeout=max(300, self.read_timeout), @@ -8349,7 +8423,10 @@ async def dispense_core_96( zq=f"{second_section_ratio:05}", lz=f"{lld_search_height:04}", zt=f"{liquid_surface_no_lld:04}", - pp=f"{pull_out_distance_transport_air:04}", + ) + if self._core96_supports_extended_liquid_command_params(): + command_kwargs["pp"] = f"{pull_out_distance_transport_air:04}" + command_kwargs.update( iw=f"{immersion_depth:03}", ix=immersion_depth_direction, fh=f"{surface_following_distance:03}", @@ -8371,11 +8448,13 @@ async def dispense_core_96( hp=f"{mix_position_from_liquid_surface:03}", mj=f"{mix_surface_following_distance:03}", hs=f"{speed_of_mixing:04}", - cw=channel_pattern_hex, - cr=f"{limit_curve_index:03}", - cj=tadm_algorithm, - cx=recording_mode, ) + if self._core96_supports_extended_liquid_command_params(): + command_kwargs["cw"] = channel_pattern_hex + command_kwargs["cr"] = f"{limit_curve_index:03}" + command_kwargs["cj"] = tadm_algorithm + command_kwargs["cx"] = recording_mode + return await self.send_command(**command_kwargs) # -------------- 3.10.4 Adjustment & movement commands -------------- @@ -10707,13 +10786,13 @@ async def clld_probe_y_position_using_channel( adj_upper_y = await self.request_y_pos_channel_n(channel_idx - 1) max_safe_upper_y_pos = adj_upper_y - self._min_spacing_between(channel_idx, channel_idx - 1) else: - max_safe_upper_y_pos = self.extended_conf.pip_maximal_y_position + _, max_safe_upper_y_pos = self._pip_y_bounds() if channel_idx < (self.num_channels - 1): adj_lower_y = await self.request_y_pos_channel_n(channel_idx + 1) max_safe_lower_y_pos = adj_lower_y + self._min_spacing_between(channel_idx, channel_idx + 1) else: - max_safe_lower_y_pos = self.extended_conf.left_arm_min_y_position + max_safe_lower_y_pos, _ = self._pip_y_bounds() # Enable safe start and end positions if start_pos_search: @@ -10794,7 +10873,7 @@ async def clld_probe_y_position_using_channel( channel_idx, channel_idx + 1 ) else: - min_y = self.extended_conf.left_arm_min_y_position + min_y, _ = self._pip_y_bounds() max_safe_dist = detected_material_y_pos - min_y move_target = detected_material_y_pos - min(post_detection_dist, max_safe_dist) @@ -10805,7 +10884,7 @@ async def clld_probe_y_position_using_channel( channel_idx, channel_idx - 1 ) else: - max_y = self.extended_conf.pip_maximal_y_position + _, max_y = self._pip_y_bounds() max_safe_dist = max_y - detected_material_y_pos move_target = detected_material_y_pos + min(post_detection_dist, max_safe_dist) @@ -11682,7 +11761,7 @@ async def get_channels_y_positions(self) -> Dict[int, float]: # position_channels_in_y_direction, it will raise an error.) The minimum y is 6mm, # so we fix that first (in case that value is misreported). Then, we traverse the # list in reverse and enforce pairwise minimum spacing. - min_y = self.extended_conf.left_arm_min_y_position + min_y, _ = self._pip_y_bounds() if y_positions[-1] < min_y - 0.2: raise RuntimeError( "Channels are reported to be too close to the front of the machine. " diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py index b478ff7b637..ddff809cffd 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py @@ -1,5 +1,6 @@ # mypy: disable-error-code="attr-defined,method-assign" +import datetime import unittest import unittest.mock from typing import Literal, cast @@ -32,6 +33,7 @@ from .STAR_backend import ( CommandSyntaxError, + ExtendedConfiguration, HamiltonNoTipError, HardwareError, STARBackend, @@ -202,6 +204,109 @@ async def stop(self): self.stop_finished = True +class TestSTARLegacyFirmwareCommandParams(unittest.IsolatedAsyncioTestCase): + """Test old-firmware command payload compatibility gates.""" + + def _backend(self) -> STARBackend: + backend = STARBackend() + backend._extended_conf = _DEFAULT_EXTENDED_CONFIGURATION + backend._iswap_parked = True + backend.send_command = unittest.mock.AsyncMock() # type: ignore[method-assign] + return backend + + async def test_legacy_pip_firmware_omits_newer_optional_params(self): + backend = self._backend() + backend._pip_firmware_version = datetime.date(2009, 5, 4) + + await backend.initialize_pipetting_channels(tip_pattern=[True]) + self.assertNotIn("ti", backend.send_command.call_args.kwargs) + backend.send_command.reset_mock() + + await backend.pick_up_tip( + x_positions=[0], y_positions=[0], tip_pattern=[True], tip_type_idx=1 + ) + self.assertNotIn("td", backend.send_command.call_args.kwargs) + backend.send_command.reset_mock() + + await backend.discard_tip(x_positions=[0], y_positions=[0], tip_pattern=[True]) + self.assertNotIn("ti", backend.send_command.call_args.kwargs) + backend.send_command.reset_mock() + + await backend.aspirate_pip() + kwargs = backend.send_command.call_args.kwargs + self.assertNotIn("gi", kwargs) + self.assertNotIn("gj", kwargs) + self.assertNotIn("gk", kwargs) + backend.send_command.reset_mock() + + await backend.dispense_pip(tip_pattern=[True]) + kwargs = backend.send_command.call_args.kwargs + self.assertNotIn("gi", kwargs) + self.assertNotIn("gj", kwargs) + self.assertNotIn("gk", kwargs) + + async def test_modern_pip_firmware_keeps_newer_optional_params(self): + backend = self._backend() + backend._pip_firmware_version = datetime.date(2010, 1, 1) + + await backend.initialize_pipetting_channels(tip_pattern=[True]) + self.assertIn("ti", backend.send_command.call_args.kwargs) + backend.send_command.reset_mock() + + await backend.pick_up_tip( + x_positions=[0], y_positions=[0], tip_pattern=[True], tip_type_idx=1 + ) + self.assertIn("td", backend.send_command.call_args.kwargs) + backend.send_command.reset_mock() + + await backend.discard_tip(x_positions=[0], y_positions=[0], tip_pattern=[True]) + self.assertIn("ti", backend.send_command.call_args.kwargs) + backend.send_command.reset_mock() + + await backend.aspirate_pip() + kwargs = backend.send_command.call_args.kwargs + self.assertIn("gi", kwargs) + self.assertIn("gj", kwargs) + self.assertIn("gk", kwargs) + backend.send_command.reset_mock() + + await backend.dispense_pip(tip_pattern=[True]) + kwargs = backend.send_command.call_args.kwargs + self.assertIn("gi", kwargs) + self.assertIn("gj", kwargs) + self.assertIn("gk", kwargs) + + async def test_legacy_core96_firmware_omits_newer_optional_params(self): + backend = self._backend() + backend._head96_information = unittest.mock.Mock(fw_version=datetime.date(2009, 5, 4)) + + await backend.aspirate_core_96(y_positions=1080) + kwargs = backend.send_command.call_args.kwargs + for param in ("pp", "cw", "cr", "cj", "cx"): + self.assertNotIn(param, kwargs) + backend.send_command.reset_mock() + + await backend.dispense_core_96(y_position=1080) + kwargs = backend.send_command.call_args.kwargs + for param in ("pp", "cw", "cr", "cj", "cx"): + self.assertNotIn(param, kwargs) + + async def test_modern_core96_firmware_keeps_newer_optional_params(self): + backend = self._backend() + backend._head96_information = unittest.mock.Mock(fw_version=datetime.date(2010, 1, 1)) + + await backend.aspirate_core_96(y_positions=1080) + kwargs = backend.send_command.call_args.kwargs + for param in ("pp", "cw", "cr", "cj", "cx"): + self.assertIn(param, kwargs) + backend.send_command.reset_mock() + + await backend.dispense_core_96(y_position=1080) + kwargs = backend.send_command.call_args.kwargs + for param in ("pp", "cw", "cr", "cj", "cx"): + self.assertIn(param, kwargs) + + class TestSTARLiquidHandlerCommands(unittest.IsolatedAsyncioTestCase): """Test STAR backend for liquid handling.""" @@ -1540,6 +1645,60 @@ class TestChannelsMinimumYSpacing(unittest.IsolatedAsyncioTestCase): # -- can_reach_position: reachability shrinks with wider spacing ---------------- + def test_normalize_extended_configuration_y_bounds_replaces_reversed_bounds(self): + backend = STARBackend() + backend._extended_conf = ExtendedConfiguration() + backend._extended_conf.left_arm_min_y_position = 606.5 + backend._extended_conf.pip_maximal_y_position = 6.0 + backend._pip_firmware_version = datetime.date(2009, 5, 4) + + backend._normalize_extended_configuration_y_bounds() + + default_bounds = ExtendedConfiguration() + self.assertEqual( + backend.extended_conf.left_arm_min_y_position, + default_bounds.left_arm_min_y_position, + ) + self.assertEqual( + backend.extended_conf.pip_maximal_y_position, + default_bounds.pip_maximal_y_position, + ) + + def test_normalize_extended_configuration_y_bounds_replaces_tiny_max_y(self): + backend = STARBackend() + backend._num_channels = 8 + backend._channels_minimum_y_spacing = [9.0] * 8 + backend._extended_conf = ExtendedConfiguration() + backend._extended_conf.left_arm_min_y_position = 6.0 + backend._extended_conf.pip_maximal_y_position = 50.0 + backend._pip_firmware_version = datetime.date(2009, 5, 4) + + self.assertFalse(backend.can_reach_position(0, Coordinate(100, 100, 100))) + backend._normalize_extended_configuration_y_bounds() + self.assertTrue(backend.can_reach_position(0, Coordinate(100, 100, 100))) + + def test_normalize_extended_configuration_y_bounds_leaves_valid_bounds(self): + backend = STARBackend() + backend._extended_conf = ExtendedConfiguration() + backend._extended_conf.left_arm_min_y_position = 10.0 + backend._extended_conf.pip_maximal_y_position = 500.0 + backend._pip_firmware_version = datetime.date(2021, 1, 1) + + backend._normalize_extended_configuration_y_bounds() + + self.assertEqual(backend.extended_conf.left_arm_min_y_position, 10.0) + self.assertEqual(backend.extended_conf.pip_maximal_y_position, 500.0) + + def test_can_reach_position_uses_sorted_y_bounds(self): + backend = STARBackend() + backend._num_channels = 8 + backend._channels_minimum_y_spacing = [9.0] * 8 + backend._extended_conf = ExtendedConfiguration() + backend._extended_conf.left_arm_min_y_position = 606.5 + backend._extended_conf.pip_maximal_y_position = 6.0 + + self.assertTrue(backend.can_reach_position(0, Coordinate(100, 100, 100))) + async def test_can_reach_4ch_18mm_rejects_position_reachable_at_9mm(self): """A position reachable by channel 0 at 9mm spacing is unreachable at 18mm spacing. From 8c66d05f28746a8a92dbebc125f3d34165826c6d Mon Sep 17 00:00:00 2001 From: Harley King Date: Fri, 24 Apr 2026 13:19:04 -0400 Subject: [PATCH 2/3] H0DQ on CORE96 removed and parameter po on the PIP aspirate command AS It's a game of whack-a-mole determining which parameters are accepted by the legacy firmware and which commands are not. H0DQ is a pre-pickup dispensing-drive move. Modified so that it only runs on firmware that supports it. po gating applied with aspirate and dispense. legacy firmware command tests to assert legacy PIP AS omits po, gi, gj, gk, lk, ik, sd, se, sz, and io. --- .../backends/hamilton/STAR_backend.py | 22 +++++++----- .../backends/hamilton/STAR_tests.py | 36 ++++++++++++------- 2 files changed, 37 insertions(+), 21 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index d953d7ce88f..1a77caa9963 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -3526,7 +3526,7 @@ async def pick_up_tips96( self._check_96_position_legal(pickup_position, skip_z=True) - if tip_pickup_method == "from_rack": + if tip_pickup_method == "from_rack" and self._core96_supports_extended_liquid_command_params(): # the STAR will not automatically move the dispensing drive down if it is still up # so we need to move it down here # see https://github.com/PyLabRobot/pylabrobot/pull/835 @@ -6335,14 +6335,16 @@ async def aspirate_pip( command_kwargs["gi"] = [f"{gi:03}" for gi in limit_curve_index] command_kwargs["gj"] = tadm_algorithm command_kwargs["gk"] = recording_mode - command_kwargs.update( - lk=[1 if lk else 0 for lk in use_2nd_section_aspiration], - ik=[f"{ik:04}" for ik in retract_height_over_2nd_section_to_empty_tip], - sd=[f"{sd:04}" for sd in dispensation_speed_during_emptying_tip], - se=[f"{se:04}" for se in dosing_drive_speed_during_2nd_section_search], - sz=[f"{sz:04}" for sz in z_drive_speed_during_2nd_section_search], - io=[f"{io:04}" for io in cup_upper_edge], - ) + command_kwargs.update( + lk=[1 if lk else 0 for lk in use_2nd_section_aspiration], + ik=[f"{ik:04}" for ik in retract_height_over_2nd_section_to_empty_tip], + sd=[f"{sd:04}" for sd in dispensation_speed_during_emptying_tip], + se=[f"{se:04}" for se in dosing_drive_speed_during_2nd_section_search], + sz=[f"{sz:04}" for sz in z_drive_speed_during_2nd_section_search], + io=[f"{io:04}" for io in cup_upper_edge], + ) + else: + command_kwargs.pop("po") return await self.send_command(**command_kwargs) @need_iswap_parked @@ -6571,6 +6573,8 @@ async def dispense_pip( command_kwargs["gi"] = [f"{gi:03}" for gi in limit_curve_index] command_kwargs["gj"] = tadm_algorithm command_kwargs["gk"] = recording_mode + else: + command_kwargs.pop("po") return await self.send_command(**command_kwargs) # TODO:(command:DA) Simultaneous aspiration & dispensation of liquid diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py index ddff809cffd..cb46c925370 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py @@ -234,16 +234,14 @@ async def test_legacy_pip_firmware_omits_newer_optional_params(self): await backend.aspirate_pip() kwargs = backend.send_command.call_args.kwargs - self.assertNotIn("gi", kwargs) - self.assertNotIn("gj", kwargs) - self.assertNotIn("gk", kwargs) + for param in ("po", "gi", "gj", "gk", "lk", "ik", "sd", "se", "sz", "io"): + self.assertNotIn(param, kwargs) backend.send_command.reset_mock() await backend.dispense_pip(tip_pattern=[True]) kwargs = backend.send_command.call_args.kwargs - self.assertNotIn("gi", kwargs) - self.assertNotIn("gj", kwargs) - self.assertNotIn("gk", kwargs) + for param in ("po", "gi", "gj", "gk"): + self.assertNotIn(param, kwargs) async def test_modern_pip_firmware_keeps_newer_optional_params(self): backend = self._backend() @@ -265,16 +263,14 @@ async def test_modern_pip_firmware_keeps_newer_optional_params(self): await backend.aspirate_pip() kwargs = backend.send_command.call_args.kwargs - self.assertIn("gi", kwargs) - self.assertIn("gj", kwargs) - self.assertIn("gk", kwargs) + for param in ("po", "gi", "gj", "gk", "lk", "ik", "sd", "se", "sz", "io"): + self.assertIn(param, kwargs) backend.send_command.reset_mock() await backend.dispense_pip(tip_pattern=[True]) kwargs = backend.send_command.call_args.kwargs - self.assertIn("gi", kwargs) - self.assertIn("gj", kwargs) - self.assertIn("gk", kwargs) + for param in ("po", "gi", "gj", "gk"): + self.assertIn(param, kwargs) async def test_legacy_core96_firmware_omits_newer_optional_params(self): backend = self._backend() @@ -793,6 +789,22 @@ async def test_core_96_tip_pickup(self): ] ) + async def test_legacy_core_96_tip_pickup_skips_dispensing_drive_move(self): + self.STAR._head96_information = unittest.mock.Mock(fw_version=datetime.date(2009, 5, 4)) + + await self.lh.pick_up_tips96(self.tip_rack) + + self.STAR._write_and_read_command.assert_has_calls( + [ + _any_write_and_read_command_call("C0TTid0001tt01tf1tl0519tv03600tg2tu0"), + _any_write_and_read_command_call("C0EPid0002xs01179xd0yh2418tt01wu0za2164zh2450ze2450"), + ] + ) + sent_commands = [ + call.kwargs["cmd"] for call in self.STAR._write_and_read_command.call_args_list + ] + self.assertFalse(any(command.startswith("H0DQ") for command in sent_commands)) + async def test_tip_tracking_pick_up96(self): set_tip_tracking(enabled=True) await self.lh.pick_up_tips96(self.tip_rack) From 2af7009e6c1d47f2f98c475842e766307418c62a Mon Sep 17 00:00:00 2001 From: Harley King Date: Fri, 24 Apr 2026 14:06:24 -0400 Subject: [PATCH 3/3] format correction --- .../liquid_handling/backends/hamilton/STAR_tests.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py index cb46c925370..6a6a10565c9 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py @@ -222,9 +222,7 @@ async def test_legacy_pip_firmware_omits_newer_optional_params(self): self.assertNotIn("ti", backend.send_command.call_args.kwargs) backend.send_command.reset_mock() - await backend.pick_up_tip( - x_positions=[0], y_positions=[0], tip_pattern=[True], tip_type_idx=1 - ) + await backend.pick_up_tip(x_positions=[0], y_positions=[0], tip_pattern=[True], tip_type_idx=1) self.assertNotIn("td", backend.send_command.call_args.kwargs) backend.send_command.reset_mock() @@ -251,9 +249,7 @@ async def test_modern_pip_firmware_keeps_newer_optional_params(self): self.assertIn("ti", backend.send_command.call_args.kwargs) backend.send_command.reset_mock() - await backend.pick_up_tip( - x_positions=[0], y_positions=[0], tip_pattern=[True], tip_type_idx=1 - ) + await backend.pick_up_tip(x_positions=[0], y_positions=[0], tip_pattern=[True], tip_type_idx=1) self.assertIn("td", backend.send_command.call_args.kwargs) backend.send_command.reset_mock()