Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion pylabrobot/hamilton/liquid_handlers/nimbus/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
from .channels import ChannelType, NimbusChannelConfig, NimbusChannelMap, Rail
from .chatterbox import NimbusChatterboxDriver
from .core import NimbusCoreGripper, NimbusCoreGripperFactory, NimbusGripperArm
from .door import NimbusDoor
from .driver import NimbusDriver
from .driver import NimbusDriver, NimbusSetupParams
from .info import NimbusInstrumentInfo
from .nimbus import Nimbus
from .pip_backend import NimbusPIPBackend

__all__ = [
"ChannelType",
"NimbusChannelConfig",
"NimbusChannelMap",
"NimbusChatterboxDriver",
"NimbusCoreGripper",
"NimbusCoreGripperFactory",
"NimbusDoor",
"NimbusDriver",
"NimbusGripperArm",
"NimbusInstrumentInfo",
"NimbusPIPBackend",
"NimbusSetupParams",
"Nimbus",
"Rail",
]
81 changes: 81 additions & 0 deletions pylabrobot/hamilton/liquid_handlers/nimbus/channels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Nimbus channel topology — typed wrapper around ChannelConfiguration cmd 30 data."""

from __future__ import annotations

import enum
from dataclasses import dataclass
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
from .info import NimbusInstrumentInfo


class ChannelType(enum.IntEnum):
"""Mirrors GlobalObjects.ChannelType from NimbusCORE.dll."""

NONE = 0
CHANNEL_300UL = 1
CHANNEL_1000UL = 2
CHANNEL_5000UL = 3


class Rail(enum.IntEnum):
"""Mirrors GlobalObjects.Rail from NimbusCORE.dll."""

LEFT = 0
RIGHT = 1


_CHANNEL_TYPE_MAX_VOLUME: dict[ChannelType, float] = {
ChannelType.NONE: 0.0,
ChannelType.CHANNEL_300UL: 300.0,
ChannelType.CHANNEL_1000UL: 1000.0,
ChannelType.CHANNEL_5000UL: 5000.0,
}


@dataclass(frozen=True)
class NimbusChannelConfig:
"""Per-channel hardware configuration decoded from firmware."""

type: ChannelType
rail: Rail
previous_neighbor_spacing: int
next_neighbor_spacing: int
can_address: int

@property
def max_volume(self) -> float:
"""Maximum aspirate/dispense volume for this channel type in µL."""
return _CHANNEL_TYPE_MAX_VOLUME.get(self.type, 0.0)


@dataclass(frozen=True)
class NimbusChannelMap:
"""Typed per-channel topology built from :class:`NimbusInstrumentInfo`."""

channels: List[NimbusChannelConfig]

@property
def num_channels(self) -> int:
return len(self.channels)

def channel_type(self, index: int) -> ChannelType:
return self.channels[index].type

def max_volume_for_channel(self, index: int) -> float:
return self.channels[index].max_volume

@staticmethod
def from_info(info: "NimbusInstrumentInfo") -> "NimbusChannelMap":
channels = [
NimbusChannelConfig(
type=ChannelType(wire.channel_type),
rail=Rail(wire.rail),
previous_neighbor_spacing=wire.previous_neighbor_spacing,
next_neighbor_spacing=wire.next_neighbor_spacing,
can_address=wire.can_address,
)
for wire in info.channel_configurations
]
return NimbusChannelMap(channels=channels)
130 changes: 97 additions & 33 deletions pylabrobot/hamilton/liquid_handlers/nimbus/chatterbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,28 @@
from __future__ import annotations

import logging
from typing import Optional
from typing import Any, Optional

from pylabrobot.hamilton.tcp.commands import HamiltonCommand
from pylabrobot.capabilities.capability import BackendParams
from pylabrobot.hamilton.tcp.commands import TCPCommand
from pylabrobot.hamilton.tcp.introspection import ObjectInfo
from pylabrobot.hamilton.tcp.packets import Address
from pylabrobot.resources.hamilton.nimbus_decks import NimbusDeck

from .door import NimbusDoor
from .driver import NimbusDriver
from .commands import NimbusCommand, _UNRESOLVED
from .driver import NimbusDriver, NimbusSetupParams

logger = logging.getLogger(__name__)

_CHATTERBOX_NIMBUS_CORE = Address(1, 1, 48896)
_CHATTERBOX_PIPETTE = Address(1, 1, 257)
_CHATTERBOX_DOOR_LOCK = Address(1, 1, 268)

_CHATTERBOX_PATH_TO_ADDR = {
"NimbusCORE": _CHATTERBOX_NIMBUS_CORE,
"NimbusCORE.Pipette": _CHATTERBOX_PIPETTE,
"NimbusCORE.DoorLock": _CHATTERBOX_DOOR_LOCK,
}


class NimbusChatterboxDriver(NimbusDriver):
"""Chatterbox driver for Nimbus. Simulates commands for testing without hardware.
Expand All @@ -22,49 +33,102 @@ class NimbusChatterboxDriver(NimbusDriver):
and use canned addresses and responses instead.
"""

def __init__(self, deck: NimbusDeck, num_channels: int = 8):
# Pass dummy host — Socket is created but never opened
super().__init__(deck=deck, host="chatterbox", port=2000)
def __init__(self, num_channels: int = 8):
super().__init__(host="chatterbox", port=2000)
self._num_channels = num_channels

async def setup(self):
from .pip_backend import NimbusPIPBackend

# Use canned addresses (skip TCP connection entirely)
pipette_address = Address(1, 1, 257)
self._nimbus_core_address = Address(1, 1, 48896)
door_address = Address(1, 1, 268)

self.pip = NimbusPIPBackend(
driver=self, deck=self.deck, address=pipette_address, num_channels=self._num_channels
)
self.door = NimbusDoor(driver=self, address=door_address)
async def setup(self, backend_params: Optional[BackendParams] = None):
if backend_params is None:
params = NimbusSetupParams()
elif isinstance(backend_params, NimbusSetupParams):
params = backend_params
else:
raise TypeError(
"NimbusChatterboxDriver.setup expected NimbusSetupParams | None for backend_params, "
f"got {type(backend_params).__name__}"
)
del params

# Seed introspection registry with canned addresses (skip TCP connection entirely)
self._nimbus_core_address = _CHATTERBOX_NIMBUS_CORE
seed_paths = sorted(NimbusCommand._ALL_PATHS | set(_CHATTERBOX_PATH_TO_ADDR))
for idx, path in enumerate(seed_paths):
leaf = path.rsplit(".", 1)[-1]
addr = _CHATTERBOX_PATH_TO_ADDR.get(path, Address(1, 1, 1024 + idx))
self.registry.register(
path,
ObjectInfo(name=leaf, version="", method_count=0, subobject_count=0, address=addr),
)

async def stop(self):
if self.door is not None:
await self.door._on_stop()
self.door = None

async def send_command(self, command: HamiltonCommand, timeout: float = 10.0) -> Optional[dict]:
self._nimbus_core_address = None
self._invalidate_introspection_session()

async def send_command(
self,
command: TCPCommand,
ensure_connection: bool = True,
return_raw: bool = False,
raise_on_error: bool = True,
read_timeout: Optional[float] = None,
) -> Any:
del ensure_connection, raise_on_error, read_timeout
logger.info(f"[Chatterbox] {command.__class__.__name__}")

# Return canned responses for commands that need them
if isinstance(command, NimbusCommand) and command.dest == _UNRESOLVED:
path = type(command).firmware_path
if path is None:
raise RuntimeError(
f"{type(command).__name__} has no firmware_path declared and no "
"explicit dest= supplied at construction. Polymorphic-dest commands "
"must pass dest= to send_query or send_command."
)
try:
addr = await self.resolve_path(path)
except KeyError as exc:
raise RuntimeError(
f"Cannot send {type(command).__name__}: firmware path "
f"{path!r} did not resolve on this instrument ({exc})."
) from exc
command.dest = addr
command.dest_address = addr

from .commands import (
ChannelConfiguration,
GetChannelConfiguration,
GetChannelConfiguration_1,
IsCoreGripperPlateGripped,
IsCoreGripperToolHeld,
IsDoorLocked,
IsInitialized,
IsTipPresent,
NimbusChannelConfigWire,
)

if isinstance(command, GetChannelConfiguration_1):
return {"channels": self._num_channels}
if isinstance(command, ChannelConfiguration):
# 8× Channel300uL, alternating Left/Right rails
configs = [
NimbusChannelConfigWire(
channel_type=1, # Channel300uL
rail=i % 2, # 0=Left, 1=Right alternating
previous_neighbor_spacing=0,
next_neighbor_spacing=0,
can_address=i,
)
for i in range(self._num_channels)
]
return ChannelConfiguration.Response(configurations=configs)
if isinstance(command, IsInitialized):
return {"initialized": True}
return IsInitialized.Response(initialized=True)
if isinstance(command, IsTipPresent):
return {"tip_present": [False] * self._num_channels}
return IsTipPresent.Response(tip_present=[False] * self._num_channels)
if isinstance(command, IsDoorLocked):
return {"locked": True}
return IsDoorLocked.Response(locked=True)
if isinstance(command, GetChannelConfiguration):
return {"enabled": [False]}
return GetChannelConfiguration.Response(enabled=[False])
if isinstance(command, IsCoreGripperToolHeld):
return IsCoreGripperToolHeld.Response(gripped=False, tip_type=[])
if isinstance(command, IsCoreGripperPlateGripped):
return IsCoreGripperPlateGripped.Response(gripped=False)
if return_raw:
return (b"",)
return None
Loading
Loading