Skip to content
1 change: 1 addition & 0 deletions docs/resources/library/vwr.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Company page: [Wikipedia](https://en.wikipedia.org/wiki/VWR_International)
| Description | Image | PLR definition |
|--------------------|--------------------|--------------------|
| 'VWRReagentReservoirs25mL'<br>Part no.: 89094<br>[manufacturer website](https://us.vwr.com/store/product/4694822/vwr-disposable-pipetting-reservoirs)<br>Polystyrene Reservoirs | ![](img/vwr/VWRReagentReservoirs25mL.jpg) | `VWRReagentReservoirs25mL` |
| 'VWR_1_troughplate_195000uL_Ub'<br>Part no.: 77575-302<br>[manufacturer website](https://www.avantorsciences.com/us/en/product/47763965/vwr-multi-channel-polypropylene-reagent-reservoirs?isCatNumSearch=true&searchedCatalogNumber=77575-302)<br>Polypropylene multi-channel reagent reservoirs | ![](img/vwr/VWR_1_troughplate_195000uL_Ub.jpg) | `VWR_1_troughplate_195000uL_Ub` |

## Plates

Expand Down
274 changes: 186 additions & 88 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py

Large diffs are not rendered by default.

272 changes: 272 additions & 0 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# mypy: disable-error-code="attr-defined,method-assign"

import datetime
import unittest
import unittest.mock
from dataclasses import replace
from typing import Literal, cast

from pylabrobot.liquid_handling import LiquidHandler
Expand Down Expand Up @@ -34,6 +36,7 @@
CommandSyntaxError,
HamiltonNoTipError,
HardwareError,
Head96Information,
STARBackend,
STARFirmwareError,
UnknownHamiltonError,
Expand Down Expand Up @@ -168,6 +171,36 @@ async def test_send_command_plaintext_response(self):
await self.star.send_command("C0", command="QM", fmt="id####")


class TestSTARConfigurationNormalization(unittest.TestCase):
def test_inverted_y_bounds_are_swapped(self):
star = STARBackend()
star._pip_firmware_version = datetime.date(2024, 1, 1)
star._extended_conf = replace(
_DEFAULT_EXTENDED_CONFIGURATION,
pip_maximal_y_position=6.0,
left_arm_min_y_position=22.0,
)

star._normalize_extended_configuration_y_bounds()

self.assertEqual(star.extended_conf.pip_maximal_y_position, 606.5)
self.assertEqual(star.extended_conf.left_arm_min_y_position, 6.0)

def test_valid_y_bounds_are_kept(self):
star = STARBackend()
star._pip_firmware_version = datetime.date(2009, 1, 1)
star._extended_conf = replace(
_DEFAULT_EXTENDED_CONFIGURATION,
pip_maximal_y_position=606.5,
left_arm_min_y_position=6.0,
)

star._normalize_extended_configuration_y_bounds()

self.assertEqual(star.extended_conf.pip_maximal_y_position, 606.5)
self.assertEqual(star.extended_conf.left_arm_min_y_position, 6.0)


class STARCommandCatcher(STARBackend):
"""Mock backend for star that catches commands and saves them instead of sending them to the
machine."""
Expand Down Expand Up @@ -270,6 +303,18 @@ def __init__(self, name: str):

set_tip_tracking(enabled=False)

def _set_core96_firmware_year(self, year: int) -> None:
self.STAR._head96_information = Head96Information(
fw_version=datetime.date(year, 1, 1),
supports_clot_monitoring_clld=False,
stop_disc_type="core_i",
instrument_type="legacy",
head_type="96 head II",
)

def _set_pip_firmware_year(self, year: int) -> None:
self.STAR._pip_firmware_version = datetime.date(year, 1, 1)

async def test_core_read_barcode_success(self):
"""core_read_barcode_of_picked_up_resource should send ZB and return a Barcode."""

Expand Down Expand Up @@ -688,6 +733,24 @@ async def test_core_96_tip_pickup(self):
]
)

async def test_core_96_tip_pickup_skips_dispensing_drive_move_on_old_firmware(self):
self._set_core96_firmware_year(2009)

await self.lh.pick_up_tips96(self.tip_rack)

self.STAR._write_and_read_command.assert_has_calls(
[
_any_write_and_read_command_call("C0TTid0001tt01tf1tl0519tv03600tg2tu0"),
_any_write_and_read_command_call("C0EPid0002xs01179xd0yh2418tt01wu0za2164zh2450ze2450"),
]
)
self.assertFalse(
any(
"H0DQ" in call.kwargs["cmd"]
for call in self.STAR._write_and_read_command.call_args_list # type: ignore
)
)

async def test_tip_tracking_pick_up96(self):
set_tip_tracking(enabled=True)
await self.lh.pick_up_tips96(self.tip_rack)
Expand Down Expand Up @@ -731,6 +794,24 @@ async def test_core_96_aspirate(self):
]
)

async def test_core_96_aspirate_old_firmware_omits_extended_params(self):
self._set_core96_firmware_year(2009)

await self.lh.pick_up_tips96(self.tip_rack2) # pick up high volume tips
self.STAR._write_and_read_command.reset_mock()

assert self.plate.lid is not None
self.plate.lid.unassign()
await self.lh.aspirate96(self.plate, volume=100, blow_out=True)

self.STAR._write_and_read_command.assert_has_calls(
[
_any_write_and_read_command_call(
"C0EAid0003aa0xs02983xd0yh1457zh2450ze2450lz1999zt1866zm1866zv0032zq06180iw000ix0fh000af01083ag2500vt050bv00000wv00050cm0cs1bs0020wh10hv00000hc00hp000mj000hs1200"
),
]
)

async def test_core_96_dispense(self):
await self.lh.pick_up_tips96(self.tip_rack2) # pick up high volume tips
if self.plate.lid is not None:
Expand All @@ -750,6 +831,26 @@ async def test_core_96_dispense(self):
]
)

async def test_core_96_dispense_old_firmware_omits_extended_params(self):
self._set_core96_firmware_year(2009)

await self.lh.pick_up_tips96(self.tip_rack2) # pick up high volume tips
if self.plate.lid is not None:
self.plate.lid.unassign()
await self.lh.aspirate96(self.plate, 100, blow_out=True)
self.STAR._write_and_read_command.reset_mock()

with no_volume_tracking():
await self.lh.dispense96(self.plate, 100, blow_out=True)

self.STAR._write_and_read_command.assert_has_calls(
[
_any_write_and_read_command_call(
"C0EDid0004da3xs02983xd0yh1457zm1866zv0032zq06180lz1999zt1866iw000ix0fh000zh2450ze2450df01083dg1200es0050ev000vt050bv00000cm0cs1ej00bs0020wh00hv00000hc00hp000mj000hs1200"
),
]
)

async def test_core_96_dispense_quadrant(self):
"""Test that each quadrant of a 384-well plate produces the correct firmware command.

Expand Down Expand Up @@ -1015,6 +1116,160 @@ async def test_discard_tips(self):
]
)

async def test_pick_up_tips_omits_pickup_method_for_old_pip_firmware(self):
self._set_pip_firmware_year(2009)

await self.lh.pick_up_tips(self.tip_rack["A1:H1"])

sent = self.STAR._write_and_read_command.call_args_list[-1].kwargs["cmd"]
self.assertTrue(sent.startswith("C0TP"))
self.assertNotIn("td", sent)

async def test_discard_tips_omits_discarding_method_for_old_pip_firmware(self):
self._set_pip_firmware_year(2009)
await self.lh.pick_up_tips(self.tip_rack["A1:H1"])
self.STAR._write_and_read_command.side_effect = [
"C0TRid0003kz000 000 000 000 000 000 000 000vz000 000 000 000 000 000 000 000"
]
self.STAR._write_and_read_command.reset_mock()

await self.lh.discard_tips()

sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"]
self.assertTrue(sent.startswith("C0TR"))
self.assertNotIn("ti", sent)

async def test_initialize_pipetting_channels_omits_discarding_method_for_old_pip_firmware(self):
self._set_pip_firmware_year(2009)

await self.STAR.initialize_pipetting_channels(
x_positions=[8000],
y_positions=[3427, 3337, 3247, 3157, 3067, 2977, 2887, 2797],
begin_of_tip_deposit_process=2450,
end_of_tip_deposit_process=1870,
z_position_at_end_of_a_command=2450,
tip_pattern=[True] * 8,
tip_type=1,
discarding_method=0,
)

sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"]
self.assertTrue(sent.startswith("C0DI"))
self.assertNotIn("ti", sent)

async def test_aspirate_pip_omits_tadm_fields_for_old_pip_firmware(self):
self._set_pip_firmware_year(2009)

await self.STAR.aspirate_pip(
aspiration_type=[0],
tip_pattern=[True],
x_positions=[8000],
y_positions=[3427],
aspiration_volumes=[100],
limit_curve_index=[0],
tadm_algorithm=False,
recording_mode=0,
)

sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"]
self.assertTrue(sent.startswith("C0AS"))
self.assertNotIn("po", sent)
self.assertNotIn("gi", sent)
self.assertNotIn("gj", sent)
self.assertNotIn("gk", sent)
self.assertNotIn("lk", sent)
self.assertNotIn("ik", sent)
self.assertNotIn("sd", sent)
self.assertNotIn("se", sent)
self.assertNotIn("sz", sent)
self.assertNotIn("io", sent)

async def test_dispense_pip_omits_tadm_fields_for_old_pip_firmware(self):
self._set_pip_firmware_year(2009)

await self.STAR.dispense_pip(
tip_pattern=[True],
dispensing_mode=[0],
x_positions=[8000],
y_positions=[3427],
dispense_volumes=[100],
limit_curve_index=[0],
tadm_algorithm=False,
recording_mode=0,
)

sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"]
self.assertTrue(sent.startswith("C0DS"))
self.assertNotIn("po", sent)
self.assertNotIn("gi", sent)
self.assertNotIn("gj", sent)
self.assertNotIn("gk", sent)

async def test_aspirate_pip_includes_po_for_modern_pip_firmware(self):
self._set_pip_firmware_year(2024)

await self.STAR.aspirate_pip(
aspiration_type=[0],
tip_pattern=[True],
x_positions=[8000],
y_positions=[3427],
aspiration_volumes=[100],
pull_out_distance_transport_air=[100],
)

sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"]
self.assertTrue(sent.startswith("C0AS"))
self.assertIn("po0100", sent)

async def test_dispense_pip_includes_po_for_modern_pip_firmware(self):
self._set_pip_firmware_year(2024)

await self.STAR.dispense_pip(
tip_pattern=[True],
dispensing_mode=[0],
x_positions=[8000],
y_positions=[3427],
dispense_volumes=[100],
pull_out_distance_transport_air=[100],
)

sent = self.STAR._write_and_read_command.call_args.kwargs["cmd"]
self.assertTrue(sent.startswith("C0DS"))
self.assertIn("po0100", sent)

async def test_ops_to_fw_positions_allows_legacy_sub_9mm_spacing(self):
self._set_pip_firmware_year(2009)

op1 = Pickup(
resource=self.tip_rack["A1"][0],
offset=Coordinate.zero(),
tip=self.tip_rack["A1"][0].get_tip(),
)
op2 = Pickup(
resource=self.tip_rack["A1"][0],
offset=Coordinate(0, -7.9, 0),
tip=self.tip_rack["A1"][0].get_tip(),
)

self.STAR._ops_to_fw_positions((op1, op2), use_channels=[0, 1])

async def test_ops_to_fw_positions_rejects_modern_sub_9mm_spacing(self):
self._set_pip_firmware_year(2024)

op1 = Pickup(
resource=self.tip_rack["A1"][0],
offset=Coordinate.zero(),
tip=self.tip_rack["A1"][0].get_tip(),
)
op2 = Pickup(
resource=self.tip_rack["A1"][0],
offset=Coordinate(0, -7.9, 0),
tip=self.tip_rack["A1"][0].get_tip(),
)

with self.assertRaisesRegex(ValueError, "Minimum distance between two y positions is <9mm"):
self.STAR._ops_to_fw_positions((op1, op2), use_channels=[0, 1])

async def test_portrait_tip_rack_handling(self):
deck = STARLetDeck()
lh = LiquidHandler(self.STAR, deck=deck)
Expand Down Expand Up @@ -1574,6 +1829,23 @@ async def test_can_reach_4ch_18mm_rejects_back_channel_too_far_back(self):
backend._channels_minimum_y_spacing = [18.0] * 4
self.assertFalse(backend.can_reach_position(3, Coordinate(100, 574, 100)))

async def test_can_reach_position_normalizes_inverted_y_bounds(self):
"""Inverted raw Y limits should not make ordinary 8-channel STARlet positions unreachable."""

backend = STARBackend()
backend._num_channels = 8
backend._channels_minimum_y_spacing = [8.98] * 8
backend._extended_conf = replace(
_DEFAULT_EXTENDED_CONFIGURATION,
pip_maximal_y_position=6.0,
left_arm_min_y_position=22.0,
)

backend._normalize_extended_configuration_y_bounds()

self.assertTrue(backend.can_reach_position(0, Coordinate(100, 337.8, 100)))
self.assertTrue(backend.can_reach_position(7, Coordinate(100, 274.8, 100)))

# -- position_channels_in_y_direction: validation rejects tight positions -------

def _make_star_backend(self, num_channels, spacings):
Expand Down
30 changes: 17 additions & 13 deletions pylabrobot/liquid_handling/backends/hamilton/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,21 +416,25 @@ def _ops_to_fw_positions(
y_pos = ops[i].resource.get_location_wrt(self.deck, x="c", y="c", z="b").y + ops[i].offset.y
y_positions.append(round(y_pos * 10))

pip_has_old_firmware = getattr(self, "_pip_has_old_firmware", None)
enforce_min_y_spacing = not (callable(pip_has_old_firmware) and pip_has_old_firmware())

# check that the minimum d between any two y positions is >9mm
# O(n^2) search is not great but this is most readable, and the max size is 16, so it's fine.
for channel_idx1, (x1, y1) in enumerate(zip(x_positions, y_positions)):
for channel_idx2, (x2, y2) in enumerate(zip(x_positions, y_positions)):
if channel_idx1 == channel_idx2:
continue
if not channels_involved[channel_idx1] or not channels_involved[channel_idx2]:
continue
if x1 != x2: # channels not on the same column -> will be two operations on the machine
continue
if y1 != y2 and abs(y1 - y2) < 90:
raise ValueError(
f"Minimum distance between two y positions is <9mm: {y1}, {y2}"
f" (channel {channel_idx1} and {channel_idx2})"
)
if enforce_min_y_spacing:
for channel_idx1, (x1, y1) in enumerate(zip(x_positions, y_positions)):
for channel_idx2, (x2, y2) in enumerate(zip(x_positions, y_positions)):
if channel_idx1 == channel_idx2:
continue
if not channels_involved[channel_idx1] or not channels_involved[channel_idx2]:
continue
if x1 != x2: # channels not on the same column -> will be two operations on the machine
continue
if y1 != y2 and abs(y1 - y2) < 90:
raise ValueError(
f"Minimum distance between two y positions is <9mm: {y1}, {y2}"
f" (channel {channel_idx1} and {channel_idx2})"
)

if len(ops) > self.num_channels:
raise ValueError(f"Too many channels specified: {len(ops)} > {self.num_channels}")
Expand Down
Loading
Loading