ENH: Covariance CPU SPMD support #3507
Conversation
/intelci: run
```cpp
// Simple allreduce of centered crossproducts is incorrect because each
// rank uses its local mean. Un-center before allreduce, then re-center
// with global statistics after.
if (!desc.get_assume_centered()) {
```
Would it be simpler to use DAAL's Distributed<step2Master> algorithm for the aggregation?
https://github.com/uxlfoundation/oneDAL/blob/main/samples/daal/cpp/mpi/sources/covariance_dense_distributed_mpi.cpp#L106
Interesting suggestion, will look into this
/intelci: run
Fixes the ABI breakage introduced by the CPU SPMD algorithms in PRs #3507 and #3585. Makes the binding type of the destructors (~compute_input(), ~partial_compute_input(), ~train_input(), etc.) stably "WEAK" and independent of the compiler's logic by adding their non-default implementations. Also fixes the `set_responses` method in KNN. (cherry picked from commit 0a5f4cb) Co-authored-by: Victoriya Fedotova <victoriya.s.fedotova@intel.com>
/intelci: run
Pull request overview
This PR adds CPU distributed (SPMD) execution support for the covariance algorithm in oneDAL’s oneAPI C++ implementation, enabling multi-rank aggregation and unblocking CPU execution of existing SPMD tests and samples.
Changes:
- Enable CPU SPMD dispatch for covariance compute operations.
- Implement CPU backend SPMD aggregation logic for covariance (partial -> allreduce -> finalize).
- Add new distributed covariance samples for MPI and CCL; enable CPU execution of the covariance SPMD integration test.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| cpp/oneapi/dal/algo/covariance/detail/compute_ops.cpp | Switches covariance CPU dispatch to a universal SPMD-capable kernel dispatcher. |
| cpp/oneapi/dal/algo/covariance/backend/cpu/compute_kernel_dense.cpp | Adds the CPU SPMD path: compute local partials, allreduce partial statistics, and finalize on aggregated partials. |
| cpp/oneapi/dal/algo/covariance/test/spmd.cpp | Allows the covariance SPMD integration test to run on CPU policies. |
| samples/oneapi/cpp/mpi/sources/covariance_distr_mpi.cpp | New MPI distributed covariance sample using dal::preview::compute. |
| samples/oneapi/cpp/ccl/sources/covariance_distr_ccl.cpp | New CCL distributed covariance sample using dal::preview::compute. |
```cpp
static compute_result<Task> call_daal_spmd_kernel(const context_cpu& ctx,
                                                  const detail::descriptor_base<Task>& desc,
                                                  const detail::compute_parameters<Task>& params,
                                                  const table& data) {
    auto& comm = ctx.get_communicator();
    const std::int64_t component_count = data.get_column_count();

    // Compute partial results locally on this rank's data
    partial_compute_input<Task> partial_input(data);
    auto partial_result =
        partial_compute_kernel_cpu<Float, method::by_default, Task>{}(ctx, desc, partial_input);
```
call_daal_spmd_kernel() receives params but never uses it; this means the CPU hyperparameters selected in compute_parameters_cpu (block size, grain size, etc.) are ignored in SPMD mode. Consider passing params through to the DAAL online/finalize calls (instead of default-constructing detail::compute_parameters{} inside partial/finalize kernels), or otherwise document/encode that these parameters are intentionally unused for SPMD.
```diff
 compute_parameters<Task> select_parameters(const Policy& ctx,
                                            const descriptor_base<Task>& desc,
                                            const compute_input<Task>& input) const {
-    using kernel_dispatcher_t = dal::backend::kernel_dispatcher<KERNEL_SINGLE_NODE_CPU(
+    using kernel_dispatcher_t = dal::backend::kernel_dispatcher<KERNEL_UNIVERSAL_SPMD_CPU(
         parameters::compute_parameters_cpu<Float, Method, Task>)>;
     return kernel_dispatcher_t{}(ctx, desc, input);
```
KERNEL_UNIVERSAL_SPMD_CPU dispatcher uses context_cpu{} for host_policy (see cpp/oneapi/dal/backend/dispatcher.hpp:231-233), which drops any non-default settings carried by the passed policy (e.g., enabled/disabled CPU extensions). To preserve single-node behavior while enabling SPMD, consider using a kernel_dispatcher with both KERNEL_SINGLE_NODE_CPU(...) (for host_policy) and KERNEL_UNIVERSAL_SPMD_CPU(...) (for spmd_host_policy), similar to how compute_ops_dpc.cpp mixes kernel specs.
```cpp
static compute_result<Task> call_daal_spmd_kernel(const context_cpu& ctx,
                                                  const detail::descriptor_base<Task>& desc,
                                                  const detail::compute_parameters<Task>& params,
                                                  const table& data) {
```
params is unused in call_daal_spmd_kernel(), and the SPMD path currently ignores covariance CPU hyperparameters (compute_parameters) that are applied in the single-node path via convert_parameters(params). Either thread params through the partial/finalize kernels (so distributed runs honor the same tuning knobs) or explicitly mark/remove the parameter to avoid unused-parameter warnings and clarify that hyperparameters are intentionally not supported in SPMD mode.
```diff
 const table& data) {
+    // SPMD covariance currently does not thread compute_parameters through
+    // partial/finalize kernels, so keep the argument for interface
+    // compatibility and mark it as intentionally unused here.
+    (void)params;
```
```cpp
comm.allreduce(nobs_ary).wait();
comm.allreduce(sums_ary).wait();
comm.allreduce(crossproduct_ary).wait();
```
The SPMD implementation performs three separate allreduce() calls with an immediate .wait() after each one. This introduces extra synchronization/latency; consider reducing the number of collectives (e.g., pack nobs, sums, and crossproduct into a single buffer and allreduce once) or at least issuing the collectives first and waiting after to allow potential overlap.
```diff
-comm.allreduce(nobs_ary).wait();
-comm.allreduce(sums_ary).wait();
-comm.allreduce(crossproduct_ary).wait();
+auto nobs_request = comm.allreduce(nobs_ary);
+auto sums_request = comm.allreduce(sums_ary);
+auto crossproduct_request = comm.allreduce(crossproduct_ary);
+nobs_request.wait();
+sums_request.wait();
+crossproduct_request.wait();
```
/intelci: run
Description
This PR adds distributed (SPMD) support for Covariance on CPU.
Checklist:
- Completeness and readability
- Testing: no new failures in the CIs.
- Performance