feat(subscribers): add typed event for operator and stats callbacks#6479
feat(subscribers): add typed event for operator and stats callbacks#6479
Conversation
Greptile SummaryThis PR introduces typed event structs ( Key changes:
Confidence Score: 4/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant RSM as RuntimeStatsManager
participant Sub as Subscriber trait
participant Dash as DashboardSubscriber
participant PyW as PySubscriberWrapper
participant Py as Python Subscriber
Note over RSM: Operator activates
RSM->>Sub: on_operator_start(Arc<OperatorStartEvent>)
Sub->>Dash: on_operator_start(event)
Sub->>PyW: on_operator_start(event)
PyW->>Py: on_operator_start(query_id, node_id)
Note over RSM: Throttle tick fires
RSM->>Sub: on_stats(Arc<StatsEvent>)
Sub->>Dash: on_stats(event)
Sub->>PyW: on_stats(event)
PyW->>Py: on_stats(query_id, stats_map)
Note over RSM: Operator finalizes
RSM->>Sub: on_stats(Arc<StatsEvent>)
RSM->>Sub: on_operator_end(Arc<OperatorEndEvent>)
Sub->>Dash: on_operator_end(event)
Sub->>PyW: on_operator_end(event)
PyW->>Py: on_operator_end(query_id, node_id)
Note over Sub,Py: Distributed path (not yet migrated)
Sub-->>Dash: on_exec_operator_start(query_id, node_id)
Sub-->>PyW: on_exec_operator_start(query_id, node_id)
PyW-->>Py: on_exec_operator_start(query_id, node_id)
Py-->>Py: delegates to on_operator_start (base class shim)
Reviews (2): Last reviewed commit: "fix tests" | Re-trigger Greptile |
| async fn on_operator_start(&self, event: Arc<OperatorStartEvent>) -> DaftResult<()> { | ||
| Python::attach(|py| { | ||
| self.0.call_method1( | ||
| py, | ||
| intern!(py, "on_operator_start"), | ||
| (event.header.query_id.to_string(), event.operator.node_id), | ||
| )?; | ||
| Ok(()) | ||
| }) |
There was a problem hiding this comment.
Silent behavioral regression for existing Python subscribers
PySubscriberWrapper::on_operator_start now calls Python's on_operator_start (the new method), but legacy Python subclasses were required to implement on_exec_operator_start (which was @abstractmethod). Any user who only overrode on_exec_operator_start without also overriding on_operator_start will silently receive no operator-lifecycle callbacks from local execution.
Tracing the call chain for local execution:
RuntimeStatsManagercallssubscriber.on_operator_start(event)(new Rust path)PySubscriberWrapper::on_operator_start→ Pythonon_operator_start→ base-classpass(no-op)- The user's
on_exec_operator_startoverride is never invoked
The same regression applies to on_stats / on_exec_emit_stats and on_operator_end / on_exec_operator_end.
A backward-compatible fix would be to make the Python bridge call the old method as a shim inside the new method. For example:
async fn on_operator_start(&self, event: Arc<OperatorStartEvent>) -> DaftResult<()> {
Python::attach(|py| {
// Try the new method first; fall back to the old name for backward compat
let result = self.0.call_method1(
py,
intern!(py, "on_operator_start"),
(event.header.query_id.to_string(), event.operator.node_id),
);
// If on_operator_start raised AttributeError (not overridden), call legacy name
if result.is_err() {
self.0.call_method1(
py,
intern!(py, "on_exec_operator_start"),
(event.header.query_id.to_string(), event.operator.node_id),
)?;
}
Ok(())
})
}Alternatively, this should be explicitly marked as a breaking change in the PR title and release notes.
0193323 to
1f458c5
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #6479 +/- ##
==========================================
- Coverage 75.33% 75.31% -0.02%
==========================================
Files 1028 1029 +1
Lines 138704 138809 +105
==========================================
+ Hits 104486 104544 +58
- Misses 34218 34265 +47
🚀 New features to boost your workflow:
|
| self._write_event(query_id, "plan_physical", {"plan": physical_plan}) | ||
|
|
||
| def on_exec_operator_start(self, query_id: str, node_id: int) -> None: | ||
| def on_operator_start(self, query_id: str, node_id: int) -> None: |
There was a problem hiding this comment.
Extrapolate the function be called on_end after compiling
There was a problem hiding this comment.
Can you provide some more information on what you mean here?
|
|
||
| for res in future::join_all(subscribers.iter().map(|subscriber| subscriber.on_exec_operator_start(query_id.clone(), node_id))).await { | ||
| let Some(operator_meta) = operators.get(&node_id) else { | ||
| log::warn!("Unknown node_id {node_id} in operators during activate, skipping subscriber notification"); |
There was a problem hiding this comment.
[Nitpick]: the error message should be called on both operator_start and operator_end. After the instantiation of the final variable
There was a problem hiding this comment.
I'm not sure I understand this? Can you provide an example?
| ), | ||
| "exec_operator_end", | ||
| ); | ||
| Ok(()) |
There was a problem hiding this comment.
Nice -- it's worth syncing with @srilman on this, but I would advise a single on_event and then use a trait/interface/ABC for the Event. I have suggested this before, because you can layer stricter typing on it.
In the current setup, consumers would need to implement a callback for all event types exhaustively, making any new event a break change. The workaround is to have a convention of always adding a no-op default, which is fragile and relies on developers behaving well.
As mentioned, you can layer this per-event handlers on top of the more generic subscriber. You will see a similar pattern in
Daft/daft/expressions/visitor.py
Line 14 in 13fd9fa
There was a problem hiding this comment.
The plan is the switch the rust subscriber to an on_event with an Event enum in future PRs. Trying to keep the PRs kind of small as we port over to this new pattern. I'm thinking after the rust work is done we can figure out the python interfaces.
a2a9b11 to
14990d0
Compare
| pub struct EventHeader { | ||
| pub query_id: QueryID, | ||
| pub timestamp_epoch_secs: f64, | ||
| } |
Changes Made
Introduce OperatorStartEvent, OperatorEndEvent, and StatsEvent with EventHeader and OperatorMeta, replacing positional parameters in the three operator/stats subscriber callbacks. RuntimeStatsManager now constructs typed events and calls the new on_operator_start, on_operator_end, and on_stats methods. Old methods are kept with default no-op implementations for distributed path backward compatibility.
There will be follow up to convert the distributed code to use the events and to update the python subscriber to take events instead of positional args. Overtime the subscriber framework will be converted to use event types so additional information can be added without changing the api.
Example of output from debug subscriber.