Add server-side offline table freshness tracking (OfflineFreshnessTracker) #17897

priyen-stripe wants to merge 1 commit into apache:master
Conversation
Codecov Report

❌ Patch coverage is …

```
@@            Coverage Diff             @@
##            master   #17897     +/-  ##
============================================
+ Coverage    63.21%   63.29%   +0.07%
- Complexity    1480     1525      +45
============================================
  Files         3190     3195       +5
  Lines       192312   193767    +1455
  Branches     29475    29803     +328
============================================
+ Hits        121573   122640    +1067
- Misses       61201    61507     +306
- Partials      9538     9620      +82
```
Force-pushed from `51240d1` to `8fbec84`.
```java
for (ColumnPartitionMetadata colMeta : columnPartitionMap.values()) {
  Set<Integer> partitions = colMeta.getPartitions();
  if (partitions != null && !partitions.isEmpty()) {
    return partitions.iterator().next();
```
Q: why do we return only the first partition? Can this break if we have multiple partitions in the offline table?
It won't break, but I'm not sure what it means to track lag in that sense if a segment belongs to more than one partition.
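The resolution the thread converges on (track only segments with a single unambiguous partition, skip multi-partition ones) can be sketched as follows. This is a hypothetical standalone illustration, not Pinot's actual code: the real implementation uses `ColumnPartitionMetadata` and `SegmentUtils.getSegmentPartitionId()`, while the map type and method name here are invented for the example.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, NOT Pinot's actual code: return a partition id only
// when a column's partition metadata names exactly one partition; ambiguous
// (multi-partition) segments yield null so the caller can skip tracking them.
public class PartitionIdSketch {
  public static Integer singlePartitionOrNull(Map<String, Set<Integer>> columnPartitionMap) {
    for (Set<Integer> partitions : columnPartitionMap.values()) {
      if (partitions != null && partitions.size() == 1) {
        return partitions.iterator().next();
      }
    }
    return null; // non-partitioned or ambiguous: skip freshness tracking
  }

  public static void main(String[] args) {
    System.out.println(singlePartitionOrNull(Map.of("memberId", Set.of(3))));    // 3
    System.out.println(singlePartitionOrNull(Map.of("memberId", Set.of(1, 2)))); // null
  }
}
```

Returning `null` (rather than the first of several partitions) avoids attributing a segment's end time to an arbitrary partition bucket.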
Pull request overview
Adds server-side freshness monitoring for offline tables on Pinot servers by introducing an OfflineFreshnessTracker, mirroring the realtime IngestionDelayTracker pattern to provide continuously increasing lag metrics via callback gauges.
Changes:
- Introduces `OfflineFreshnessTracker` to track per-partition `max(segment_end_time)` and expose callback freshness-lag gauges.
- Integrates freshness tracking into the `OfflineTableDataManager` lifecycle (load/offload/shutdown) and wires server readiness via `DefaultTableDataManagerProvider`.
- Adds new server gauge definitions and comprehensive unit tests for the tracker.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.
Summary per file:
| File | Description |
|---|---|
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineFreshnessTracker.java | New tracker that maintains per-segment/per-partition freshness state and registers callback gauges. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java | Hooks tracker into segment load/offload and table shutdown; extracts partition id from segment ZK metadata. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java | Passes isServerReadyToServeQueries supplier into OfflineTableDataManager. |
| pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java | Adds offline freshness lag gauge definitions. |
| pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/OfflineFreshnessTrackerTest.java | New unit tests covering tracker behavior, gauge registration, and edge cases. |
Force-pushed from `8fbec84` to `6171fb4`.
@suvodeep-pyne, addressed your and Copilot's comments.
…essTracker)
- Rename metrics to OFFLINE_INGESTION_DELAY_MS and OFFLINE_TABLE_INGESTION_DELAY_MS to match REALTIME_INGESTION_DELAY_MS naming convention
- Use callback-based gauges for both partition-level and table-level metrics
- Gate gauge callbacks on isServerReadyToServeQueries (return 0 during startup)
- Use SegmentUtils.getSegmentPartitionId() for partition extraction; skip tracking for ambiguous segments (multi-partition), use sentinel only for truly non-partitioned
- Handle segment replacement in segmentLoaded() by cleaning up old bookkeeping first
- Use AtomicBoolean.compareAndSet for table gauge registration
- Use per-partition segment maps for O(partition_size) removal; use compute() in segmentRemoved() to atomically update partition state and avoid load/remove races
- Remove redundant _segmentEndTimes map (superseded by _partitionSegmentEndTimes)
- Use ExecutorService + Futures in concurrent test for deterministic failure propagation
Force-pushed from `6171fb4` to `06283a7`.
cc @Jackie-Jiang or @xiangfu0 |
@KKcorps |
I don't think tracking OFFLINE table lag on the server is a good idea. Any newly uploaded segment can be assigned to any server.
For a CONSUMING segment, it is guaranteed that each partition is always consumed by the same servers (without rebalance). We don't have the same guarantee for offline segments.
I think as long as you appropriately aggregate the metric (max by table or partition, etc.), it would still be accurate.
This aggregation would be quite hard. You don't want to simply pick the max: you need to pick the min from the replicas that got the latest segment, combined with the metrics from servers serving the partition but not getting the latest segment. A controller-based tracker would be much easier. If the lag of the update is the concern, consider emitting a metric when the segment is uploaded.
In a controller-based tracker, we'd need to rely on the external view, right? So we know which segments are actually queryable. I'm still struggling to understand how this (the server-based tracker) is different from the measurement of consuming partitions.
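To make the aggregation concern above concrete, here is a toy illustration with invented numbers (not Pinot code). When only some replicas have received the latest segment, a naive max across servers overstates the lag, while the min per partition across replicas reflects the freshest queryable copy; as noted in the thread, a fully correct aggregation is more subtle than either.

```java
import java.util.Collections;
import java.util.List;

// Toy illustration of per-partition lag aggregation across replica servers.
// Values are hypothetical lags in milliseconds reported by each replica.
public class ReplicaLagSketch {
  public static long naiveMax(List<Long> replicaLagsMs) {
    // overstates lag when only one replica is missing the newest segment
    return Collections.max(replicaLagsMs);
  }

  public static long freshestReplica(List<Long> replicaLagsMs) {
    // lag of the freshest replica that can serve the partition
    return Collections.min(replicaLagsMs);
  }

  public static void main(String[] args) {
    // replica A already has the latest segment (2 min behind),
    // replica B has not received it yet (12 min behind)
    List<Long> lags = List.of(120_000L, 720_000L);
    System.out.println(naiveMax(lags));        // 720000
    System.out.println(freshestReplica(lags)); // 120000
  }
}
```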
Description
Add server-side ingestion delay monitoring for offline tables, mirroring the `IngestionDelayTracker` pattern used by realtime tables.

**Problem:** Offline tables that receive frequent micro-segments have no server-side freshness monitoring. A concrete use case: a Flink application that continuously writes small segments covering 5-minute time buckets per partition. With segments arriving every few minutes, operators need minute-level freshness tracking to detect when a pipeline stalls: if a partition stops receiving new segments, the lag should visibly increase over time so it can be alerted on. Realtime tables already have `IngestionDelayTracker` for this; offline tables have nothing comparable. The existing controller metric `OFFLINE_SEGMENT_DELAY_HOURS` is too coarse (hourly granularity, reports in hours) and doesn't scale well to tables with very large segment counts (200K+).

**Solution:** A new `OfflineFreshnessTracker` component in `OfflineTableDataManager` that:
- Tracks `max(segment_end_time)` per partition using a `ConcurrentHashMap`
- Exposes per-partition `OFFLINE_INGESTION_DELAY_MS` callback gauges (`now - max(segment_end_time)`)
- Exposes a table-level `OFFLINE_TABLE_INGESTION_DELAY_MS` callback gauge (worst-partition lag)
- `segmentRemoved()` recomputes only the affected partition, using `compute()` for atomicity

Here is also an example of it in action in a scenario where 5-minute segments (16 total, 1 per partition) are being written:

Design decisions
- Partition extraction uses `SegmentUtils.getSegmentPartitionId()`, which returns a single unambiguous partition ID. Non-partitioned tables fall back to sentinel `-1`; segments with ambiguous partition metadata (multi-partition) are skipped to avoid polluting the non-partitioned bucket.
- Gauges are registered via `ServerMetrics.setOrUpdatePartitionGauge()` / `removePartitionGauge()`, exactly like `IngestionDelayTracker`.
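The startup gating mentioned in this PR (gauge callbacks return 0 lag until the server is ready to serve queries) can be sketched as a wrapped supplier. `gatedLag` is a hypothetical helper for illustration, not Pinot's `ServerMetrics` API:

```java
import java.util.function.Supplier;

// Illustrative sketch of readiness gating for a callback gauge: report 0
// during startup, and the real lag once the server is ready to serve queries.
public class GaugeGateSketch {
  public static Supplier<Long> gatedLag(Supplier<Boolean> isServerReady, Supplier<Long> rawLagMs) {
    return () -> isServerReady.get() ? rawLagMs.get() : 0L;
  }
}
```

Reporting 0 during startup avoids alerting on large apparent lag while segments are still being loaded.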
- `ServerGauge`: Added `OFFLINE_INGESTION_DELAY_MS` and `OFFLINE_TABLE_INGESTION_DELAY_MS` gauge definitions
- `OfflineFreshnessTracker` (new): Core tracking component — maintains per-partition segment end-time maps, registers callback-based gauges, handles segment replacement atomically
- `OfflineTableDataManager`: Integrates the tracker — calls `segmentLoaded()` in `doAddOnlineSegment()`, `segmentRemoved()` in `doOffloadSegment()`, `shutdown()` in `doShutdown()`
- `DefaultTableDataManagerProvider`: Passes the `isServerReadyToServeQueries` supplier to `OfflineTableDataManager` (tracker returns 0 lag until the server is ready)
- `OfflineFreshnessTrackerTest` (new): 25 unit tests covering partitioned/non-partitioned tables, segment add/remove/replacement, gauge registration, concurrency, and edge cases

Labels
`feature`, `observability`