Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 128 additions & 45 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,7 @@ def __init__(

self._iswap_version: Optional[str] = None # loaded lazily
self._pip_channel_information: Optional[List[PipChannelInformation]] = None
self._pip_firmware_version: Optional[datetime.date] = None

self._default_1d_symbology: Barcode1DSymbology = "Code 128 (Subset B and C)"

Expand Down Expand Up @@ -1678,6 +1679,52 @@ def _parse_firmware_version_datetime(self, fw_version: str) -> datetime.date:
raise ValueError(f"Could not parse year from firmware version string: '{fw_version}'")
return datetime.date(int(year_match.group(1)), 1, 1)

def _pip_has_old_firmware(self) -> bool:
"""Return whether the installed PIP firmware predates 2010 compatibility changes."""
pip_firmware_version = getattr(self, "_pip_firmware_version", None)
return pip_firmware_version is not None and pip_firmware_version.year < 2010

def _core96_has_old_firmware(self) -> bool:
"""Return whether the installed CoRe96 firmware predates 2010 compatibility changes."""
head96_information = getattr(self, "_head96_information", None)
return head96_information is not None and head96_information.fw_version.year < 2010

def _pip_supports_extended_tip_handling_command_params(self) -> bool:
"""Return whether PIP tip-handling commands support newer optional parameters."""
return not self._pip_has_old_firmware()

def _pip_supports_extended_liquid_command_params(self) -> bool:
"""Return whether PIP liquid commands support newer optional parameters."""
return not self._pip_has_old_firmware()

def _core96_supports_extended_liquid_command_params(self) -> bool:
"""Return whether CoRe96 `EA`/`ED` support the newer optional parameter set."""
return not self._core96_has_old_firmware()

def _normalize_extended_configuration_y_bounds(self) -> None:
"""Normalize impossible PIP Y bounds reported by some legacy STAR/STARlet firmware."""
ext_conf = self.extended_conf

if (
ext_conf.pip_maximal_y_position < ext_conf.left_arm_min_y_position
or ext_conf.pip_maximal_y_position < 100.0
):
default_bounds = ExtendedConfiguration()
logger.warning(
"Normalizing invalid PIP Y bounds reported by firmware: max_y=%s, min_y=%s, pip_fw=%s",
ext_conf.pip_maximal_y_position,
ext_conf.left_arm_min_y_position,
self._pip_firmware_version,
)
ext_conf.pip_maximal_y_position = default_bounds.pip_maximal_y_position
ext_conf.left_arm_min_y_position = default_bounds.left_arm_min_y_position

def _pip_y_bounds(self) -> Tuple[float, float]:
"""Return PIP Y bounds as a normalized ``(min_y, max_y)`` tuple."""
min_y = self.extended_conf.left_arm_min_y_position
max_y = self.extended_conf.pip_maximal_y_position
return min(min_y, max_y), max(min_y, max_y)
Comment on lines +1721 to +1726
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why would the max y be lower than min y? i do not understand why they are being compared here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why would the max y be lower than min y? i do not understand why they are being compared here

Normally it should not be. This is handling a legacy STARlet firmware quirk we observed on hardware: the extended configuration can report invalid/reversed PIP Y bounds, for example left_arm_min_y_position = 606.5 and pip_maximal_y_position = 6.0.

That bad interval made ensure_can_reach_position() reject valid CORE8 positions during tip pickup. _normalize_extended_configuration_y_bounds() fixes the known invalid values during setup, and _pip_y_bounds() defensively returns the bounds as an ordered (min_y, max_y) tuple for reachability/safety call sites.

So the comparison is not because max is expected to be lower than min on valid firmware; it is there to tolerate bad legacy firmware-reported bounds without disabling the reachability guard entirely.

Copy link
Copy Markdown
Member

@rickwierenga rickwierenga Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that the only problem or are the channels actually reversed as originally reported? does higher y still mean towards the back of the robot? is channel 0 still the backmost channel?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran a hardware confirmation script on the legacy STARlet to answer the channel/Y-axis questions directly.

This PR does not reverse channel order or change the deck coordinate model. After setup, the legacy firmware-reported PIP Y bounds were normalized to:

  • left_arm_min_y_position: 6.00 mm
  • pip_maximal_y_position: 606.50 mm
  • normalized interval: (6.00, 606.50) mm

So the change is only correcting invalid firmware-reported bounds before reachability checks.

For CORE8 channel mapping, PLR mapped tip_rack_1000["A8:H8"] with use_channels=[0..7] as:

  • channel 0 -> A8, Y 337.8 mm
  • channel 1 -> B8, Y 328.8 mm
  • channel 2 -> C8, Y 319.8 mm
  • channel 3 -> D8, Y 310.8 mm
  • channel 4 -> E8, Y 301.8 mm
  • channel 5 -> F8, Y 292.8 mm
  • channel 6 -> G8, Y 283.8 mm
  • channel 7 -> H8, Y 274.8 mm

This confirms:

  1. channels are not reversed
  2. higher Y is still treated as toward the back of the robot
  3. channel 0 remains the backmost channel

I also checked that the reachability guard was not bypassed. Valid A8:H8 pickup targets all returned reachable=True, while deliberately invalid targets still failed:

  • channel 0 at y=-20.0 mm: reachable=False
  • channel 7 at y=800.0 mm: reachable=False

So ensure_can_reach_position() remains active. This PR fixes bad legacy firmware inputs to the reachability logic; it does not suppress the guard or change channel numbering/direction.


async def setup(
self,
skip_instrument_initialization=False,
Expand All @@ -1702,6 +1749,7 @@ async def setup(
self._machine_conf = await self.request_machine_configuration()
self._extended_conf = await self.request_extended_configuration()
self._head96_information: Optional[Head96Information] = None
self._pip_firmware_version = None

initialized = await self.request_instrument_initialization_status()

Expand All @@ -1721,6 +1769,15 @@ async def setup(
tip_presences = await self.request_tip_presence()
self._num_channels = len(tip_presences)

pip_fw_response = await self.request_firmware_version()
pip_fw_match = re.search(r"rf(?P<fw_version>.+)$", pip_fw_response)
if pip_fw_match is None:
raise ValueError(f"Could not parse PIP firmware version from response: '{pip_fw_response}'")
self._pip_firmware_version = self._parse_firmware_version_datetime(
pip_fw_match.group("fw_version").strip()
)
self._normalize_extended_configuration_y_bounds()

async def set_up_pip():
if (not initialized or any(tip_presences)) and not skip_pip:
await self.initialize_pip()
Expand Down Expand Up @@ -1852,12 +1909,13 @@ def can_reach_position(self, channel_idx: int, position: Coordinate) -> bool:

# frontmost channel can go to y=6, every channel behind it constrains its min Y
spacings = self._channels_minimum_y_spacing
min_y_pos = self.extended_conf.left_arm_min_y_position + sum(spacings[channel_idx + 1 :])
lower_y_bound, upper_y_bound = self._pip_y_bounds()
min_y_pos = lower_y_bound + sum(spacings[channel_idx + 1 :])
if position.y < min_y_pos:
return False

# backmost channel max Y from config, every channel in front constrains its max Y
max_y_pos = self.extended_conf.pip_maximal_y_position - sum(spacings[:channel_idx])
max_y_pos = upper_y_bound - sum(spacings[:channel_idx])
if position.y > max_y_pos:
return False

Expand Down Expand Up @@ -3468,7 +3526,7 @@ async def pick_up_tips96(

self._check_96_position_legal(pickup_position, skip_z=True)

if tip_pickup_method == "from_rack":
if tip_pickup_method == "from_rack" and self._core96_supports_extended_liquid_command_params():
# the STAR will not automatically move the dispensing drive down if it is still up
# so we need to move it down here
# see https://github.com/PyLabRobot/pylabrobot/pull/835
Expand Down Expand Up @@ -4614,7 +4672,7 @@ async def move_channel_y(self, channel: int, y: float):
f"(channel {channel - 1} y-position is {round(y, 2)} mm)"
)
else:
max_y_pos = self.extended_conf.pip_maximal_y_position
_, max_y_pos = self._pip_y_bounds()
if y > max_y_pos:
raise ValueError(f"channel {channel} y-target must be <= {max_y_pos} mm")

Expand All @@ -4627,10 +4685,9 @@ async def move_channel_y(self, channel: int, y: float):
)
else:
# STAR machines do not allow channels y < minimum
if y < self.extended_conf.left_arm_min_y_position:
raise ValueError(
f"channel {channel} y-target must be >= {self.extended_conf.left_arm_min_y_position} mm"
)
min_y_pos, _ = self._pip_y_bounds()
if y < min_y_pos:
raise ValueError(f"channel {channel} y-target must be >= {min_y_pos} mm")

await self.position_single_pipetting_channel_in_y_direction(
pipetting_channel_index=channel + 1, y_position=round(y * 10)
Expand Down Expand Up @@ -5848,7 +5905,7 @@ async def initialize_pipetting_channels(
assert 0 <= tip_type <= 99, "tip must be between 0 and 99"
assert 0 <= discarding_method <= 1, "discarding_method must be between 0 and 1"

return await self.send_command(
command_kwargs: Dict[str, Any] = dict(
module="C0",
command="DI",
read_timeout=120,
Expand All @@ -5859,8 +5916,10 @@ async def initialize_pipetting_channels(
te=f"{z_position_at_end_of_a_command:04}",
tm=[f"{tm:01}" for tm in tip_pattern],
tt=f"{tip_type:02}",
ti=discarding_method,
)
if self._pip_supports_extended_tip_handling_command_params():
command_kwargs["ti"] = discarding_method
return await self.send_command(**command_kwargs)

# -------------- 3.5.2 Tip handling commands using PIP --------------

Expand Down Expand Up @@ -5905,7 +5964,7 @@ async def pick_up_tip(
"minimum_traverse_height_at_beginning_of_a_command must be between 0 and 3600"
)

return await self.send_command(
command_kwargs: Dict[str, Any] = dict(
module="C0",
command="TP",
tip_pattern=tip_pattern,
Expand All @@ -5917,8 +5976,10 @@ async def pick_up_tip(
tp=f"{begin_tip_pick_up_process:04}",
tz=f"{end_tip_pick_up_process:04}",
th=f"{minimum_traverse_height_at_beginning_of_a_command:04}",
td=pickup_method.value,
)
if self._pip_supports_extended_tip_handling_command_params():
command_kwargs["td"] = pickup_method.value
return await self.send_command(**command_kwargs)

@need_iswap_parked
async def discard_tip(
Expand Down Expand Up @@ -5970,7 +6031,7 @@ async def discard_tip(
"z_position_at_end_of_a_command must be between 0 and 3600"
)

return await self.send_command(
command_kwargs: Dict[str, Any] = dict(
module="C0",
command="TR",
tip_pattern=tip_pattern,
Expand All @@ -5982,8 +6043,10 @@ async def discard_tip(
tz=end_tip_deposit_process,
th=minimum_traverse_height_at_beginning_of_a_command,
te=z_position_at_end_of_a_command,
ti=discarding_method.value,
)
if self._pip_supports_extended_tip_handling_command_params():
command_kwargs["ti"] = discarding_method.value
return await self.send_command(**command_kwargs)

# TODO:(command:TW) Tip Pick-up for DC wash procedure

Expand Down Expand Up @@ -6229,7 +6292,7 @@ async def aspirate_pip(
)
assert all(0 <= x <= 3600 for x in cup_upper_edge), "cup_upper_edge must be between 0 and 3600"

return await self.send_command(
command_kwargs: Dict[str, Any] = dict(
module="C0",
command="AS",
tip_pattern=tip_pattern,
Expand Down Expand Up @@ -6267,16 +6330,22 @@ async def aspirate_pip(
mp=[f"{mp:03}" for mp in mix_position_from_liquid_surface],
ms=[f"{ms:04}" for ms in mix_speed],
mh=[f"{mh:04}" for mh in mix_surface_following_distance],
gi=[f"{gi:03}" for gi in limit_curve_index],
gj=tadm_algorithm,
gk=recording_mode,
lk=[1 if lk else 0 for lk in use_2nd_section_aspiration],
ik=[f"{ik:04}" for ik in retract_height_over_2nd_section_to_empty_tip],
sd=[f"{sd:04}" for sd in dispensation_speed_during_emptying_tip],
se=[f"{se:04}" for se in dosing_drive_speed_during_2nd_section_search],
sz=[f"{sz:04}" for sz in z_drive_speed_during_2nd_section_search],
io=[f"{io:04}" for io in cup_upper_edge],
)
if self._pip_supports_extended_liquid_command_params():
command_kwargs["gi"] = [f"{gi:03}" for gi in limit_curve_index]
command_kwargs["gj"] = tadm_algorithm
command_kwargs["gk"] = recording_mode
command_kwargs.update(
lk=[1 if lk else 0 for lk in use_2nd_section_aspiration],
ik=[f"{ik:04}" for ik in retract_height_over_2nd_section_to_empty_tip],
sd=[f"{sd:04}" for sd in dispensation_speed_during_emptying_tip],
se=[f"{se:04}" for se in dosing_drive_speed_during_2nd_section_search],
sz=[f"{sz:04}" for sz in z_drive_speed_during_2nd_section_search],
io=[f"{io:04}" for io in cup_upper_edge],
)
else:
command_kwargs.pop("po")
return await self.send_command(**command_kwargs)

@need_iswap_parked
async def dispense_pip(
Expand Down Expand Up @@ -6461,7 +6530,7 @@ async def dispense_pip(
)
assert 0 <= recording_mode <= 2, "recording_mode must be between 0 and 2"

return await self.send_command(
command_kwargs: Dict[str, Any] = dict(
module="C0",
command="DS",
tip_pattern=tip_pattern,
Expand Down Expand Up @@ -6499,10 +6568,14 @@ async def dispense_pip(
mp=[f"{mp:03}" for mp in mix_position_from_liquid_surface],
ms=[f"{ms:04}" for ms in mix_speed],
mh=[f"{mh:04}" for mh in mix_surface_following_distance],
gi=[f"{gi:03}" for gi in limit_curve_index],
gj=tadm_algorithm, #
gk=recording_mode, #
)
if self._pip_supports_extended_liquid_command_params():
command_kwargs["gi"] = [f"{gi:03}" for gi in limit_curve_index]
command_kwargs["gj"] = tadm_algorithm
command_kwargs["gk"] = recording_mode
else:
command_kwargs.pop("po")
return await self.send_command(**command_kwargs)

# TODO:(command:DA) Simultaneous aspiration & dispensation of liquid

Expand Down Expand Up @@ -8061,7 +8134,7 @@ async def aspirate_core_96(
channel_pattern_bin_str = reversed(["1" if x else "0" for x in channel_pattern])
channel_pattern_hex = hex(int("".join(channel_pattern_bin_str), 2)).upper()[2:]

return await self.send_command(
command_kwargs: Dict[str, Any] = dict(
module="C0",
command="EA",
read_timeout=max(300, self.read_timeout),
Expand All @@ -8073,7 +8146,10 @@ async def aspirate_core_96(
ze=f"{min_z_endpos:04}",
lz=f"{lld_search_height:04}",
zt=f"{liquid_surface_no_lld:04}",
pp=f"{pull_out_distance_transport_air:04}",
)
if self._core96_supports_extended_liquid_command_params():
command_kwargs["pp"] = f"{pull_out_distance_transport_air:04}"
command_kwargs.update(
zm=f"{minimum_height:04}",
zv=f"{second_section_height:04}",
zq=f"{second_section_ratio:05}",
Expand All @@ -8094,11 +8170,13 @@ async def aspirate_core_96(
hp=f"{mix_position_from_liquid_surface:03}",
mj=f"{mix_surface_following_distance:03}",
hs=f"{speed_of_mix:04}",
cw=channel_pattern_hex,
cr=f"{limit_curve_index:03}",
cj=tadm_algorithm,
cx=recording_mode,
)
if self._core96_supports_extended_liquid_command_params():
command_kwargs["cw"] = channel_pattern_hex
command_kwargs["cr"] = f"{limit_curve_index:03}"
command_kwargs["cj"] = tadm_algorithm
command_kwargs["cx"] = recording_mode
return await self.send_command(**command_kwargs)

@need_iswap_parked
@_requires_head96
Expand Down Expand Up @@ -8336,7 +8414,7 @@ async def dispense_core_96(
channel_pattern_bin_str = reversed(["1" if x else "0" for x in channel_pattern])
channel_pattern_hex = hex(int("".join(channel_pattern_bin_str), 2)).upper()[2:]

return await self.send_command(
command_kwargs: Dict[str, Any] = dict(
module="C0",
command="ED",
read_timeout=max(300, self.read_timeout),
Expand All @@ -8349,7 +8427,10 @@ async def dispense_core_96(
zq=f"{second_section_ratio:05}",
lz=f"{lld_search_height:04}",
zt=f"{liquid_surface_no_lld:04}",
pp=f"{pull_out_distance_transport_air:04}",
)
if self._core96_supports_extended_liquid_command_params():
command_kwargs["pp"] = f"{pull_out_distance_transport_air:04}"
command_kwargs.update(
iw=f"{immersion_depth:03}",
ix=immersion_depth_direction,
fh=f"{surface_following_distance:03}",
Expand All @@ -8371,11 +8452,13 @@ async def dispense_core_96(
hp=f"{mix_position_from_liquid_surface:03}",
mj=f"{mix_surface_following_distance:03}",
hs=f"{speed_of_mixing:04}",
cw=channel_pattern_hex,
cr=f"{limit_curve_index:03}",
cj=tadm_algorithm,
cx=recording_mode,
)
if self._core96_supports_extended_liquid_command_params():
command_kwargs["cw"] = channel_pattern_hex
command_kwargs["cr"] = f"{limit_curve_index:03}"
command_kwargs["cj"] = tadm_algorithm
command_kwargs["cx"] = recording_mode
return await self.send_command(**command_kwargs)

# -------------- 3.10.4 Adjustment & movement commands --------------

Expand Down Expand Up @@ -10707,13 +10790,13 @@ async def clld_probe_y_position_using_channel(
adj_upper_y = await self.request_y_pos_channel_n(channel_idx - 1)
max_safe_upper_y_pos = adj_upper_y - self._min_spacing_between(channel_idx, channel_idx - 1)
else:
max_safe_upper_y_pos = self.extended_conf.pip_maximal_y_position
_, max_safe_upper_y_pos = self._pip_y_bounds()

if channel_idx < (self.num_channels - 1):
adj_lower_y = await self.request_y_pos_channel_n(channel_idx + 1)
max_safe_lower_y_pos = adj_lower_y + self._min_spacing_between(channel_idx, channel_idx + 1)
else:
max_safe_lower_y_pos = self.extended_conf.left_arm_min_y_position
max_safe_lower_y_pos, _ = self._pip_y_bounds()

# Enable safe start and end positions
if start_pos_search:
Expand Down Expand Up @@ -10794,7 +10877,7 @@ async def clld_probe_y_position_using_channel(
channel_idx, channel_idx + 1
)
else:
min_y = self.extended_conf.left_arm_min_y_position
min_y, _ = self._pip_y_bounds()

max_safe_dist = detected_material_y_pos - min_y
move_target = detected_material_y_pos - min(post_detection_dist, max_safe_dist)
Expand All @@ -10805,7 +10888,7 @@ async def clld_probe_y_position_using_channel(
channel_idx, channel_idx - 1
)
else:
max_y = self.extended_conf.pip_maximal_y_position
_, max_y = self._pip_y_bounds()

max_safe_dist = max_y - detected_material_y_pos
move_target = detected_material_y_pos + min(post_detection_dist, max_safe_dist)
Expand Down Expand Up @@ -11682,7 +11765,7 @@ async def get_channels_y_positions(self) -> Dict[int, float]:
# position_channels_in_y_direction, it will raise an error.) The minimum y is 6mm,
# so we fix that first (in case that value is misreported). Then, we traverse the
# list in reverse and enforce pairwise minimum spacing.
min_y = self.extended_conf.left_arm_min_y_position
min_y, _ = self._pip_y_bounds()
if y_positions[-1] < min_y - 0.2:
raise RuntimeError(
"Channels are reported to be too close to the front of the machine. "
Expand Down
Loading
Loading