Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `ToStreamBuilder` exposes the item type via the `Item` associated type instead of a trait-level generic, and the container builder moves to a method-level generic. This enables method-call syntax: `(0..3).to_stream_with_builder::<_, CapacityContainerBuilder<_>>(scope)` instead of the UFCS form `ToStreamBuilder::<CapacityContainerBuilder<_>>::to_stream_with_builder(0..3, scope)`.

### `Bytes` is now `Sync`, and byte buffers must be `Send`

`timely_bytes::arc::Bytes` now implements `Sync` in addition to `Send`. To make both impls sound, `BytesMut::from` now requires its payload to be `Send`. This is a breaking change to `timely_communication`: `BytesRefill::logic` now produces `Box<dyn DerefMut<Target=[u8]> + Send>` rather than `Box<dyn DerefMut<Target=[u8]>>`. Custom refills whose buffer type wraps a raw pointer (e.g. `NonNull`) must assert `unsafe impl Send` on that type.

## [0.29.0](https://github.com/TimelyDataflow/timely-dataflow/compare/timely-v0.28.1...timely-v0.29.0) - 2026-04-13

The theme in this release is simplifying specialization by removing monomorphization sprawl.
Expand Down
15 changes: 13 additions & 2 deletions bytes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub mod arc {
impl BytesMut {

/// Create a new instance from a byte allocation.
pub fn from<B>(bytes: B) -> BytesMut where B : DerefMut<Target=[u8]>+'static {
pub fn from<B>(bytes: B) -> BytesMut where B : DerefMut<Target=[u8]>+Send+'static {

// Sequester allocation behind an `Arc`, which *should* keep the address
// stable for the lifetime of `sequestered`. The `Arc` also serves as our
Expand Down Expand Up @@ -187,9 +187,20 @@ pub mod arc {
}

// Synchronization happens through `self.sequestered`, which means to ensure that even
// across multiple threads the referenced range of bytes remain valid.
// across multiple threads the referenced range of bytes remains valid.
unsafe impl Send for Bytes { }

// `Sync` holds because everything reachable through `&Bytes` is read-only or atomic:
// `Deref` yields `&[u8]` (and `u8: Sync`), the mutating methods take `&mut self`, and
// cloning only touches the atomic `Arc` refcount. There is no interior mutability and
// no path to a `&mut` from a shared reference.
//
// Note this requires only that the sequestered payload `B` be `Send` (enforced by
// `BytesMut::from`), not `Sync`: `B` is never exposed by reference, so it is never
// shared across threads. The only cross-thread use of `B` is its destructor, which may
// run on whichever thread drops the last `Arc` clone -- and that needs `Send`, not `Sync`.
unsafe impl Sync for Bytes { }

impl Bytes {

/// Extracts [0, index) into a new `Bytes` which is returned, updating `self`.
Expand Down
6 changes: 5 additions & 1 deletion communication/examples/lgalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ mod example {
Box::new(LgallocHandle { handle, pointer, capacity })
}

// `LgallocHandle` wraps a `NonNull`, which is `!Send`; lgalloc allocations are safe to
// move and deallocate across threads, so we assert `Send` to permit sharing as `Bytes`.
unsafe impl Send for LgallocHandle { }

struct LgallocHandle {
handle: Option<lgalloc::Handle>,
pointer: NonNull<u8>,
Expand Down Expand Up @@ -67,7 +71,7 @@ mod example {
lgalloc::lgalloc_set_config(&lgconfig);

let refill = BytesRefill {
logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box<dyn DerefMut<Target=[u8]>>),
logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box<dyn DerefMut<Target=[u8]>+Send>),
limit: None,
};

Expand Down
4 changes: 2 additions & 2 deletions communication/src/allocator/zero_copy/bytes_slab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct BytesSlab {
#[derive(Clone)]
pub struct BytesRefill {
/// Logic to acquire a new buffer of a certain number of bytes.
pub logic: std::sync::Arc<dyn Fn(usize) -> Box<dyn DerefMut<Target=[u8]>>+Send+Sync>,
pub logic: std::sync::Arc<dyn Fn(usize) -> Box<dyn DerefMut<Target=[u8]>+Send>+Send+Sync>,
/// An optional limit on the number of empty buffers retained.
pub limit: Option<usize>,
}
Expand Down Expand Up @@ -111,7 +111,7 @@ impl BytesSlab {

/// A wrapper for `Box<dyn DerefMut<Target=T>>` that dereferences to `T` rather than `dyn DerefMut<Target=T>`.
struct BoxDerefMut {
boxed: Box<dyn DerefMut<Target=[u8]>+'static>,
boxed: Box<dyn DerefMut<Target=[u8]>+Send+'static>,
}

impl Deref for BoxDerefMut {
Expand Down
2 changes: 1 addition & 1 deletion communication/src/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl Default for Hooks {
Self {
log_fn: Arc::new(|_| None),
refill: BytesRefill {
logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target=[u8]>>),
logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target=[u8]>+Send>),
limit: None,
},
spill: None,
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/capture/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ pub mod binary {
pub fn new(r: R) -> Self {
let refill = BytesRefill {
logic: Arc::new(|size| {
Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target = [u8]>>
Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target = [u8]> + Send>
}),
limit: None,
};
Expand Down
Loading