feat(streams): opt-in time-throttle for legacy per-slice state emission#1036
feat(streams): opt-in time-throttle for legacy per-slice state emission#1036Anatolii Yatsuk (tolik0) wants to merge 2 commits into
Conversation
…am attribute Adds `Stream.state_emission_throttle_seconds: Optional[float] = None`. When set, the per-slice state message emitted from `Stream.read()` is suppressed for any slice that completes within `state_emission_throttle_seconds` of the previous emit. A final state message is force-emitted at end-of-stream if any throttled emit was suppressed, so the platform/destination always sees the latest state for the sync. Default `None` preserves the historical behaviour exactly (emit a state message after every slice). Streams that produce many slices with growing state payloads (e.g. file-based sources whose state carries a history dict that grows by one entry per file synced) can opt in to bound the platform-side state buffer that otherwise pins memory until the destination ACKs each message. Targets the same OOM mode as `ConcurrentPerPartitionCursor`'s state throttle (oncall #7856), but for the legacy non-concurrent file-based read path (oncall #12663 alternative). Unit-tested: * default (None) keeps one state per slice, no final duplicate. * throttle=600s with sub-window slices suppresses per-slice emits and force-emits a single final state carrying the latest cursor value.
|
/prerelease
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@tolik0/cdk/legacy-stream-state-throttle#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch tolik0/cdk/legacy-stream-state-throttlePR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
|
Warning Review limit reached
More reviews will be available in 54 minutes and 39 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Repository UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughThis PR adds optional state emission throttling to Airbyte CDK streams. When ChangesState Emission Throttling
🎯 3 (Moderate) | ⏱️ ~20 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
unit_tests/sources/streams/test_stream_read.py (1)
831-833: 💤 Low valueThe comment seems a bit contradictory — mind clarifying?
The phrase "All under 600 except the first" suggests the first is NOT under 600, but then "(cold start, 0 - 0 < 600 so suppressed)" says it IS under 600 and IS suppressed.
Perhaps something like: "All deltas are under 600 (including the cold-start where 0 - 0 = 0), so all per-slice emits are suppressed." wdyt?
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@unit_tests/sources/streams/test_stream_read.py` around lines 831 - 833, The comment above mock_time.side_effect is contradictory; update the inline comment near mock_time.side_effect = [0, 10, 20, 30, 40, 50] to clearly state that all deltas are under 600 (including the cold-start delta of 0), so all per-slice emits are suppressed — e.g., replace the existing sentence with a single clear line like "All deltas are < 600 (cold-start 0 - 0 = 0 included), so per-slice emits are suppressed." and ensure it references the mock_time side_effect behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@airbyte_cdk/sources/streams/core.py`:
- Around line 260-295: The failing formatting is in the checkpoint emission
block around the use of
should_checkpoint/throttle_seconds/last_state_emit_at/throttled_pending_emit and
calls to checkpoint_reader.next() and self._checkpoint_state; run the
auto-formatter (poetry run ruff format .) to reflow and fix the indentation/line
breaks in that section so lines 269-276 comply with ruff rules, then verify the
logic around yielding self._checkpoint_state(checkpoint_state,
state_manager=state_manager) and the final conditional that yields
last_observed_checkpoint remains unchanged.
In `@unit_tests/sources/streams/test_stream_read.py`:
- Around line 846-848: The list comprehension assigning state_messages is split
across lines and fails ruff formatting; reformat it to a single line (e.g.,
assign state_messages = [m for m in actual_records if getattr(m, "type", None)
== MessageType.STATE]) or run the project formatter (poetry run ruff format .)
to auto-fix the expression referencing actual_records and MessageType.STATE in
test_stream_read.py.
- Around line 899-901: The list comprehension that builds state_messages (using
actual_records and MessageType.STATE) needs the same formatting fix as the other
test: reformat the comprehension to follow the project's ruff style
(wrap/line-break the brackets and comprehensions consistently). Update the
comprehension formatting for state_messages to match the other corrected case
(keep the expression: state_messages = [m for m in actual_records if getattr(m,
"type", None) == MessageType.STATE]) and run `poetry run ruff format .` to apply
the canonical formatting.
---
Nitpick comments:
In `@unit_tests/sources/streams/test_stream_read.py`:
- Around line 831-833: The comment above mock_time.side_effect is contradictory;
update the inline comment near mock_time.side_effect = [0, 10, 20, 30, 40, 50]
to clearly state that all deltas are under 600 (including the cold-start delta
of 0), so all per-slice emits are suppressed — e.g., replace the existing
sentence with a single clear line like "All deltas are < 600 (cold-start 0 - 0 =
0 included), so per-slice emits are suppressed." and ensure it references the
mock_time side_effect behavior.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: ca0b5bcf-6141-4909-89b2-2f1a42242125
📒 Files selected for processing (2)
airbyte_cdk/sources/streams/core.pyunit_tests/sources/streams/test_stream_read.py
| if should_checkpoint and checkpoint_state is not None: | ||
| airbyte_state_message = self._checkpoint_state( | ||
| checkpoint_state, state_manager=state_manager | ||
| ) | ||
| yield airbyte_state_message | ||
| last_observed_checkpoint = checkpoint_state | ||
| if throttle_seconds is not None: | ||
| now = time.time() | ||
| if now - last_state_emit_at < throttle_seconds: | ||
| # Suppress this per-slice state emission; the final- | ||
| # state emit below will catch up if no other emit | ||
| # fires before the stream ends. | ||
| throttled_pending_emit = True | ||
| else: | ||
| last_state_emit_at = now | ||
| throttled_pending_emit = False | ||
| yield self._checkpoint_state( | ||
| checkpoint_state, state_manager=state_manager | ||
| ) | ||
| else: | ||
| yield self._checkpoint_state( | ||
| checkpoint_state, state_manager=state_manager | ||
| ) | ||
|
|
||
| next_slice = checkpoint_reader.next() | ||
|
|
||
| checkpoint = checkpoint_reader.get_checkpoint() | ||
| if should_checkpoint and checkpoint is not None: | ||
| airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager) | ||
| yield airbyte_state_message | ||
| elif ( | ||
| should_checkpoint | ||
| and throttle_seconds is not None | ||
| and throttled_pending_emit | ||
| and last_observed_checkpoint is not None | ||
| ): | ||
| # Throttling suppressed the final per-slice emit. Force a final | ||
| # state message so the platform/destination always sees the | ||
| # latest stream state for this sync. | ||
| yield self._checkpoint_state(last_observed_checkpoint, state_manager=state_manager) |
There was a problem hiding this comment.
Pipeline is failing due to formatting — could you run poetry run ruff format . to fix this?
The ruff format check flagged lines 269-276 as needing reformatting. The logic itself looks solid, wdyt?
🧰 Tools
🪛 GitHub Actions: Linters / 0_Ruff Format Check.txt
[error] 269-279: ruff format --diff reported formatting changes needed (code reformatted to a single-line call). Run 'poetry run ruff format .' to apply formatting.
🪛 GitHub Actions: Linters / Ruff Format Check
[error] 269-276: ruff format --diff reported formatting changes were needed. It would reformat 2 files (this file). Run 'poetry run ruff format .' or use 'ruff format --write' to apply formatting.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@airbyte_cdk/sources/streams/core.py` around lines 260 - 295, The failing
formatting is in the checkpoint emission block around the use of
should_checkpoint/throttle_seconds/last_state_emit_at/throttled_pending_emit and
calls to checkpoint_reader.next() and self._checkpoint_state; run the
auto-formatter (poetry run ruff format .) to reflow and fix the indentation/line
breaks in that section so lines 269-276 comply with ruff rules, then verify the
logic around yielding self._checkpoint_state(checkpoint_state,
state_manager=state_manager) and the final conditional that yields
last_observed_checkpoint remains unchanged.
| state_messages = [ | ||
| m for m in actual_records if getattr(m, "type", None) == MessageType.STATE | ||
| ] |
There was a problem hiding this comment.
Pipeline failing on formatting — ruff wants this list comprehension on a single line.
Could you run poetry run ruff format . to auto-fix this?
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@unit_tests/sources/streams/test_stream_read.py` around lines 846 - 848, The
list comprehension assigning state_messages is split across lines and fails ruff
formatting; reformat it to a single line (e.g., assign state_messages = [m for m
in actual_records if getattr(m, "type", None) == MessageType.STATE]) or run the
project formatter (poetry run ruff format .) to auto-fix the expression
referencing actual_records and MessageType.STATE in test_stream_read.py.
| state_messages = [ | ||
| m for m in actual_records if getattr(m, "type", None) == MessageType.STATE | ||
| ] |
There was a problem hiding this comment.
Same formatting fix needed here for the list comprehension.
Running poetry run ruff format . will fix both list comprehensions in these new tests.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@unit_tests/sources/streams/test_stream_read.py` around lines 899 - 901, The
list comprehension that builds state_messages (using actual_records and
MessageType.STATE) needs the same formatting fix as the other test: reformat the
comprehension to follow the project's ruff style (wrap/line-break the brackets
and comprehensions consistently). Update the comprehension formatting for
state_messages to match the other corrected case (keep the expression:
state_messages = [m for m in actual_records if getattr(m, "type", None) ==
MessageType.STATE]) and run `poetry run ruff format .` to apply the canonical
formatting.
…rce to streams Adds `FileBasedSource._stream_state_emission_throttle_seconds: Optional[float] = None` that, when set, is applied to every DefaultFileBasedStream the source creates in `_make_default_stream`. Connectors only need to set the attribute on their FileBasedSource subclass — no per-stream subclassing required. Pairs with the `Stream.state_emission_throttle_seconds` opt-in added in the previous commit. Default behaviour unchanged when the attribute is left at its None default.
There was a problem hiding this comment.
Pull request overview
Adds an opt-in, time-based throttle for per-slice state emission in Stream.read() to reduce excessive state message volume (and associated orchestrator buffering/memory pressure) for legacy slice-heavy streams, while preserving default behavior when unset.
Changes:
- Introduces
Stream.state_emission_throttle_seconds: Optional[float] = Noneand throttling logic for per-slice checkpoint emits. - Adds “force final state” emission when throttling suppressed the last per-slice emit and the checkpoint reader would otherwise return
Noneat end-of-stream. - Adds unit tests covering throttled vs default behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
airbyte_cdk/sources/streams/core.py |
Implements opt-in per-slice state emission throttling in Stream.read(). |
unit_tests/sources/streams/test_stream_read.py |
Adds tests for throttled state emission and default behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| throttle_seconds = self.state_emission_throttle_seconds | ||
| last_state_emit_at: float = 0.0 | ||
| last_observed_checkpoint: Optional[Mapping[str, Any]] = None | ||
| throttled_pending_emit = False |
| # Cold-start emit is suppressed (delta 0 < 600); subsequent ones suppressed; | ||
| # final-state force emit fires once at end of stream. | ||
| assert len(state_messages) == 1 | ||
| # Final state must carry the latest observed cursor. | ||
| final = state_messages[-1] | ||
| assert final.state.stream.stream_state.created_at == timestamp |
| # 600 s throttle. We mock time.time so the first emit fires, the next two | ||
| # fall inside the throttle window, and the loop ends needing a final emit. | ||
| stream.state_emission_throttle_seconds = 600 | ||
| mock_time = mocker.patch("airbyte_cdk.sources.streams.core.time.time") | ||
| # First call inside throttle check -> 0s; second -> 10s; third -> 20s; | ||
| # fourth -> 30s. All under 600 except the first (cold start, 0 - 0 < 600 | ||
| # so suppressed... see logic). | ||
| mock_time.side_effect = [0, 10, 20, 30, 40, 50] |
|
/prerelease
|
|
Closing in favor of an in-connector throttle in source-s3 — keeps the change scoped to the affected connector rather than the CDK base classes. See airbytehq/airbyte#78421. |
Summary
Why
Legacy file-based connectors (e.g. `source-s3` on the `DefaultFileBasedCursor` path) emit a state message after every slice. The slice is one file, and the state payload carries the full file-history dict — so on streams with thousands of small files the connector emits thousands of growing state messages. Each message is buffered by the platform/orchestrator until the destination ACKs it, which pins memory and OOMs the replication pod on the same failure shape as oncall #7856.
This is a non-concurrent-cursor alternative to the `FileBasedConcurrentCursor` migration in PR #1032. Connectors that don't want to migrate to the concurrent file-based path (e.g. to keep the legacy single-threaded read path's ~360 MB peak RSS profile) can adopt this opt-in instead. Targets oncall #12663.
Test plan
Risk
Low. Single class attribute with default `None`. All existing streams behave identically; the throttle code path is only reached when a subclass explicitly opts in.
Summary by CodeRabbit
New Features
Tests