Skip to content

feat(subscribers): add typed event for operator and stats callbacks#6479

Merged
cckellogg merged 4 commits intomainfrom
chris/subscriber-events-op-stats
Mar 26, 2026
Merged

feat(subscribers): add typed event for operator and stats callbacks#6479
cckellogg merged 4 commits intomainfrom
chris/subscriber-events-op-stats

Conversation

@cckellogg
Copy link
Copy Markdown
Contributor

Changes Made

Introduce OperatorStartEvent, OperatorEndEvent, and StatsEvent with EventHeader and OperatorMeta, replacing positional parameters in the three operator/stats subscriber callbacks. RuntimeStatsManager now constructs typed events and calls the new on_operator_start, on_operator_end, and on_stats methods. Old methods are kept with default no-op implementations for distributed path backward compatibility.

There will be follow up to convert the distributed code to use the events and to update the python subscriber to take events instead of positional args. Overtime the subscriber framework will be converted to use event types so additional information can be added without changing the api.

Example of output from debug subscriber.

Started query `daring-ocean-914054` with unoptimized plan:
{"children":[{"batch_size":10000,"children":[{"children":[{"children":[{"children":[],"type":"Source"}],"projection":["col(VendorID)","col(total_amount)","col(tpep_pickup_datetime)"],"type":"Project"}],"limit":100000,"type":"Limit"}],"type":"IntoBatches"}],"type":"Sink"}
Started planning query `daring-ocean-914054`
Finished planning query `daring-ocean-914054` with optimized plan:
{"children":[{"batch_size":10000,"children":[{"children":[{"children":[{"children":[],"type":"Source"}],"limit":100000,"type":"Limit"}],"projection":["col(VendorID)","col(total_amount)","col(tpep_pickup_datetime)"],"type":"Project"}],"type":"IntoBatches"}],"type":"Sink"}
Started executing query `daring-ocean-914054` with physical plan:
{"approx_stats":{"approx_stats":{"acc_selectivity":1.0,"num_rows":0,"size_bytes":0}},"category":"BlockingSink","children":[{"approx_stats":{"approx_stats":{"acc_selectivity":1.0,"num_rows":0,"size_bytes":0}},"category":"BlockingSink","children":[{"approx_stats":{"approx_stats":{"acc_selectivity":1.0,"num_rows":100000,"size_bytes":2400000}},"category":"Intermediate","children":[{"approx_stats":{"approx_stats":{"acc_selectivity":1.0,"num_rows":100000,"size_bytes":2400000}},"category":"Intermediate","children":[{"approx_stats":{"approx_stats":{"acc_selectivity":1.0,"num_rows":100000,"size_bytes":2400000}},"category":"StreamingSink","children":[{"approx_stats":{"approx_stats":{"acc_selectivity":1.0,"num_rows":100000,"size_bytes":2400000}},"category":"Source","id":0,"name":"Read Parquet","type":"ScanTask"}],"id":1,"name":"Limit 100000","type":"Limit"}],"id":2,"name":"Rename & Reorder","type":"Project"}],"id":3,"name":"Into Batches of 10000","type":"IntoBatches"}],"id":4,"name":"CSV Write","type":"Write"}],"id":5,"name":"Commit Write","type":"CommitWrite"}
operator_start query_id=daring-ocean-914054 node_id=0 name="Read Parquet" type=ScanTask category=Source
stats query_id=daring-ocean-914054 node_id=0 duration=0ns rows.out=0 bytes.read=0
stats query_id=daring-ocean-914054 node_id=0 duration=0ns rows.out=0 bytes.read=0
stats query_id=daring-ocean-914054 node_id=0 duration=0ns rows.out=0 bytes.read=65536
operator_start query_id=daring-ocean-914054 node_id=1 name="Limit 100000" type=Limit category=StreamingSink
operator_start query_id=daring-ocean-914054 node_id=2 name="Rename & Reorder" type=Project category=Intermediate
operator_start query_id=daring-ocean-914054 node_id=3 name="Into Batches of 10000" type=IntoBatches category=Intermediate
operator_start query_id=daring-ocean-914054 node_id=4 name="CSV Write" type=Write category=BlockingSink
stats query_id=daring-ocean-914054 node_id=0 duration=0ns rows.out=88091 bytes.read=2164041
stats query_id=daring-ocean-914054 node_id=2 duration=3.765ms rows.in=88091 rows.out=88091
stats query_id=daring-ocean-914054 node_id=3 duration=72µs rows.in=78091 rows.out=70000
stats query_id=daring-ocean-914054 node_id=1 duration=0ns rows.in=88091 rows.out=88091
stats query_id=daring-ocean-914054 node_id=4 duration=113.125ms rows.in=50000 rows.written=40000 bytes.written=960000
stats query_id=daring-ocean-914054 node_id=1 duration=0ns rows.in=100000 rows.out=100000
operator_end query_id=daring-ocean-914054 node_id=1 name="Limit 100000"
stats query_id=daring-ocean-914054 node_id=0 duration=0ns rows.out=100000 bytes.read=2164041
operator_end query_id=daring-ocean-914054 node_id=0 name="Read Parquet"
stats query_id=daring-ocean-914054 node_id=2 duration=3.806ms rows.in=100000 rows.out=100000
operator_end query_id=daring-ocean-914054 node_id=2 name="Rename & Reorder"
stats query_id=daring-ocean-914054 node_id=3 duration=90µs rows.in=100000 rows.out=100000
operator_end query_id=daring-ocean-914054 node_id=3 name="Into Batches of 10000"
stats query_id=daring-ocean-914054 node_id=4 duration=244.632ms rows.in=100000 rows.written=100000 bytes.written=2400000
operator_end query_id=daring-ocean-914054 node_id=4 name="CSV Write"
operator_start query_id=daring-ocean-914054 node_id=5 name="Commit Write" type=CommitWrite category=BlockingSink
stats query_id=daring-ocean-914054 node_id=5 duration=29.221ms rows.in=1 rows.out=1
operator_end query_id=daring-ocean-914054 node_id=5 name="Commit Write"
Finished executing query `daring-ocean-914054`

@cckellogg cckellogg requested a review from a team as a code owner March 24, 2026 22:26
@github-actions github-actions Bot added the feat label Mar 24, 2026
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Mar 24, 2026

Greptile Summary

This PR introduces typed event structs (OperatorStartEvent, OperatorEndEvent, StatsEvent) with EventHeader and OperatorMeta, replacing raw positional parameters in the three operator/stats subscriber callbacks. RuntimeStatsManager now constructs and dispatches these typed events via new on_operator_start, on_stats, and on_operator_end trait methods; the old on_exec_* methods are kept as no-op defaults for the distributed path, which will be migrated in a follow-up.

Key changes:

  • events.rs (new): Defines EventHeader, OperatorMeta, and the three event types; OperatorMeta derives from NodeInfo via a From impl.
  • mod.rs (subscriber trait): Old abstract methods become default no-ops; three new typed-event methods added with default no-ops.
  • runtime_stats/mod.rs: Builds an operator_meta_map during pipeline traversal and uses it to construct typed events; adds defensive operators.get(&node_id) guards to avoid panics on unknown node IDs.
  • python.rs: Adds Rust bridge methods for the three new events; legacy on_exec_* Rust overrides remain for the distributed path.
  • dashboard.rs / debug.rs: Both subscribers gain the three new event handler overrides.
  • abc.py: Old @abstractmethod annotations removed; deprecated methods become shims forwarding to the new API.
  • Breaking change note: Removing @abstractmethod from on_exec_operator_start, on_exec_emit_stats, and on_exec_operator_end is a behavioral breaking change — existing Python subscribers that only override the old methods will silently receive no callbacks from local execution. Per conventional commit convention, this warrants a feat!: title prefix.

Confidence Score: 4/5

  • Safe to merge with the understanding that existing Python subscribers overriding only the legacy abstract methods will silently lose local-execution callbacks until they migrate.
  • The core Rust plumbing is solid — typed events are well-structured, the defensive node-ID guards prevent panics, and both Rust-native subscribers (dashboard, debug) are correctly updated. The main risk is the Python-side backward-compatibility gap (already tracked in a prior review thread), which the PR description explicitly defers to a follow-up. The conventional-commit title is missing a ! for the behavioral breaking change, but this is a documentation/process issue rather than a runtime one. No new clippy suppressions are introduced.
  • src/daft-context/src/subscribers/python.rs — the dual dispatch paths (old on_exec_* overrides for distributed, new on_* overrides for local) should be reviewed carefully once the distributed migration lands to avoid routing confusion.

Important Files Changed

Filename Overview
daft/subscribers/abc.py Removes @abstractmethod from the three operator/stats methods, demotes them to optional shims calling the new API, and adds new on_operator_start, on_operator_end, on_stats as non-abstract defaults; behavioral breaking change for existing Python subscribers that only overrode the old abstract methods.
src/daft-context/src/subscribers/events.rs New file introducing EventHeader, OperatorMeta, OperatorStartEvent, OperatorEndEvent, and StatsEvent; clean, well-typed structs with a From<&NodeInfo> impl for OperatorMeta.
src/daft-context/src/subscribers/python.rs Adds Rust implementations of on_operator_start, on_operator_end, on_stats that call the corresponding Python methods; old on_exec_* overrides remain for the distributed path. Python subscribers that only override the legacy methods will silently receive no callbacks from local execution (tracked in previous thread).
src/daft-context/src/subscribers/dashboard.rs Adds on_operator_start, on_stats, on_operator_end overrides; on_stats duplicates the stats-formatting logic from on_exec_emit_stats rather than delegating, but is behaviorally equivalent for the local execution path.
src/daft-local-execution/src/runtime_stats/mod.rs Core change: builds operator_meta_map during pipeline traversal and uses it to construct typed events for on_operator_start, on_stats, and on_operator_end. Adds a defensive .get() guard before event construction with a log::warn! for unknown node IDs, improving safety over the previous panic-prone index access.

Sequence Diagram

sequenceDiagram
    participant RSM as RuntimeStatsManager
    participant Sub as Subscriber trait
    participant Dash as DashboardSubscriber
    participant PyW as PySubscriberWrapper
    participant Py as Python Subscriber

    Note over RSM: Operator activates
    RSM->>Sub: on_operator_start(Arc<OperatorStartEvent>)
    Sub->>Dash: on_operator_start(event)
    Sub->>PyW: on_operator_start(event)
    PyW->>Py: on_operator_start(query_id, node_id)

    Note over RSM: Throttle tick fires
    RSM->>Sub: on_stats(Arc<StatsEvent>)
    Sub->>Dash: on_stats(event)
    Sub->>PyW: on_stats(event)
    PyW->>Py: on_stats(query_id, stats_map)

    Note over RSM: Operator finalizes
    RSM->>Sub: on_stats(Arc<StatsEvent>)
    RSM->>Sub: on_operator_end(Arc<OperatorEndEvent>)
    Sub->>Dash: on_operator_end(event)
    Sub->>PyW: on_operator_end(event)
    PyW->>Py: on_operator_end(query_id, node_id)

    Note over Sub,Py: Distributed path (not yet migrated)
    Sub-->>Dash: on_exec_operator_start(query_id, node_id)
    Sub-->>PyW: on_exec_operator_start(query_id, node_id)
    PyW-->>Py: on_exec_operator_start(query_id, node_id)
    Py-->>Py: delegates to on_operator_start (base class shim)
Loading

Reviews (2): Last reviewed commit: "fix tests" | Re-trigger Greptile

Comment on lines +167 to +175
async fn on_operator_start(&self, event: Arc<OperatorStartEvent>) -> DaftResult<()> {
Python::attach(|py| {
self.0.call_method1(
py,
intern!(py, "on_operator_start"),
(event.header.query_id.to_string(), event.operator.node_id),
)?;
Ok(())
})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Silent behavioral regression for existing Python subscribers

PySubscriberWrapper::on_operator_start now calls Python's on_operator_start (the new method), but legacy Python subclasses were required to implement on_exec_operator_start (which was @abstractmethod). Any user who only overrode on_exec_operator_start without also overriding on_operator_start will silently receive no operator-lifecycle callbacks from local execution.

Tracing the call chain for local execution:

  1. RuntimeStatsManager calls subscriber.on_operator_start(event) (new Rust path)
  2. PySubscriberWrapper::on_operator_start → Python on_operator_start → base-class pass (no-op)
  3. The user's on_exec_operator_start override is never invoked

The same regression applies to on_stats / on_exec_emit_stats and on_operator_end / on_exec_operator_end.

A backward-compatible fix would be to make the Python bridge call the old method as a shim inside the new method. For example:

async fn on_operator_start(&self, event: Arc<OperatorStartEvent>) -> DaftResult<()> {
    Python::attach(|py| {
        // Try the new method first; fall back to the old name for backward compat
        let result = self.0.call_method1(
            py,
            intern!(py, "on_operator_start"),
            (event.header.query_id.to_string(), event.operator.node_id),
        );
        // If on_operator_start raised AttributeError (not overridden), call legacy name
        if result.is_err() {
            self.0.call_method1(
                py,
                intern!(py, "on_exec_operator_start"),
                (event.header.query_id.to_string(), event.operator.node_id),
            )?;
        }
        Ok(())
    })
}

Alternatively, this should be explicitly marked as a breaking change in the PR title and release notes.

Comment thread src/daft-local-execution/src/runtime_stats/mod.rs
@cckellogg cckellogg force-pushed the chris/subscriber-events-op-stats branch from 0193323 to 1f458c5 Compare March 24, 2026 23:01
@codecov
Copy link
Copy Markdown

codecov Bot commented Mar 24, 2026

Codecov Report

❌ Patch coverage is 81.19266% with 41 lines in your changes missing coverage. Please review.
✅ Project coverage is 75.31%. Comparing base (acc8e55) to head (14990d0).
⚠️ Report is 6 commits behind head on main.

Files with missing lines Patch % Lines
src/daft-context/src/subscribers/debug.rs 0.00% 14 Missing ⚠️
src/daft-context/src/subscribers/mod.rs 0.00% 12 Missing ⚠️
src/daft-local-execution/src/runtime_stats/mod.rs 92.66% 8 Missing ⚠️
daft/subscribers/abc.py 66.66% 3 Missing ⚠️
src/daft-context/src/subscribers/python.rs 93.18% 3 Missing ⚠️
src/daft-context/src/subscribers/dashboard.rs 93.75% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #6479      +/-   ##
==========================================
- Coverage   75.33%   75.31%   -0.02%     
==========================================
  Files        1028     1029       +1     
  Lines      138704   138809     +105     
==========================================
+ Hits       104486   104544      +58     
- Misses      34218    34265      +47     
Files with missing lines Coverage Δ
daft/subscribers/events.py 76.70% <100.00%> (ø)
src/daft-context/src/subscribers/events.rs 100.00% <100.00%> (ø)
src/daft-context/src/subscribers/dashboard.rs 72.69% <93.75%> (-5.17%) ⬇️
daft/subscribers/abc.py 67.44% <66.66%> (-2.56%) ⬇️
src/daft-context/src/subscribers/python.rs 60.35% <93.18%> (-20.45%) ⬇️
src/daft-local-execution/src/runtime_stats/mod.rs 82.72% <92.66%> (+0.18%) ⬆️
src/daft-context/src/subscribers/mod.rs 23.91% <0.00%> (-8.44%) ⬇️
src/daft-context/src/subscribers/debug.rs 0.00% <0.00%> (ø)

... and 21 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

self._write_event(query_id, "plan_physical", {"plan": physical_plan})

def on_exec_operator_start(self, query_id: str, node_id: int) -> None:
def on_operator_start(self, query_id: str, node_id: int) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extrapolate the function be called on_end after compiling

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you provide some more information on what you mean here?


for res in future::join_all(subscribers.iter().map(|subscriber| subscriber.on_exec_operator_start(query_id.clone(), node_id))).await {
let Some(operator_meta) = operators.get(&node_id) else {
log::warn!("Unknown node_id {node_id} in operators during activate, skipping subscriber notification");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Nitpick]: the error message should be called on both operator_start and operator_end. After the instantiation of the final variable

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand this? Can you provide an example?

@cckellogg
Copy link
Copy Markdown
Contributor Author

@greptileai

),
"exec_operator_end",
);
Ok(())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice -- it's worth syncing with @srilman on this, but I would advise a single on_event and then use a trait/interface/ABC for the Event. I have suggested this before, because you can layer stricter typing on it.

In the current setup, consumers would need to implement a callback for all event types exhaustively, making any new event a break change. The workaround is to have a convention of always adding a no-op default, which is fragile and relies on developers behaving well.

As mentioned, you can layer this per-event handlers on top of the more generic subscriber. You will see a similar pattern in

class ExpressionVisitor(ABC, Generic[R]):
where you have a more generic base abstraction and layer stricter typing onto this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan is the switch the rust subscriber to an on_event with an Event enum in future PRs. Trying to keep the PRs kind of small as we port over to this new pattern. I'm thinking after the rust work is done we can figure out the python interfaces.

@cckellogg cckellogg force-pushed the chris/subscriber-events-op-stats branch from a2a9b11 to 14990d0 Compare March 26, 2026 15:52
@cckellogg cckellogg requested a review from a team March 26, 2026 16:39
pub struct EventHeader {
pub query_id: QueryID,
pub timestamp_epoch_secs: f64,
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

@cckellogg cckellogg merged commit 45e1225 into main Mar 26, 2026
36 checks passed
@cckellogg cckellogg deleted the chris/subscriber-events-op-stats branch March 26, 2026 16:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants