Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 191 additions & 19 deletions node/src/seeder/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,80 @@ const BATCH_ENQUEUE: usize = 20;
const LAST_UPLOADED_KEY: u64 = 0;
const RETRY_DELAY: Duration = Duration::from_secs(10);

/// Tracks upload confirmations separately from upload scheduling.
///
/// `next_unconfirmed_upload` is the first seed view we do not yet know is uploaded.
/// Any completed view above that pointer is remembered in
/// `completed_out_of_order_uploads` until the gap closes.
struct UploadTracker {
next_unconfirmed_upload: View,
completed_out_of_order_uploads: RMap,
}

impl UploadTracker {
/// Start tracking from the first seed that still needs confirmation.
fn new(next_unconfirmed_upload: View) -> Self {
assert!(next_unconfirmed_upload > 0, "seed uploads start at view 1");
Self {
next_unconfirmed_upload,
completed_out_of_order_uploads: RMap::new(),
}
}

/// Record a completed upload.
///
/// Returns the new contiguous uploaded frontier when this completion closes the
/// current gap; otherwise returns `None`.
fn record_uploaded(&mut self, view: View) -> Option<View> {
// Ignore duplicate or already-accounted-for completions.
if view < self.next_unconfirmed_upload {
return None;
}

// This upload finished early; remember it until the missing lower view arrives.
if view > self.next_unconfirmed_upload {
self.completed_out_of_order_uploads.insert(view);
return None;
}

// We received the exact view we were waiting on. Advance the frontier and
// consume any higher views that had already completed out of order.
let mut uploaded_through = view;
self.next_unconfirmed_upload = self
.next_unconfirmed_upload
.checked_add(1)
.expect("seed upload view overflow");

loop {
let Some(end_region) = self
.completed_out_of_order_uploads
.next_gap(self.next_unconfirmed_upload)
.0
else {
break;
};
self.completed_out_of_order_uploads
.remove(self.next_unconfirmed_upload, end_region);
uploaded_through = end_region;
self.next_unconfirmed_upload = end_region
.checked_add(1)
.expect("seed upload view overflow");
}

Some(uploaded_through)
}

#[cfg(test)]
fn next_unconfirmed_upload(&self) -> View {
self.next_unconfirmed_upload
}

#[cfg(test)]
fn is_waiting_on_out_of_order_completion(&self, view: View) -> bool {
self.completed_out_of_order_uploads.get(&view).is_some()
}
}

pub struct Actor<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> {
context: R,
config: Config<I>,
Expand Down Expand Up @@ -135,13 +209,16 @@ impl<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> Acto

// Track uploads
let mut uploads_outstanding = 0;
let mut cursor = metadata
// Metadata stores the highest confirmed upload, so the next view to start and
// the first view still awaiting confirmation are both `LAST_UPLOADED_KEY + 1`.
let mut next_upload_to_start = metadata
.get(&LAST_UPLOADED_KEY.into())
.cloned()
.unwrap_or(1);
let mut boundary = cursor;
let mut tracked_uploads = RMap::new();
info!(cursor, "initial seed cursor");
.unwrap_or(0)
.checked_add(1)
.expect("seed upload view overflow");
let mut upload_tracker = UploadTracker::new(next_upload_to_start);
info!(next_upload_to_start, "initial seed upload cursor");

// Process messages
loop {
Expand All @@ -154,18 +231,11 @@ impl<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> Acto
// Decrement uploads outstanding
uploads_outstanding -= 1;

// Track uploaded view
tracked_uploads.insert(view);

// Update metadata if lowest uploaded has increased
let Some(end_region) = tracked_uploads.next_gap(boundary).0 else {
continue;
};
if end_region > boundary {
boundary = end_region;
// Update metadata if the contiguous uploaded frontier has increased.
if let Some(end_region) = upload_tracker.record_uploaded(view) {
metadata.put(LAST_UPLOADED_KEY.into(), end_region);
metadata.sync().await.expect("failed to sync metadata");
info!(boundary, "updated seed upload marker");
info!(boundary = end_region, "updated seed upload marker");
}
}
Message::Put(seed) => {
Expand Down Expand Up @@ -286,7 +356,11 @@ impl<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> Acto
// Attempt to upload any seeds
while uploads_outstanding < self.config.max_uploads_outstanding {
// Get next seed
let Some(seed) = storage.get(cursor).await.expect("failed to get seed") else {
let Some(seed) = storage
.get(next_upload_to_start)
.await
.expect("failed to get seed")
else {
break;
};

Expand All @@ -295,7 +369,7 @@ impl<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> Acto

// Upload seed to indexer
self.context.with_label("seed_submit").spawn({
let seed = Seed::new(cursor, seed);
let seed = Seed::new(next_upload_to_start, seed);
let indexer = self.config.indexer.clone();
let mut channel = self.inbound.clone();
move |context| async move {
Expand All @@ -314,9 +388,107 @@ impl<R: Storage + Metrics + Clock + Spawner + GClock + RngCore, I: Indexer> Acto
}
});

// Increment cursor
cursor += 1;
// Increment the next upload we are willing to start.
next_upload_to_start += 1;
}
}
}
}

#[cfg(test)]
mod tests {
use super::UploadTracker;

#[test]
fn upload_tracker_advances_on_in_order_completion() {
let mut tracker = UploadTracker::new(10);

// The happy path: the exact view we were waiting on finishes, so the
// frontier advances by one and no temporary out-of-order state remains.
assert_eq!(tracker.record_uploaded(10), Some(10));
assert_eq!(tracker.next_unconfirmed_upload(), 11);
assert!(!tracker.is_waiting_on_out_of_order_completion(10));
}

#[test]
fn upload_tracker_ignores_duplicate_confirmations() {
let mut tracker = UploadTracker::new(10);

// Once a view has already advanced the frontier, repeating the same
// completion should be a no-op.
assert_eq!(tracker.record_uploaded(10), Some(10));
assert_eq!(tracker.record_uploaded(10), None);
assert_eq!(tracker.next_unconfirmed_upload(), 11);
assert!(!tracker.is_waiting_on_out_of_order_completion(10));
}

#[test]
fn upload_tracker_stops_at_the_next_missing_view() {
let mut tracker = UploadTracker::new(10);

// Assume uploads 10, 11, 12, and 13 have already been started. The tracker
// only sees completion order. Here 11 and 13 finish early, but 12 is still
// missing. Once 10 arrives we can only advance through 11, and 13 must
// remain pending.
assert_eq!(tracker.record_uploaded(11), None);
assert_eq!(tracker.record_uploaded(13), None);
assert!(tracker.is_waiting_on_out_of_order_completion(11));
assert!(tracker.is_waiting_on_out_of_order_completion(13));
assert_eq!(tracker.record_uploaded(10), Some(11));
assert_eq!(tracker.next_unconfirmed_upload(), 12);
assert!(!tracker.is_waiting_on_out_of_order_completion(10));
assert!(!tracker.is_waiting_on_out_of_order_completion(11));
assert!(tracker.is_waiting_on_out_of_order_completion(13));
}

#[test]
fn upload_tracker_consumes_later_contiguous_completions_when_gap_closes() {
let mut tracker = UploadTracker::new(10);

// Assume uploads 10, 11, 12, and 13 are already in flight. After advancing
// to 11, receiving 12 should also consume the already-finished 13 and move
// the frontier to 14.
assert_eq!(tracker.record_uploaded(11), None);
assert_eq!(tracker.record_uploaded(13), None);
assert!(tracker.is_waiting_on_out_of_order_completion(11));
assert!(tracker.is_waiting_on_out_of_order_completion(13));
assert_eq!(tracker.record_uploaded(10), Some(11));
assert!(tracker.is_waiting_on_out_of_order_completion(13));
assert_eq!(tracker.record_uploaded(12), Some(13));
assert_eq!(tracker.next_unconfirmed_upload(), 14);
assert!(!tracker.is_waiting_on_out_of_order_completion(12));
assert!(!tracker.is_waiting_on_out_of_order_completion(13));
}

#[test]
fn upload_tracker_advances_past_out_of_order_completions_once_gap_closes() {
let mut tracker = UploadTracker::new(10);

// Assume uploads 10, 11, and 12 are already in flight. 11 and 12 finish
// before 10. Once 10 arrives, the tracker should advance straight through
// 12 and clear the temporary out-of-order state.
assert_eq!(tracker.record_uploaded(11), None);
assert_eq!(tracker.record_uploaded(12), None);
assert!(tracker.is_waiting_on_out_of_order_completion(11));
assert!(tracker.is_waiting_on_out_of_order_completion(12));
assert_eq!(tracker.record_uploaded(10), Some(12));
assert_eq!(tracker.next_unconfirmed_upload(), 13);
assert!(!tracker.is_waiting_on_out_of_order_completion(10));
assert!(!tracker.is_waiting_on_out_of_order_completion(11));
assert!(!tracker.is_waiting_on_out_of_order_completion(12));
}

#[test]
fn upload_tracker_handles_high_views_without_overflowing() {
let mut tracker = UploadTracker::new(u64::MAX - 2);

// Exercise the checked_add path near the upper bound without crossing it.
// Advancing from MAX-2 through a previously completed MAX-1 should leave
// the tracker waiting on MAX.
assert_eq!(tracker.record_uploaded(u64::MAX - 1), None);
assert!(tracker.is_waiting_on_out_of_order_completion(u64::MAX - 1));
assert_eq!(tracker.record_uploaded(u64::MAX - 2), Some(u64::MAX - 1));
assert_eq!(tracker.next_unconfirmed_upload(), u64::MAX);
assert!(!tracker.is_waiting_on_out_of_order_completion(u64::MAX - 1));
}
}