Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
401 changes: 401 additions & 0 deletions docs/L3-L2_host-device_communication.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions python/bindings/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ nanobind_add_module(_task_interface ${BINDING_SOURCES} ${HIERARCHICAL_SOURCES})

target_sources(_task_interface PRIVATE
${CMAKE_SOURCE_DIR}/src/common/worker/chip_worker.cpp
${CMAKE_SOURCE_DIR}/src/common/worker/host_device_channel.cpp
${CMAKE_SOURCE_DIR}/src/common/worker/host_device_memory.cpp
)

target_include_directories(_task_interface PRIVATE
Expand Down
120 changes: 120 additions & 0 deletions python/bindings/task_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,126 @@ NB_MODULE(_task_interface, m) {
.def("free", &ChipWorker::free, nb::arg("ptr"))
.def("copy_to", &ChipWorker::copy_to, nb::arg("dst"), nb::arg("src"), nb::arg("size"))
.def("copy_from", &ChipWorker::copy_from, nb::arg("dst"), nb::arg("src"), nb::arg("size"))
.def(
"open_channel",
[](ChipWorker &self, uint32_t cpu_to_l2_lanes, uint32_t l2_to_cpu_lanes, uint32_t lane_depth,
uint32_t max_message_bytes, uint32_t flags) {
HostDeviceChannelConfig cfg{
cpu_to_l2_lanes, l2_to_cpu_lanes, lane_depth, max_message_bytes, flags
};
return self.open_channel(cfg);
},
nb::arg("cpu_to_l2_lanes") = 1, nb::arg("l2_to_cpu_lanes") = 1, nb::arg("lane_depth") = 64,
nb::arg("max_message_bytes") = HDCH_MAX_INLINE_BYTES, nb::arg("flags") = 0,
"Open a bounded host/device message channel."
)
.def("close_channel", &ChipWorker::close_channel, nb::arg("channel"))
.def(
"channel_send",
[](ChipWorker &self, uint64_t ch, uint32_t route, nb::bytes data, uint64_t correlation_id,
uint32_t timeout_us) {
std::string payload(data.c_str(), data.size());
self.channel_send(ch, route, payload.data(), payload.size(), correlation_id, timeout_us);
Comment on lines +762 to +763
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Creating an intermediate std::string from nb::bytes is redundant and incurs an unnecessary copy. nb::bytes provides direct access to the underlying buffer via c_str() and size(), which can be passed directly to self.channel_send.

Suggested change
std::string payload(data.c_str(), data.size());
self.channel_send(ch, route, payload.data(), payload.size(), correlation_id, timeout_us);
self.channel_send(ch, route, data.c_str(), data.size(), correlation_id, timeout_us);

},
nb::arg("channel"), nb::arg("route"), nb::arg("data"), nb::arg("correlation_id") = 0,
nb::arg("timeout_us") = 0
)
.def(
"channel_recv",
[](ChipWorker &self, uint64_t ch, size_t capacity, uint32_t timeout_us) {
uint32_t route = 0;
uint64_t correlation_id = 0;
auto data = self.channel_recv(ch, capacity, timeout_us, &route, &correlation_id);
return nb::make_tuple(nb::bytes(reinterpret_cast<const char *>(data.data()), data.size()), route, correlation_id);
},
nb::arg("channel"), nb::arg("capacity") = HDCH_MAX_INLINE_BYTES, nb::arg("timeout_us") = 0
)
.def(
"channel_send_l2_for_test",
[](ChipWorker &self, uint64_t ch, uint32_t route, nb::bytes data, uint64_t correlation_id,
uint32_t timeout_us) {
std::string payload(data.c_str(), data.size());
self.channel_send_l2_for_test(ch, route, payload.data(), payload.size(), correlation_id, timeout_us);
Comment on lines +782 to +783
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Creating an intermediate std::string from nb::bytes is redundant and incurs an unnecessary copy. nb::bytes provides direct access to the underlying buffer via c_str() and size(), which can be passed directly to self.channel_send_l2_for_test.

Suggested change
std::string payload(data.c_str(), data.size());
self.channel_send_l2_for_test(ch, route, payload.data(), payload.size(), correlation_id, timeout_us);
self.channel_send_l2_for_test(ch, route, data.c_str(), data.size(), correlation_id, timeout_us);

},
nb::arg("channel"), nb::arg("route"), nb::arg("data"), nb::arg("correlation_id") = 0,
nb::arg("timeout_us") = 0
)
.def(
"channel_recv_l2_for_test",
[](ChipWorker &self, uint64_t ch, size_t capacity, uint32_t timeout_us) {
uint32_t route = 0;
uint64_t correlation_id = 0;
auto data = self.channel_recv_l2_for_test(ch, capacity, timeout_us, &route, &correlation_id);
return nb::make_tuple(nb::bytes(reinterpret_cast<const char *>(data.data()), data.size()), route, correlation_id);
},
nb::arg("channel"), nb::arg("capacity") = HDCH_MAX_INLINE_BYTES, nb::arg("timeout_us") = 0
)
.def(
"open_shared_memory",
[](ChipWorker &self, uint64_t data_bytes, uint32_t signal_count, uint32_t flags) {
HostDeviceMemoryConfig cfg{data_bytes, signal_count, flags};
return self.open_shared_memory(cfg);
},
nb::arg("data_bytes"), nb::arg("signal_count") = 2, nb::arg("flags") = 0,
"Open a host/device shared-memory region."
)
.def("close_shared_memory", &ChipWorker::close_shared_memory, nb::arg("memory"))
.def(
"shared_memory_info",
[](ChipWorker &self, uint64_t mem) {
HostDeviceMemoryInfo info = self.shared_memory_info(mem);
return nb::make_tuple(info.host_ptr, info.device_ptr, info.data_bytes, info.signal_count, info.flags);
},
nb::arg("memory"), "Return shared-memory metadata. host_ptr is a current-process address."
)
.def(
"shared_memory_read",
[](ChipWorker &self, uint64_t mem, uint64_t offset, size_t nbytes) {
auto data = self.shared_memory_read(mem, offset, nbytes);
return nb::bytes(reinterpret_cast<const char *>(data.data()), data.size());
},
nb::arg("memory"), nb::arg("offset"), nb::arg("nbytes")
)
.def(
"shared_memory_write",
[](ChipWorker &self, uint64_t mem, uint64_t offset, nb::bytes data) {
std::string payload(data.c_str(), data.size());
self.shared_memory_write(mem, offset, payload.data(), payload.size());
},
nb::arg("memory"), nb::arg("offset"), nb::arg("data")
)
.def(
"shared_memory_notify", &ChipWorker::shared_memory_notify, nb::arg("memory"), nb::arg("signal_id"),
nb::arg("value")
)
.def(
"shared_memory_wait", &ChipWorker::shared_memory_wait, nb::arg("memory"), nb::arg("signal_id"),
nb::arg("target"), nb::arg("timeout_us") = 0
)
.def(
"shared_memory_read_l2_for_test",
[](ChipWorker &self, uint64_t mem, uint64_t offset, size_t nbytes) {
auto data = self.shared_memory_read_l2_for_test(mem, offset, nbytes);
return nb::bytes(reinterpret_cast<const char *>(data.data()), data.size());
},
nb::arg("memory"), nb::arg("offset"), nb::arg("nbytes")
)
.def(
"shared_memory_write_l2_for_test",
[](ChipWorker &self, uint64_t mem, uint64_t offset, nb::bytes data) {
std::string payload(data.c_str(), data.size());
self.shared_memory_write_l2_for_test(mem, offset, payload.data(), payload.size());
},
nb::arg("memory"), nb::arg("offset"), nb::arg("data")
)
.def(
"shared_memory_notify_l2_for_test", &ChipWorker::shared_memory_notify_l2_for_test,
nb::arg("memory"), nb::arg("signal_id"), nb::arg("value")
)
.def(
"shared_memory_wait_l2_for_test", &ChipWorker::shared_memory_wait_l2_for_test,
nb::arg("memory"), nb::arg("signal_id"), nb::arg("target"), nb::arg("timeout_us") = 0
)
.def(
"comm_init", &ChipWorker::comm_init, nb::arg("rank"), nb::arg("nranks"), nb::arg("rootinfo_path"),
"Initialize a communicator for this rank. ChipWorker owns ACL + stream "
Expand Down
131 changes: 131 additions & 0 deletions python/bindings/worker_bind.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

#include "chip_bootstrap_channel.h"
#include "ring.h"
Expand Down Expand Up @@ -159,6 +161,106 @@ inline void bind_worker(nb::module_ &m) {
},
nb::arg("worker_id"), nb::arg("dst"), nb::arg("src"), nb::arg("size"), "Copy worker src to host dst."
)
.def(
"open_channel",
[](Orchestrator &self, int worker_id, uint32_t cpu_to_l2_lanes, uint32_t l2_to_cpu_lanes,
uint32_t lane_depth, uint32_t max_message_bytes) {
return self.open_channel(worker_id, cpu_to_l2_lanes, l2_to_cpu_lanes, lane_depth, max_message_bytes);
},
nb::arg("worker_id"), nb::arg("cpu_to_l2_lanes") = 1, nb::arg("l2_to_cpu_lanes") = 1,
nb::arg("lane_depth") = 64, nb::arg("max_message_bytes") = 256,
"Open a host/device message channel on a next-level worker."
)
.def(
"close_channel",
[](Orchestrator &self, int worker_id, uint64_t channel) {
self.close_channel(worker_id, channel);
},
nb::arg("worker_id"), nb::arg("channel"), "Close a host/device message channel."
)
.def(
"channel_send",
[](Orchestrator &self, int worker_id, uint64_t channel, uint32_t route, nb::bytes data,
uint64_t correlation_id) {
std::string payload(data.c_str(), data.size());
std::vector<uint8_t> bytes(payload.begin(), payload.end());
self.channel_send(worker_id, channel, route, bytes, correlation_id);
Comment on lines +185 to +187
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This implementation performs multiple redundant copies (nb::bytes -> std::string -> std::vector<uint8_t>). By updating the Orchestrator::channel_send signature to take a raw pointer and size, you can pass the data directly from nb::bytes without any intermediate allocations.

                self.channel_send(worker_id, channel, route, data.c_str(), data.size(), correlation_id);

},
nb::arg("worker_id"), nb::arg("channel"), nb::arg("route"), nb::arg("data"), nb::arg("correlation_id") = 0,
"Send a message through a host/device channel."
)
.def(
"channel_recv",
[](Orchestrator &self, int worker_id, uint64_t channel, size_t capacity, uint32_t timeout_us) {
uint32_t route = 0;
uint64_t correlation_id = 0;
auto data = self.channel_recv(worker_id, channel, capacity, timeout_us, &route, &correlation_id);
return nb::make_tuple(
nb::bytes(reinterpret_cast<const char *>(data.data()), data.size()), route, correlation_id
);
},
nb::arg("worker_id"), nb::arg("channel"), nb::arg("capacity") = 256, nb::arg("timeout_us") = 0,
"Receive a message through a host/device channel."
)
.def(
"open_shared_memory",
[](Orchestrator &self, int worker_id, uint64_t data_bytes, uint32_t signal_count, uint32_t flags) {
return self.open_shared_memory(worker_id, data_bytes, signal_count, flags);
},
nb::arg("worker_id"), nb::arg("data_bytes"), nb::arg("signal_count") = 2, nb::arg("flags") = 0,
"Open a host/device shared-memory region on a next-level worker."
)
.def(
"close_shared_memory",
[](Orchestrator &self, int worker_id, uint64_t memory) {
self.close_shared_memory(worker_id, memory);
},
nb::arg("worker_id"), nb::arg("memory"), "Close a host/device shared-memory region."
)
.def(
"shared_memory_info",
[](Orchestrator &self, int worker_id, uint64_t memory) {
HostDeviceMemoryInfo info = self.shared_memory_info(worker_id, memory);
return nb::make_tuple(info.host_ptr, info.device_ptr, info.data_bytes, info.signal_count, info.flags);
},
nb::arg("worker_id"), nb::arg("memory"),
"Return shared-memory metadata. host_ptr is 0 for hierarchical mailbox callers."
)
.def(
"shared_memory_read",
[](Orchestrator &self, int worker_id, uint64_t memory, uint64_t offset, size_t nbytes) {
auto data = self.shared_memory_read(worker_id, memory, offset, nbytes);
return nb::bytes(reinterpret_cast<const char *>(data.data()), data.size());
},
nb::arg("worker_id"), nb::arg("memory"), nb::arg("offset"), nb::arg("nbytes"),
"Read shared-memory bytes via chunked mailbox RPC. Returns a full materialized bytes object; this is "
"not streaming or zero-copy."
)
.def(
"shared_memory_write",
[](Orchestrator &self, int worker_id, uint64_t memory, uint64_t offset, nb::bytes data) {
std::string payload(data.c_str(), data.size());
std::vector<uint8_t> bytes(payload.begin(), payload.end());
self.shared_memory_write(worker_id, memory, offset, bytes);
},
nb::arg("worker_id"), nb::arg("memory"), nb::arg("offset"), nb::arg("data"),
"Write shared-memory bytes via chunked mailbox RPC. This is not streaming or zero-copy."
)
.def(
"shared_memory_notify",
[](Orchestrator &self, int worker_id, uint64_t memory, uint32_t signal_id, uint64_t value) {
self.shared_memory_notify(worker_id, memory, signal_id, value);
},
nb::arg("worker_id"), nb::arg("memory"), nb::arg("signal_id"), nb::arg("value")
)
.def(
"shared_memory_wait",
[](Orchestrator &self, int worker_id, uint64_t memory, uint32_t signal_id, uint64_t target,
uint32_t timeout_us) {
self.shared_memory_wait(worker_id, memory, signal_id, target, timeout_us);
},
nb::arg("worker_id"), nb::arg("memory"), nb::arg("signal_id"), nb::arg("target"), nb::arg("timeout_us") = 0
)
.def(
"alloc",
[](Orchestrator &self, const std::vector<uint32_t> &shape, DataType dtype) {
Expand Down Expand Up @@ -251,6 +353,35 @@ inline void bind_worker(nb::module_ &m) {
m.attr("MAILBOX_SIZE") = static_cast<int>(MAILBOX_SIZE);
m.attr("MAILBOX_OFF_ERROR_MSG") = static_cast<int>(MAILBOX_OFF_ERROR_MSG);
m.attr("MAILBOX_ERROR_MSG_SIZE") = static_cast<int>(MAILBOX_ERROR_MSG_SIZE);
m.attr("MAILBOX_OFF_ARGS") = static_cast<int>(MAILBOX_OFF_ARGS);
m.attr("MAILBOX_ARGS_CAPACITY") = static_cast<int>(MAILBOX_ARGS_CAPACITY);
m.attr("CTRL_MALLOC") = static_cast<uint64_t>(CTRL_MALLOC);
m.attr("CTRL_FREE") = static_cast<uint64_t>(CTRL_FREE);
m.attr("CTRL_COPY_TO") = static_cast<uint64_t>(CTRL_COPY_TO);
m.attr("CTRL_COPY_FROM") = static_cast<uint64_t>(CTRL_COPY_FROM);
m.attr("CTRL_PREPARE") = static_cast<uint64_t>(CTRL_PREPARE);
m.attr("CTRL_REGISTER") = static_cast<uint64_t>(CTRL_REGISTER);
m.attr("CTRL_UNREGISTER") = static_cast<uint64_t>(CTRL_UNREGISTER);
m.attr("CTRL_OPEN_CHANNEL") = static_cast<uint64_t>(CTRL_OPEN_CHANNEL);
m.attr("CTRL_CLOSE_CHANNEL") = static_cast<uint64_t>(CTRL_CLOSE_CHANNEL);
m.attr("CTRL_CHANNEL_SEND") = static_cast<uint64_t>(CTRL_CHANNEL_SEND);
m.attr("CTRL_CHANNEL_RECV") = static_cast<uint64_t>(CTRL_CHANNEL_RECV);
m.attr("CTRL_OPEN_SHARED_MEMORY") = static_cast<uint64_t>(CTRL_OPEN_SHARED_MEMORY);
m.attr("CTRL_CLOSE_SHARED_MEMORY") = static_cast<uint64_t>(CTRL_CLOSE_SHARED_MEMORY);
m.attr("CTRL_SHARED_MEMORY_INFO") = static_cast<uint64_t>(CTRL_SHARED_MEMORY_INFO);
m.attr("CTRL_SHARED_MEMORY_READ") = static_cast<uint64_t>(CTRL_SHARED_MEMORY_READ);
m.attr("CTRL_SHARED_MEMORY_WRITE") = static_cast<uint64_t>(CTRL_SHARED_MEMORY_WRITE);
m.attr("CTRL_SHARED_MEMORY_NOTIFY") = static_cast<uint64_t>(CTRL_SHARED_MEMORY_NOTIFY);
m.attr("CTRL_SHARED_MEMORY_WAIT") = static_cast<uint64_t>(CTRL_SHARED_MEMORY_WAIT);
m.attr("CTRL_OFF_ARG0") = static_cast<int>(CTRL_OFF_ARG0);
m.attr("CTRL_OFF_ARG1") = static_cast<int>(CTRL_OFF_ARG1);
m.attr("CTRL_OFF_ARG2") = static_cast<int>(CTRL_OFF_ARG2);
m.attr("CTRL_OFF_RESULT") = static_cast<int>(CTRL_OFF_RESULT);
m.attr("CTRL_OFF_ARG3") = static_cast<int>(CTRL_OFF_ARG3);
m.attr("CTRL_OFF_ARG4") = static_cast<int>(CTRL_OFF_ARG4);
m.attr("CTRL_OFF_PAYLOAD") = static_cast<int>(CTRL_OFF_PAYLOAD);
m.attr("CTRL_PAYLOAD_CAPACITY") = static_cast<int>(CTRL_PAYLOAD_CAPACITY);
m.attr("CTRL_SHM_NAME_BYTES") = static_cast<int>(CTRL_SHM_NAME_BYTES);
m.attr("MAX_RING_DEPTH") = static_cast<int32_t>(MAX_RING_DEPTH);
m.attr("MAX_SCOPE_DEPTH") = static_cast<int32_t>(MAX_SCOPE_DEPTH);

Expand Down
85 changes: 85 additions & 0 deletions python/simpler/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,91 @@ def copy_from(self, worker_id: int, dst: int, src: int, size: int) -> None:
"""Copy *size* bytes from worker *src* to host *dst*."""
self._o.copy_from(int(worker_id), int(dst), int(src), int(size))

def open_channel(
self,
worker_id: int,
cpu_to_l2_lanes: int = 1,
l2_to_cpu_lanes: int = 1,
lane_depth: int = 64,
max_message_bytes: int = 256,
) -> int:
"""Open a bounded L3/L2 message channel on next-level worker *worker_id*."""
return int(
self._o.open_channel(
int(worker_id),
int(cpu_to_l2_lanes),
int(l2_to_cpu_lanes),
int(lane_depth),
int(max_message_bytes),
)
)

def close_channel(self, worker_id: int, channel: int) -> None:
"""Close a channel returned by ``open_channel``."""
self._o.close_channel(int(worker_id), int(channel))

def channel_send(
self,
worker_id: int,
channel: int,
route: int,
data: bytes,
correlation_id: int = 0,
) -> None:
"""Send one inline message from L3 CPU toward L2."""
self._o.channel_send(int(worker_id), int(channel), int(route), bytes(data), int(correlation_id))

def channel_recv(self, worker_id: int, channel: int, capacity: int = 256, timeout_us: int = 0) -> tuple[bytes, int, int]:
"""Receive one inline message from L2 toward L3 CPU."""
data, route, correlation_id = self._o.channel_recv(
int(worker_id), int(channel), int(capacity), int(timeout_us)
)
return bytes(data), int(route), int(correlation_id)

def open_shared_memory(self, worker_id: int, data_bytes: int, signal_count: int = 2, flags: int = 0) -> int:
"""Open a host/device shared-memory region on next-level worker *worker_id*."""
return int(self._o.open_shared_memory(int(worker_id), int(data_bytes), int(signal_count), int(flags)))

def close_shared_memory(self, worker_id: int, memory: int) -> None:
"""Close a shared-memory region returned by ``open_shared_memory``."""
self._o.close_shared_memory(int(worker_id), int(memory))

def shared_memory_info(self, worker_id: int, memory: int) -> tuple[int, int, int, int, int]:
"""Return ``(host_ptr, device_ptr, data_bytes, signal_count, flags)``.

``host_ptr`` is always ``0`` because the L3 parent has no directly
dereferenceable host mapping for chip-child shared memory.
"""
host_ptr, device_ptr, data_bytes, signal_count, flags = self._o.shared_memory_info(int(worker_id), int(memory))
return int(host_ptr), int(device_ptr), int(data_bytes), int(signal_count), int(flags)

def shared_memory_read(self, worker_id: int, memory: int, offset: int, nbytes: int) -> bytes:
"""Read bytes from a shared-memory data region.

L3 access chunks large reads through mailbox RPC; it is not a direct
parent-process mapping or streaming data plane. The returned
``bytes`` materializes the full requested range.
"""
return bytes(self._o.shared_memory_read(int(worker_id), int(memory), int(offset), int(nbytes)))

def shared_memory_write(self, worker_id: int, memory: int, offset: int, data: bytes) -> None:
"""Write bytes into a shared-memory data region.

L3 access chunks large writes through mailbox RPC; it is not a direct
parent-process mapping or streaming data plane.
"""
self._o.shared_memory_write(int(worker_id), int(memory), int(offset), bytes(data))

def shared_memory_notify(self, worker_id: int, memory: int, signal_id: int, value: int) -> None:
"""Publish a software signal value for a shared-memory region."""
self._o.shared_memory_notify(int(worker_id), int(memory), int(signal_id), int(value))

def shared_memory_wait(
self, worker_id: int, memory: int, signal_id: int, target: int, timeout_us: int = 0
) -> None:
"""Wait until a shared-memory software signal reaches ``target``."""
self._o.shared_memory_wait(int(worker_id), int(memory), int(signal_id), int(target), int(timeout_us))

def alloc(self, shape: Sequence[int], dtype: DataType) -> ContinuousTensor:
"""Allocate a runtime-managed intermediate buffer.

Expand Down
Loading