Add server-side offline table freshness tracking (OfflineFreshnessTracker)#17897

Open
priyen-stripe wants to merge 1 commit into apache:master from priyen:offline-table-freshness

Conversation

@priyen-stripe
Contributor

@priyen-stripe priyen-stripe commented Mar 17, 2026

Description

Add server-side ingestion delay monitoring for offline tables, mirroring the IngestionDelayTracker pattern used by realtime tables.

Problem: Offline tables that receive frequent micro-segments have no server-side freshness monitoring. A concrete use-case: a Flink application that continuously writes small segments covering 5-minute time buckets per partition. With segments arriving every few minutes, operators need minute-level freshness tracking to detect when a pipeline stalls — if a partition stops receiving new segments, the lag should visibly increase over time so it can be alerted on. Realtime tables already have IngestionDelayTracker for this; offline tables have nothing comparable. The existing controller metric OFFLINE_SEGMENT_DELAY_HOURS is too coarse (hourly granularity, reports in hours) and doesn't scale well to tables with very large segment counts (200K+).

Solution: A new OfflineFreshnessTracker component in OfflineTableDataManager that:

  • Tracks max(segment_end_time) per partition using a ConcurrentHashMap
  • Registers per-partition OFFLINE_INGESTION_DELAY_MS callback gauges (now - max(segment_end_time))
  • Registers table-level OFFLINE_TABLE_INGESTION_DELAY_MS callback gauge (worst-partition lag)
  • All gauges are callback-based (evaluated live on every metrics scrape), so lag naturally increases over time even without new segments arriving — critical for stuck pipeline alerting
  • segmentRemoved() recomputes only the affected partition using compute() for atomicity
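The bookkeeping described above can be sketched as follows. This is a simplified, hypothetical stand-in for the actual `OfflineFreshnessTracker` class (class and method names here are illustrative, not the PR's exact code): a `ConcurrentHashMap` keyed by partition ID, with `compute()` keeping per-partition updates atomic, and lag derived from `max(segment_end_time)` at read time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of the tracker's bookkeeping (illustrative, not the
// actual Pinot class). Per partition, keep each tracked segment's end time;
// derive the partition lag from the max end time on every read.
public class FreshnessSketch {
  // partitionId -> (segmentName -> segmentEndTimeMs)
  private final ConcurrentHashMap<Integer, Map<String, Long>> _partitionSegmentEndTimes =
      new ConcurrentHashMap<>();

  public void segmentLoaded(int partitionId, String segmentName, long endTimeMs) {
    _partitionSegmentEndTimes.compute(partitionId, (k, segs) -> {
      if (segs == null) {
        segs = new HashMap<>();
      }
      segs.put(segmentName, endTimeMs);
      return segs;
    });
  }

  // compute() updates only the affected partition, atomically per key,
  // avoiding load/remove races on that partition's map.
  public void segmentRemoved(int partitionId, String segmentName) {
    _partitionSegmentEndTimes.compute(partitionId, (k, segs) -> {
      if (segs == null) {
        return null;
      }
      segs.remove(segmentName);
      return segs.isEmpty() ? null : segs;  // drop empty partitions entirely
    });
  }

  // Evaluated live on every call (i.e. on every metrics scrape), so lag keeps
  // growing when no new segments arrive.
  public long getPartitionLagMs(int partitionId, long nowMs) {
    Map<String, Long> segs = _partitionSegmentEndTimes.get(partitionId);
    if (segs == null) {
      return 0;
    }
    long maxEndTime = segs.values().stream().mapToLong(Long::longValue).max().orElse(nowMs);
    return Math.max(0, nowMs - maxEndTime);
  }
}
```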

Here is an example of it in action, in a scenario where 5-minute segments (16 total, one per partition) are being written:
[screenshot: per-partition ingestion delay gauges]

Design decisions

  1. Server-side, not controller-side: Servers know exactly when segments are loaded and queryable. No extra ZK reads needed.
  2. All offline tables: No opt-in config required. The tracker is lightweight (maps of segment names to timestamps/partitions).
  3. Partition ID via SegmentUtils.getSegmentPartitionId(): Returns a single unambiguous partition ID. Non-partitioned tables fall back to sentinel -1; segments with ambiguous partition metadata (multi-partition) are skipped to avoid polluting the non-partitioned bucket.
  4. Callback-based gauges: Both partition-level and table-level metrics are registered as supplier callbacks (not periodic snapshots), so they are always evaluated live on scrape. This avoids stale metric values.
  5. Per-partition gauges: Uses ServerMetrics.setOrUpdatePartitionGauge() / removePartitionGauge() exactly like IngestionDelayTracker.
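Decision 4 (callback-based gauges) and the startup gate can be illustrated with a minimal sketch. The gauge "registry" below is a hypothetical stand-in for `ServerMetrics`, and the injected clock is only there to make the behavior testable; none of these names come from the PR itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative sketch of callback gauges gated on server readiness.
// The registry map stands in for ServerMetrics; names are hypothetical.
public class CallbackGaugeSketch {
  private final Map<String, Supplier<Long>> _gauges = new HashMap<>();
  private volatile boolean _ready = false;
  private volatile long _maxSegmentEndTimeMs = 0;

  public void registerTableGauge(String table, Supplier<Long> clockMs) {
    // The supplier is re-evaluated on every scrape, so the reported lag keeps
    // increasing over time even when no new segments arrive. Until the server
    // is ready to serve queries, the gauge reports 0.
    _gauges.put("OFFLINE_TABLE_INGESTION_DELAY_MS." + table,
        () -> _ready ? Math.max(0, clockMs.get() - _maxSegmentEndTimeMs) : 0L);
  }

  public void markReady() {
    _ready = true;
  }

  public void onSegmentLoaded(long endTimeMs) {
    _maxSegmentEndTimeMs = Math.max(_maxSegmentEndTimeMs, endTimeMs);
  }

  // Simulates one metrics scrape: the callback is evaluated at read time.
  public long scrape(String gaugeName) {
    return _gauges.get(gaugeName).get();
  }
}
```

Because the value is computed at scrape time rather than snapshotted, a stalled pipeline shows a monotonically growing lag, which is exactly the signal needed for alerting.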

Changes

  • ServerGauge: Added OFFLINE_INGESTION_DELAY_MS and OFFLINE_TABLE_INGESTION_DELAY_MS gauge definitions
  • OfflineFreshnessTracker (new): Core tracking component — maintains per-partition segment end-time maps, registers callback-based gauges, handles segment replacement atomically
  • OfflineTableDataManager: Integrates the tracker — calls segmentLoaded() in doAddOnlineSegment(), segmentRemoved() in doOffloadSegment(), shutdown() in doShutdown()
  • DefaultTableDataManagerProvider: Passes isServerReadyToServeQueries supplier to OfflineTableDataManager (tracker returns 0 lag until server is ready)
  • OfflineFreshnessTrackerTest (new): 25 unit tests covering partitioned/non-partitioned tables, segment add/remove/replacement, gauge registration, concurrency, and edge cases

Labels

feature, observability

@codecov-commenter

codecov-commenter commented Mar 17, 2026

Codecov Report

❌ Patch coverage is 78.22581% with 27 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.29%. Comparing base (6a52aa0) to head (06283a7).
⚠️ Report is 289 commits behind head on master.

Files with missing lines | Patch % | Lines
.../data/manager/offline/OfflineTableDataManager.java | 27.27% | 22 Missing and 2 partials ⚠️
.../data/manager/offline/OfflineFreshnessTracker.java | 96.59% | 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17897      +/-   ##
============================================
+ Coverage     63.21%   63.29%   +0.07%     
- Complexity     1480     1525      +45     
============================================
  Files          3190     3195       +5     
  Lines        192312   193767    +1455     
  Branches      29475    29803     +328     
============================================
+ Hits         121573   122640    +1067     
- Misses        61201    61507     +306     
- Partials       9538     9620      +82     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.20% <78.22%> (+28.93%) ⬆️
java-21 63.26% <78.22%> (+0.05%) ⬆️
temurin 63.29% <78.22%> (+0.07%) ⬆️
unittests 63.28% <78.22%> (+0.07%) ⬆️
unittests1 55.56% <78.22%> (+0.03%) ⬆️
unittests2 34.22% <18.54%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown.


@priyen-stripe
Contributor Author

cc @Jackie-Jiang @suvodeep-pyne

@priyen priyen force-pushed the offline-table-freshness branch from 51240d1 to 8fbec84 on March 17, 2026 20:07
```java
for (ColumnPartitionMetadata colMeta : columnPartitionMap.values()) {
  Set<Integer> partitions = colMeta.getPartitions();
  if (partitions != null && !partitions.isEmpty()) {
    return partitions.iterator().next();
  }
}
```
Contributor


Q: why do we return only the first partition? Can this break if we have multiple partitions in the offline table?

Contributor Author


It won't break, but I'm not sure what it means to track lag in that sense if a segment belongs to more than one partition.

Comment thread pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java Outdated
@xiangfu0 xiangfu0 added the observability Related to observability (logging, tracing, metrics) label Mar 20, 2026
Contributor

Copilot AI left a comment


Pull request overview

Adds server-side freshness monitoring for offline tables on Pinot servers by introducing an OfflineFreshnessTracker, mirroring the realtime IngestionDelayTracker pattern to provide continuously increasing lag metrics via callback gauges.

Changes:

  • Introduces OfflineFreshnessTracker to track per-partition max(segment_end_time) and expose callback freshness-lag gauges.
  • Integrates freshness tracking into OfflineTableDataManager lifecycle (load/offload/shutdown) and wires server readiness via DefaultTableDataManagerProvider.
  • Adds new server gauge definitions and comprehensive unit tests for the tracker.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineFreshnessTracker.java New tracker that maintains per-segment/per-partition freshness state and registers callback gauges.
pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java Hooks tracker into segment load/offload and table shutdown; extracts partition id from segment ZK metadata.
pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java Passes isServerReadyToServeQueries supplier into OfflineTableDataManager.
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java Adds offline freshness lag gauge definitions.
pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/OfflineFreshnessTrackerTest.java New unit tests covering tracker behavior, gauge registration, and edge cases.

@priyen priyen force-pushed the offline-table-freshness branch from 8fbec84 to 6171fb4 on March 20, 2026 17:01
@priyen-stripe
Contributor Author

@suvodeep-pyne , addressed your and copilot's comments

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.

@xiangfu0 xiangfu0 added the real-time Related to realtime table ingestion and serving label Mar 20, 2026
…essTracker)

- Rename metrics to OFFLINE_INGESTION_DELAY_MS and OFFLINE_TABLE_INGESTION_DELAY_MS
  to match REALTIME_INGESTION_DELAY_MS naming convention
- Use callback-based gauges for both partition-level and table-level metrics
- Gate gauge callbacks on isServerReadyToServeQueries (return 0 during startup)
- Use SegmentUtils.getSegmentPartitionId() for partition extraction; skip tracking
  for ambiguous segments (multi-partition), use sentinel only for truly non-partitioned
- Handle segment replacement in segmentLoaded() by cleaning up old bookkeeping first
- Use AtomicBoolean.compareAndSet for table gauge registration
- Use per-partition segment maps for O(partition_size) removal; use compute() in
  segmentRemoved() to atomically update partition state and avoid load/remove races
- Remove redundant _segmentEndTimes map (superseded by _partitionSegmentEndTimes)
- Use ExecutorService + Futures in concurrent test for deterministic failure propagation
@priyen priyen force-pushed the offline-table-freshness branch from 6171fb4 to 06283a7 on March 24, 2026 21:31
@priyen-stripe
Contributor Author

cc @suvodeep-pyne

Contributor

@suvodeep-pyne suvodeep-pyne left a comment


LGTM.

@priyen-stripe
Contributor Author

cc @Jackie-Jiang or @xiangfu0

@priyen-stripe
Contributor Author

cc @Jackie-Jiang or @xiangfu0
cc @suvodeep-pyne

@suvodeep-pyne
Contributor

@KKcorps
Stripe needs a more real-time metric for offline ingestion. Can you PTAL and share your thoughts?

@Jackie-Jiang Jackie-Jiang added configuration Config changes (addition/deletion/change in behavior) feature New functionality and removed real-time Related to realtime table ingestion and serving labels Apr 11, 2026
Contributor

@Jackie-Jiang Jackie-Jiang left a comment


I don't think tracking OFFLINE table lag on server is a good idea. For any new uploaded segment, it can be assigned to any servers.
For CONSUMING segment, it is guaranteed that each partition is always consumed by the same servers (without rebalance). We don't have same guarantee for offline segments.

@priyen-stripe
Contributor Author

priyen-stripe commented Apr 13, 2026

> I don't think tracking OFFLINE table lag on server is a good idea. For any new uploaded segment, it can be assigned to any servers. For CONSUMING segment, it is guaranteed that each partition is always consumed by the same servers (without rebalance). We don't have same guarantee for offline segments.

I think as long as you appropriately aggregate the metric (max by table or partition, etc.) it would still be accurate.
What are you thinking re: a controller-based tracker?

@Jackie-Jiang
Contributor

>> I don't think tracking OFFLINE table lag on server is a good idea. For any new uploaded segment, it can be assigned to any servers. For CONSUMING segment, it is guaranteed that each partition is always consumed by the same servers (without rebalance). We don't have same guarantee for offline segments.

> I think as long as you appropriately aggregate the metric (max by table or partition, etc.) it would still be accurate. What are you thinking re: a controller-based tracker?

This aggregate would be quite hard. You don't want to simply pick the max. You need to pick the min from the replicas getting the latest segment, and the metrics from servers serving the partition but not getting the latest segment.

A controller based tracker would be much easier. If the lag of update is the concern, consider emitting a metric when the segment is uploaded

@priyen-stripe
Contributor Author

priyen-stripe commented Apr 16, 2026

>>> I don't think tracking OFFLINE table lag on server is a good idea. For any new uploaded segment, it can be assigned to any servers. For CONSUMING segment, it is guaranteed that each partition is always consumed by the same servers (without rebalance). We don't have same guarantee for offline segments.

>> I think as long as you appropriately aggregate the metric (max by table or partition, etc.) it would still be accurate. What are you thinking re: a controller-based tracker?

> This aggregate would be quite hard. You don't want to simply pick the max. You need to pick the min from the replicas getting the latest segment, and the metrics from servers serving the partition but not getting the latest segment.

> A controller-based tracker would be much easier. If the lag of update is the concern, consider emitting a metric when the segment is uploaded.

In a controller-based tracker, we'd need to rely on the external view, right? So we know which segments are actually queryable?

I'm still struggling to understand how the server-based tracker is different from how lag is measured for consuming partitions.


Labels

configuration Config changes (addition/deletion/change in behavior) feature New functionality observability Related to observability (logging, tracing, metrics)
