diff --git a/CHANGELOG.md b/CHANGELOG.md index 8887ceccf..699bb6463 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `ToStreamBuilder` exposes the item type via the `Item` associated type instead of a trait-level generic, and the container builder moves to a method-level generic. This enables method-call syntax: `(0..3).to_stream_with_builder::<_, CapacityContainerBuilder<_>>(scope)` instead of the UFCS form `ToStreamBuilder::>::to_stream_with_builder(0..3, scope)`. +### `Bytes` is now `Sync`, and byte buffers must be `Send` + +`timely_bytes::arc::Bytes` now implements `Sync` in addition to `Send`. To make both impls sound, `BytesMut::from` now requires its payload to be `Send`. This is a breaking change to `timely_communication`: `BytesRefill::logic` now produces `Box + Send>` rather than `Box>`. Custom refills whose buffer type wraps a raw pointer (e.g. `NonNull`) must assert `unsafe impl Send` on that type. + ## [0.29.0](https://github.com/TimelyDataflow/timely-dataflow/compare/timely-v0.28.1...timely-v0.29.0) - 2026-04-13 The theme in this release is simplifying specialization by removing monomorphization sprawl. diff --git a/bytes/src/lib.rs b/bytes/src/lib.rs index 2d371b4d3..4c41b52e4 100644 --- a/bytes/src/lib.rs +++ b/bytes/src/lib.rs @@ -60,7 +60,7 @@ pub mod arc { impl BytesMut { /// Create a new instance from a byte allocation. - pub fn from(bytes: B) -> BytesMut where B : DerefMut+'static { + pub fn from(bytes: B) -> BytesMut where B : DerefMut+Send+'static { // Sequester allocation behind an `Arc`, which *should* keep the address // stable for the lifetime of `sequestered`. The `Arc` also serves as our @@ -187,9 +187,20 @@ pub mod arc { } // Synchronization happens through `self.sequestered`, which means to ensure that even - // across multiple threads the referenced range of bytes remain valid. + // across multiple threads the referenced range of bytes remains valid. unsafe impl Send for Bytes { } + // `Sync` holds because everything reachable through `&Bytes` is read-only or atomic: + // `Deref` yields `&[u8]` (and `u8: Sync`), the mutating methods take `&mut self`, and + // cloning only touches the atomic `Arc` refcount. There is no interior mutability and + // no path to a `&mut` from a shared reference. + // + // Note this requires only that the sequestered payload `B` be `Send` (enforced by + // `BytesMut::from`), not `Sync`: `B` is never exposed by reference, so it is never + // shared across threads. The only cross-thread use of `B` is its destructor, which may + // run on whichever thread drops the last `Arc` clone -- and that needs `Send`, not `Sync`. + unsafe impl Sync for Bytes { } + impl Bytes { /// Extracts [0, index) into a new `Bytes` which is returned, updating `self`. diff --git a/communication/examples/lgalloc.rs b/communication/examples/lgalloc.rs index 3f805d95e..f542ba668 100644 --- a/communication/examples/lgalloc.rs +++ b/communication/examples/lgalloc.rs @@ -34,6 +34,10 @@ mod example { Box::new(LgallocHandle { handle, pointer, capacity }) } + // `LgallocHandle` wraps a `NonNull`, which is `!Send`; lgalloc allocations are safe to + // move and deallocate across threads, so we assert `Send` to permit sharing as `Bytes`. + unsafe impl Send for LgallocHandle { } + struct LgallocHandle { handle: Option, pointer: NonNull, @@ -67,7 +71,7 @@ mod example { lgalloc::lgalloc_set_config(&lgconfig); let refill = BytesRefill { - logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box>), + logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box+Send>), limit: None, }; diff --git a/communication/src/allocator/zero_copy/bytes_slab.rs b/communication/src/allocator/zero_copy/bytes_slab.rs index e7eaac4b9..2eec454e2 100644 --- a/communication/src/allocator/zero_copy/bytes_slab.rs +++ b/communication/src/allocator/zero_copy/bytes_slab.rs @@ -21,7 +21,7 @@ pub struct BytesSlab { #[derive(Clone)] pub struct BytesRefill { /// Logic to acquire a new buffer of a certain number of bytes. - pub logic: std::sync::Arc Box>+Send+Sync>, + pub logic: std::sync::Arc Box+Send>+Send+Sync>, /// An optional limit on the number of empty buffers retained. pub limit: Option, } @@ -111,7 +111,7 @@ impl BytesSlab { /// A wrapper for `Box>` that dereferences to `T` rather than `dyn DerefMut`. struct BoxDerefMut { - boxed: Box+'static>, + boxed: Box+Send+'static>, } impl Deref for BoxDerefMut { diff --git a/communication/src/initialize.rs b/communication/src/initialize.rs index dcd856917..e99f51676 100644 --- a/communication/src/initialize.rs +++ b/communication/src/initialize.rs @@ -77,7 +77,7 @@ impl Default for Hooks { Self { log_fn: Arc::new(|_| None), refill: BytesRefill { - logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box>), + logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box+Send>), limit: None, }, spill: None, diff --git a/timely/src/dataflow/operators/core/capture/event.rs b/timely/src/dataflow/operators/core/capture/event.rs index 3bd68b9c1..d43a1e51c 100644 --- a/timely/src/dataflow/operators/core/capture/event.rs +++ b/timely/src/dataflow/operators/core/capture/event.rs @@ -264,7 +264,7 @@ pub mod binary { pub fn new(r: R) -> Self { let refill = BytesRefill { logic: Arc::new(|size| { - Box::new(vec![0_u8; size]) as Box> + Box::new(vec![0_u8; size]) as Box + Send> }), limit: None, };