-
Notifications
You must be signed in to change notification settings - Fork 51
feat(runtime): add L3/L2 host-device communication #803
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d3be82c
589033b
1756c1a
913170c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -741,6 +741,126 @@ NB_MODULE(_task_interface, m) { | |||||||
| .def("free", &ChipWorker::free, nb::arg("ptr")) | ||||||||
| .def("copy_to", &ChipWorker::copy_to, nb::arg("dst"), nb::arg("src"), nb::arg("size")) | ||||||||
| .def("copy_from", &ChipWorker::copy_from, nb::arg("dst"), nb::arg("src"), nb::arg("size")) | ||||||||
| .def( | ||||||||
| "open_channel", | ||||||||
| [](ChipWorker &self, uint32_t cpu_to_l2_lanes, uint32_t l2_to_cpu_lanes, uint32_t lane_depth, | ||||||||
| uint32_t max_message_bytes, uint32_t flags) { | ||||||||
| HostDeviceChannelConfig cfg{ | ||||||||
| cpu_to_l2_lanes, l2_to_cpu_lanes, lane_depth, max_message_bytes, flags | ||||||||
| }; | ||||||||
| return self.open_channel(cfg); | ||||||||
| }, | ||||||||
| nb::arg("cpu_to_l2_lanes") = 1, nb::arg("l2_to_cpu_lanes") = 1, nb::arg("lane_depth") = 64, | ||||||||
| nb::arg("max_message_bytes") = HDCH_MAX_INLINE_BYTES, nb::arg("flags") = 0, | ||||||||
| "Open a bounded host/device message channel." | ||||||||
| ) | ||||||||
| .def("close_channel", &ChipWorker::close_channel, nb::arg("channel")) | ||||||||
| .def( | ||||||||
| "channel_send", | ||||||||
| [](ChipWorker &self, uint64_t ch, uint32_t route, nb::bytes data, uint64_t correlation_id, | ||||||||
| uint32_t timeout_us) { | ||||||||
| std::string payload(data.c_str(), data.size()); | ||||||||
| self.channel_send(ch, route, payload.data(), payload.size(), correlation_id, timeout_us); | ||||||||
| }, | ||||||||
| nb::arg("channel"), nb::arg("route"), nb::arg("data"), nb::arg("correlation_id") = 0, | ||||||||
| nb::arg("timeout_us") = 0 | ||||||||
| ) | ||||||||
| .def( | ||||||||
| "channel_recv", | ||||||||
| [](ChipWorker &self, uint64_t ch, size_t capacity, uint32_t timeout_us) { | ||||||||
| uint32_t route = 0; | ||||||||
| uint64_t correlation_id = 0; | ||||||||
| auto data = self.channel_recv(ch, capacity, timeout_us, &route, &correlation_id); | ||||||||
| return nb::make_tuple(nb::bytes(reinterpret_cast<const char *>(data.data()), data.size()), route, correlation_id); | ||||||||
| }, | ||||||||
| nb::arg("channel"), nb::arg("capacity") = HDCH_MAX_INLINE_BYTES, nb::arg("timeout_us") = 0 | ||||||||
| ) | ||||||||
| .def( | ||||||||
| "channel_send_l2_for_test", | ||||||||
| [](ChipWorker &self, uint64_t ch, uint32_t route, nb::bytes data, uint64_t correlation_id, | ||||||||
| uint32_t timeout_us) { | ||||||||
| std::string payload(data.c_str(), data.size()); | ||||||||
| self.channel_send_l2_for_test(ch, route, payload.data(), payload.size(), correlation_id, timeout_us); | ||||||||
|
Comment on lines
+782
to
+783
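There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Creating an intermediate `std::string` from `nb::bytes` is redundant and incurs an unnecessary copy. `nb::bytes` provides direct access to the underlying buffer via `c_str()` and `size()`, which can be passed directly to `self.channel_send`.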
Suggested change
|
||||||||
| }, | ||||||||
| nb::arg("channel"), nb::arg("route"), nb::arg("data"), nb::arg("correlation_id") = 0, | ||||||||
| nb::arg("timeout_us") = 0 | ||||||||
| ) | ||||||||
| .def( | ||||||||
| "channel_recv_l2_for_test", | ||||||||
| [](ChipWorker &self, uint64_t ch, size_t capacity, uint32_t timeout_us) { | ||||||||
| uint32_t route = 0; | ||||||||
| uint64_t correlation_id = 0; | ||||||||
| auto data = self.channel_recv_l2_for_test(ch, capacity, timeout_us, &route, &correlation_id); | ||||||||
| return nb::make_tuple(nb::bytes(reinterpret_cast<const char *>(data.data()), data.size()), route, correlation_id); | ||||||||
| }, | ||||||||
| nb::arg("channel"), nb::arg("capacity") = HDCH_MAX_INLINE_BYTES, nb::arg("timeout_us") = 0 | ||||||||
| ) | ||||||||
| .def( | ||||||||
| "open_shared_memory", | ||||||||
| [](ChipWorker &self, uint64_t data_bytes, uint32_t signal_count, uint32_t flags) { | ||||||||
| HostDeviceMemoryConfig cfg{data_bytes, signal_count, flags}; | ||||||||
| return self.open_shared_memory(cfg); | ||||||||
| }, | ||||||||
| nb::arg("data_bytes"), nb::arg("signal_count") = 2, nb::arg("flags") = 0, | ||||||||
| "Open a host/device shared-memory region." | ||||||||
| ) | ||||||||
| .def("close_shared_memory", &ChipWorker::close_shared_memory, nb::arg("memory")) | ||||||||
| .def( | ||||||||
| "shared_memory_info", | ||||||||
| [](ChipWorker &self, uint64_t mem) { | ||||||||
| HostDeviceMemoryInfo info = self.shared_memory_info(mem); | ||||||||
| return nb::make_tuple(info.host_ptr, info.device_ptr, info.data_bytes, info.signal_count, info.flags); | ||||||||
| }, | ||||||||
| nb::arg("memory"), "Return shared-memory metadata. host_ptr is a current-process address." | ||||||||
| ) | ||||||||
| .def( | ||||||||
| "shared_memory_read", | ||||||||
| [](ChipWorker &self, uint64_t mem, uint64_t offset, size_t nbytes) { | ||||||||
| auto data = self.shared_memory_read(mem, offset, nbytes); | ||||||||
| return nb::bytes(reinterpret_cast<const char *>(data.data()), data.size()); | ||||||||
| }, | ||||||||
| nb::arg("memory"), nb::arg("offset"), nb::arg("nbytes") | ||||||||
| ) | ||||||||
| .def( | ||||||||
| "shared_memory_write", | ||||||||
| [](ChipWorker &self, uint64_t mem, uint64_t offset, nb::bytes data) { | ||||||||
| std::string payload(data.c_str(), data.size()); | ||||||||
| self.shared_memory_write(mem, offset, payload.data(), payload.size()); | ||||||||
| }, | ||||||||
| nb::arg("memory"), nb::arg("offset"), nb::arg("data") | ||||||||
| ) | ||||||||
| .def( | ||||||||
| "shared_memory_notify", &ChipWorker::shared_memory_notify, nb::arg("memory"), nb::arg("signal_id"), | ||||||||
| nb::arg("value") | ||||||||
| ) | ||||||||
| .def( | ||||||||
| "shared_memory_wait", &ChipWorker::shared_memory_wait, nb::arg("memory"), nb::arg("signal_id"), | ||||||||
| nb::arg("target"), nb::arg("timeout_us") = 0 | ||||||||
| ) | ||||||||
| .def( | ||||||||
| "shared_memory_read_l2_for_test", | ||||||||
| [](ChipWorker &self, uint64_t mem, uint64_t offset, size_t nbytes) { | ||||||||
| auto data = self.shared_memory_read_l2_for_test(mem, offset, nbytes); | ||||||||
| return nb::bytes(reinterpret_cast<const char *>(data.data()), data.size()); | ||||||||
| }, | ||||||||
| nb::arg("memory"), nb::arg("offset"), nb::arg("nbytes") | ||||||||
| ) | ||||||||
| .def( | ||||||||
| "shared_memory_write_l2_for_test", | ||||||||
| [](ChipWorker &self, uint64_t mem, uint64_t offset, nb::bytes data) { | ||||||||
| std::string payload(data.c_str(), data.size()); | ||||||||
| self.shared_memory_write_l2_for_test(mem, offset, payload.data(), payload.size()); | ||||||||
| }, | ||||||||
| nb::arg("memory"), nb::arg("offset"), nb::arg("data") | ||||||||
| ) | ||||||||
| .def( | ||||||||
| "shared_memory_notify_l2_for_test", &ChipWorker::shared_memory_notify_l2_for_test, | ||||||||
| nb::arg("memory"), nb::arg("signal_id"), nb::arg("value") | ||||||||
| ) | ||||||||
| .def( | ||||||||
| "shared_memory_wait_l2_for_test", &ChipWorker::shared_memory_wait_l2_for_test, | ||||||||
| nb::arg("memory"), nb::arg("signal_id"), nb::arg("target"), nb::arg("timeout_us") = 0 | ||||||||
| ) | ||||||||
| .def( | ||||||||
| "comm_init", &ChipWorker::comm_init, nb::arg("rank"), nb::arg("nranks"), nb::arg("rootinfo_path"), | ||||||||
| "Initialize a communicator for this rank. ChipWorker owns ACL + stream " | ||||||||
|
|
||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,8 @@ | |
|
|
||
| #include <cstdint> | ||
| #include <stdexcept> | ||
| #include <string> | ||
| #include <vector> | ||
|
|
||
| #include "chip_bootstrap_channel.h" | ||
| #include "ring.h" | ||
|
|
@@ -159,6 +161,106 @@ inline void bind_worker(nb::module_ &m) { | |
| }, | ||
| nb::arg("worker_id"), nb::arg("dst"), nb::arg("src"), nb::arg("size"), "Copy worker src to host dst." | ||
| ) | ||
| .def( | ||
| "open_channel", | ||
| [](Orchestrator &self, int worker_id, uint32_t cpu_to_l2_lanes, uint32_t l2_to_cpu_lanes, | ||
| uint32_t lane_depth, uint32_t max_message_bytes) { | ||
| return self.open_channel(worker_id, cpu_to_l2_lanes, l2_to_cpu_lanes, lane_depth, max_message_bytes); | ||
| }, | ||
| nb::arg("worker_id"), nb::arg("cpu_to_l2_lanes") = 1, nb::arg("l2_to_cpu_lanes") = 1, | ||
| nb::arg("lane_depth") = 64, nb::arg("max_message_bytes") = 256, | ||
| "Open a host/device message channel on a next-level worker." | ||
| ) | ||
| .def( | ||
| "close_channel", | ||
| [](Orchestrator &self, int worker_id, uint64_t channel) { | ||
| self.close_channel(worker_id, channel); | ||
| }, | ||
| nb::arg("worker_id"), nb::arg("channel"), "Close a host/device message channel." | ||
| ) | ||
| .def( | ||
| "channel_send", | ||
| [](Orchestrator &self, int worker_id, uint64_t channel, uint32_t route, nb::bytes data, | ||
| uint64_t correlation_id) { | ||
| std::string payload(data.c_str(), data.size()); | ||
| std::vector<uint8_t> bytes(payload.begin(), payload.end()); | ||
| self.channel_send(worker_id, channel, route, bytes, correlation_id); | ||
|
Comment on lines
+185
to
+187
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This implementation performs multiple redundant copies (`nb::bytes` → `std::string` → `std::vector<uint8_t>`). Suggested change: `self.channel_send(worker_id, channel, route, data.c_str(), data.size(), correlation_id);` |
||
| }, | ||
| nb::arg("worker_id"), nb::arg("channel"), nb::arg("route"), nb::arg("data"), nb::arg("correlation_id") = 0, | ||
| "Send a message through a host/device channel." | ||
| ) | ||
| .def( | ||
| "channel_recv", | ||
| [](Orchestrator &self, int worker_id, uint64_t channel, size_t capacity, uint32_t timeout_us) { | ||
| uint32_t route = 0; | ||
| uint64_t correlation_id = 0; | ||
| auto data = self.channel_recv(worker_id, channel, capacity, timeout_us, &route, &correlation_id); | ||
| return nb::make_tuple( | ||
| nb::bytes(reinterpret_cast<const char *>(data.data()), data.size()), route, correlation_id | ||
| ); | ||
| }, | ||
| nb::arg("worker_id"), nb::arg("channel"), nb::arg("capacity") = 256, nb::arg("timeout_us") = 0, | ||
| "Receive a message through a host/device channel." | ||
| ) | ||
| .def( | ||
| "open_shared_memory", | ||
| [](Orchestrator &self, int worker_id, uint64_t data_bytes, uint32_t signal_count, uint32_t flags) { | ||
| return self.open_shared_memory(worker_id, data_bytes, signal_count, flags); | ||
| }, | ||
| nb::arg("worker_id"), nb::arg("data_bytes"), nb::arg("signal_count") = 2, nb::arg("flags") = 0, | ||
| "Open a host/device shared-memory region on a next-level worker." | ||
| ) | ||
| .def( | ||
| "close_shared_memory", | ||
| [](Orchestrator &self, int worker_id, uint64_t memory) { | ||
| self.close_shared_memory(worker_id, memory); | ||
| }, | ||
| nb::arg("worker_id"), nb::arg("memory"), "Close a host/device shared-memory region." | ||
| ) | ||
| .def( | ||
| "shared_memory_info", | ||
| [](Orchestrator &self, int worker_id, uint64_t memory) { | ||
| HostDeviceMemoryInfo info = self.shared_memory_info(worker_id, memory); | ||
| return nb::make_tuple(info.host_ptr, info.device_ptr, info.data_bytes, info.signal_count, info.flags); | ||
| }, | ||
| nb::arg("worker_id"), nb::arg("memory"), | ||
| "Return shared-memory metadata. host_ptr is 0 for hierarchical mailbox callers." | ||
| ) | ||
| .def( | ||
| "shared_memory_read", | ||
| [](Orchestrator &self, int worker_id, uint64_t memory, uint64_t offset, size_t nbytes) { | ||
| auto data = self.shared_memory_read(worker_id, memory, offset, nbytes); | ||
| return nb::bytes(reinterpret_cast<const char *>(data.data()), data.size()); | ||
| }, | ||
| nb::arg("worker_id"), nb::arg("memory"), nb::arg("offset"), nb::arg("nbytes"), | ||
| "Read shared-memory bytes via chunked mailbox RPC. Returns a full materialized bytes object; this is " | ||
| "not streaming or zero-copy." | ||
| ) | ||
| .def( | ||
| "shared_memory_write", | ||
| [](Orchestrator &self, int worker_id, uint64_t memory, uint64_t offset, nb::bytes data) { | ||
| std::string payload(data.c_str(), data.size()); | ||
| std::vector<uint8_t> bytes(payload.begin(), payload.end()); | ||
| self.shared_memory_write(worker_id, memory, offset, bytes); | ||
| }, | ||
| nb::arg("worker_id"), nb::arg("memory"), nb::arg("offset"), nb::arg("data"), | ||
| "Write shared-memory bytes via chunked mailbox RPC. This is not streaming or zero-copy." | ||
| ) | ||
| .def( | ||
| "shared_memory_notify", | ||
| [](Orchestrator &self, int worker_id, uint64_t memory, uint32_t signal_id, uint64_t value) { | ||
| self.shared_memory_notify(worker_id, memory, signal_id, value); | ||
| }, | ||
| nb::arg("worker_id"), nb::arg("memory"), nb::arg("signal_id"), nb::arg("value") | ||
| ) | ||
| .def( | ||
| "shared_memory_wait", | ||
| [](Orchestrator &self, int worker_id, uint64_t memory, uint32_t signal_id, uint64_t target, | ||
| uint32_t timeout_us) { | ||
| self.shared_memory_wait(worker_id, memory, signal_id, target, timeout_us); | ||
| }, | ||
| nb::arg("worker_id"), nb::arg("memory"), nb::arg("signal_id"), nb::arg("target"), nb::arg("timeout_us") = 0 | ||
| ) | ||
| .def( | ||
| "alloc", | ||
| [](Orchestrator &self, const std::vector<uint32_t> &shape, DataType dtype) { | ||
|
|
@@ -251,6 +353,35 @@ inline void bind_worker(nb::module_ &m) { | |
| m.attr("MAILBOX_SIZE") = static_cast<int>(MAILBOX_SIZE); | ||
| m.attr("MAILBOX_OFF_ERROR_MSG") = static_cast<int>(MAILBOX_OFF_ERROR_MSG); | ||
| m.attr("MAILBOX_ERROR_MSG_SIZE") = static_cast<int>(MAILBOX_ERROR_MSG_SIZE); | ||
| m.attr("MAILBOX_OFF_ARGS") = static_cast<int>(MAILBOX_OFF_ARGS); | ||
| m.attr("MAILBOX_ARGS_CAPACITY") = static_cast<int>(MAILBOX_ARGS_CAPACITY); | ||
| m.attr("CTRL_MALLOC") = static_cast<uint64_t>(CTRL_MALLOC); | ||
| m.attr("CTRL_FREE") = static_cast<uint64_t>(CTRL_FREE); | ||
| m.attr("CTRL_COPY_TO") = static_cast<uint64_t>(CTRL_COPY_TO); | ||
| m.attr("CTRL_COPY_FROM") = static_cast<uint64_t>(CTRL_COPY_FROM); | ||
| m.attr("CTRL_PREPARE") = static_cast<uint64_t>(CTRL_PREPARE); | ||
| m.attr("CTRL_REGISTER") = static_cast<uint64_t>(CTRL_REGISTER); | ||
| m.attr("CTRL_UNREGISTER") = static_cast<uint64_t>(CTRL_UNREGISTER); | ||
| m.attr("CTRL_OPEN_CHANNEL") = static_cast<uint64_t>(CTRL_OPEN_CHANNEL); | ||
| m.attr("CTRL_CLOSE_CHANNEL") = static_cast<uint64_t>(CTRL_CLOSE_CHANNEL); | ||
| m.attr("CTRL_CHANNEL_SEND") = static_cast<uint64_t>(CTRL_CHANNEL_SEND); | ||
| m.attr("CTRL_CHANNEL_RECV") = static_cast<uint64_t>(CTRL_CHANNEL_RECV); | ||
| m.attr("CTRL_OPEN_SHARED_MEMORY") = static_cast<uint64_t>(CTRL_OPEN_SHARED_MEMORY); | ||
| m.attr("CTRL_CLOSE_SHARED_MEMORY") = static_cast<uint64_t>(CTRL_CLOSE_SHARED_MEMORY); | ||
| m.attr("CTRL_SHARED_MEMORY_INFO") = static_cast<uint64_t>(CTRL_SHARED_MEMORY_INFO); | ||
| m.attr("CTRL_SHARED_MEMORY_READ") = static_cast<uint64_t>(CTRL_SHARED_MEMORY_READ); | ||
| m.attr("CTRL_SHARED_MEMORY_WRITE") = static_cast<uint64_t>(CTRL_SHARED_MEMORY_WRITE); | ||
| m.attr("CTRL_SHARED_MEMORY_NOTIFY") = static_cast<uint64_t>(CTRL_SHARED_MEMORY_NOTIFY); | ||
| m.attr("CTRL_SHARED_MEMORY_WAIT") = static_cast<uint64_t>(CTRL_SHARED_MEMORY_WAIT); | ||
| m.attr("CTRL_OFF_ARG0") = static_cast<int>(CTRL_OFF_ARG0); | ||
| m.attr("CTRL_OFF_ARG1") = static_cast<int>(CTRL_OFF_ARG1); | ||
| m.attr("CTRL_OFF_ARG2") = static_cast<int>(CTRL_OFF_ARG2); | ||
| m.attr("CTRL_OFF_RESULT") = static_cast<int>(CTRL_OFF_RESULT); | ||
| m.attr("CTRL_OFF_ARG3") = static_cast<int>(CTRL_OFF_ARG3); | ||
| m.attr("CTRL_OFF_ARG4") = static_cast<int>(CTRL_OFF_ARG4); | ||
| m.attr("CTRL_OFF_PAYLOAD") = static_cast<int>(CTRL_OFF_PAYLOAD); | ||
| m.attr("CTRL_PAYLOAD_CAPACITY") = static_cast<int>(CTRL_PAYLOAD_CAPACITY); | ||
| m.attr("CTRL_SHM_NAME_BYTES") = static_cast<int>(CTRL_SHM_NAME_BYTES); | ||
| m.attr("MAX_RING_DEPTH") = static_cast<int32_t>(MAX_RING_DEPTH); | ||
| m.attr("MAX_SCOPE_DEPTH") = static_cast<int32_t>(MAX_SCOPE_DEPTH); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Creating an intermediate `std::string` from `nb::bytes` is redundant and incurs an unnecessary copy. `nb::bytes` provides direct access to the underlying buffer via `c_str()` and `size()`, which can be passed directly to `self.channel_send`.