diff --git a/node/src/seeder/actor.rs b/node/src/seeder/actor.rs index 99ecd3db..ab1dc5e1 100644 --- a/node/src/seeder/actor.rs +++ b/node/src/seeder/actor.rs @@ -35,6 +35,80 @@ const BATCH_ENQUEUE: usize = 20; const LAST_UPLOADED_KEY: u64 = 0; const RETRY_DELAY: Duration = Duration::from_secs(10); +/// Tracks upload confirmations separately from upload scheduling. +/// +/// `next_unconfirmed_upload` is the first seed view we do not yet know is uploaded. +/// Any completed view above that pointer is remembered in +/// `completed_out_of_order_uploads` until the gap closes. +struct UploadTracker { + next_unconfirmed_upload: View, + completed_out_of_order_uploads: RMap, +} + +impl UploadTracker { + /// Start tracking from the first seed that still needs confirmation. + fn new(next_unconfirmed_upload: View) -> Self { + assert!(next_unconfirmed_upload > 0, "seed uploads start at view 1"); + Self { + next_unconfirmed_upload, + completed_out_of_order_uploads: RMap::new(), + } + } + + /// Record a completed upload. + /// + /// Returns the new contiguous uploaded frontier when this completion closes the + /// current gap; otherwise returns `None`. + fn record_uploaded(&mut self, view: View) -> Option { + // Ignore duplicate or already-accounted-for completions. + if view < self.next_unconfirmed_upload { + return None; + } + + // This upload finished early; remember it until the missing lower view arrives. + if view > self.next_unconfirmed_upload { + self.completed_out_of_order_uploads.insert(view); + return None; + } + + // We received the exact view we were waiting on. Advance the frontier and + // consume any higher views that had already completed out of order. + let mut uploaded_through = view; + self.next_unconfirmed_upload = self + .next_unconfirmed_upload + .checked_add(1) + .expect("seed upload view overflow"); + + loop { + let Some(end_region) = self + .completed_out_of_order_uploads + .next_gap(self.next_unconfirmed_upload) + .0 + else { + break; + }; + self.completed_out_of_order_uploads + .remove(self.next_unconfirmed_upload, end_region); + uploaded_through = end_region; + self.next_unconfirmed_upload = end_region + .checked_add(1) + .expect("seed upload view overflow"); + } + + Some(uploaded_through) + } + + #[cfg(test)] + fn next_unconfirmed_upload(&self) -> View { + self.next_unconfirmed_upload + } + + #[cfg(test)] + fn is_waiting_on_out_of_order_completion(&self, view: View) -> bool { + self.completed_out_of_order_uploads.get(&view).is_some() + } +} + pub struct Actor { context: R, config: Config, @@ -135,13 +209,16 @@ impl Acto // Track uploads let mut uploads_outstanding = 0; - let mut cursor = metadata + // Metadata stores the highest confirmed upload, so the next view to start and + // the first view still awaiting confirmation are both `LAST_UPLOADED_KEY + 1`. + let mut next_upload_to_start = metadata .get(&LAST_UPLOADED_KEY.into()) .cloned() - .unwrap_or(1); - let mut boundary = cursor; - let mut tracked_uploads = RMap::new(); - info!(cursor, "initial seed cursor"); + .unwrap_or(0) + .checked_add(1) + .expect("seed upload view overflow"); + let mut upload_tracker = UploadTracker::new(next_upload_to_start); + info!(next_upload_to_start, "initial seed upload cursor"); // Process messages loop { @@ -154,18 +231,11 @@ impl Acto // Decrement uploads outstanding uploads_outstanding -= 1; - // Track uploaded view - tracked_uploads.insert(view); - - // Update metadata if lowest uploaded has increased - let Some(end_region) = tracked_uploads.next_gap(boundary).0 else { - continue; - }; - if end_region > boundary { - boundary = end_region; + // Update metadata if the contiguous uploaded frontier has increased. + if let Some(end_region) = upload_tracker.record_uploaded(view) { metadata.put(LAST_UPLOADED_KEY.into(), end_region); metadata.sync().await.expect("failed to sync metadata"); - info!(boundary, "updated seed upload marker"); + info!(boundary = end_region, "updated seed upload marker"); } } Message::Put(seed) => { @@ -286,7 +356,11 @@ impl Acto // Attempt to upload any seeds while uploads_outstanding < self.config.max_uploads_outstanding { // Get next seed - let Some(seed) = storage.get(cursor).await.expect("failed to get seed") else { + let Some(seed) = storage + .get(next_upload_to_start) + .await + .expect("failed to get seed") + else { break; }; @@ -295,7 +369,7 @@ impl Acto // Upload seed to indexer self.context.with_label("seed_submit").spawn({ - let seed = Seed::new(cursor, seed); + let seed = Seed::new(next_upload_to_start, seed); let indexer = self.config.indexer.clone(); let mut channel = self.inbound.clone(); move |context| async move { @@ -314,9 +388,107 @@ impl Acto } }); - // Increment cursor - cursor += 1; + // Increment the next upload we are willing to start. + next_upload_to_start += 1; } } } } + +#[cfg(test)] +mod tests { + use super::UploadTracker; + + #[test] + fn upload_tracker_advances_on_in_order_completion() { + let mut tracker = UploadTracker::new(10); + + // The happy path: the exact view we were waiting on finishes, so the + // frontier advances by one and no temporary out-of-order state remains. + assert_eq!(tracker.record_uploaded(10), Some(10)); + assert_eq!(tracker.next_unconfirmed_upload(), 11); + assert!(!tracker.is_waiting_on_out_of_order_completion(10)); + } + + #[test] + fn upload_tracker_ignores_duplicate_confirmations() { + let mut tracker = UploadTracker::new(10); + + // Once a view has already advanced the frontier, repeating the same + // completion should be a no-op. + assert_eq!(tracker.record_uploaded(10), Some(10)); + assert_eq!(tracker.record_uploaded(10), None); + assert_eq!(tracker.next_unconfirmed_upload(), 11); + assert!(!tracker.is_waiting_on_out_of_order_completion(10)); + } + + #[test] + fn upload_tracker_stops_at_the_next_missing_view() { + let mut tracker = UploadTracker::new(10); + + // Assume uploads 10, 11, 12, and 13 have already been started. The tracker + // only sees completion order. Here 11 and 13 finish early, but 12 is still + // missing. Once 10 arrives we can only advance through 11, and 13 must + // remain pending. + assert_eq!(tracker.record_uploaded(11), None); + assert_eq!(tracker.record_uploaded(13), None); + assert!(tracker.is_waiting_on_out_of_order_completion(11)); + assert!(tracker.is_waiting_on_out_of_order_completion(13)); + assert_eq!(tracker.record_uploaded(10), Some(11)); + assert_eq!(tracker.next_unconfirmed_upload(), 12); + assert!(!tracker.is_waiting_on_out_of_order_completion(10)); + assert!(!tracker.is_waiting_on_out_of_order_completion(11)); + assert!(tracker.is_waiting_on_out_of_order_completion(13)); + } + + #[test] + fn upload_tracker_consumes_later_contiguous_completions_when_gap_closes() { + let mut tracker = UploadTracker::new(10); + + // Assume uploads 10, 11, 12, and 13 are already in flight. After advancing + // to 11, receiving 12 should also consume the already-finished 13 and move + // the frontier to 14. + assert_eq!(tracker.record_uploaded(11), None); + assert_eq!(tracker.record_uploaded(13), None); + assert!(tracker.is_waiting_on_out_of_order_completion(11)); + assert!(tracker.is_waiting_on_out_of_order_completion(13)); + assert_eq!(tracker.record_uploaded(10), Some(11)); + assert!(tracker.is_waiting_on_out_of_order_completion(13)); + assert_eq!(tracker.record_uploaded(12), Some(13)); + assert_eq!(tracker.next_unconfirmed_upload(), 14); + assert!(!tracker.is_waiting_on_out_of_order_completion(12)); + assert!(!tracker.is_waiting_on_out_of_order_completion(13)); + } + + #[test] + fn upload_tracker_advances_past_out_of_order_completions_once_gap_closes() { + let mut tracker = UploadTracker::new(10); + + // Assume uploads 10, 11, and 12 are already in flight. 11 and 12 finish + // before 10. Once 10 arrives, the tracker should advance straight through + // 12 and clear the temporary out-of-order state. + assert_eq!(tracker.record_uploaded(11), None); + assert_eq!(tracker.record_uploaded(12), None); + assert!(tracker.is_waiting_on_out_of_order_completion(11)); + assert!(tracker.is_waiting_on_out_of_order_completion(12)); + assert_eq!(tracker.record_uploaded(10), Some(12)); + assert_eq!(tracker.next_unconfirmed_upload(), 13); + assert!(!tracker.is_waiting_on_out_of_order_completion(10)); + assert!(!tracker.is_waiting_on_out_of_order_completion(11)); + assert!(!tracker.is_waiting_on_out_of_order_completion(12)); + } + + #[test] + fn upload_tracker_handles_high_views_without_overflowing() { + let mut tracker = UploadTracker::new(u64::MAX - 2); + + // Exercise the checked_add path near the upper bound without crossing it. + // Advancing from MAX-2 through a previously completed MAX-1 should leave + // the tracker waiting on MAX. + assert_eq!(tracker.record_uploaded(u64::MAX - 1), None); + assert!(tracker.is_waiting_on_out_of_order_completion(u64::MAX - 1)); + assert_eq!(tracker.record_uploaded(u64::MAX - 2), Some(u64::MAX - 1)); + assert_eq!(tracker.next_unconfirmed_upload(), u64::MAX); + assert!(!tracker.is_waiting_on_out_of_order_completion(u64::MAX - 1)); + } +}