feat(streams): opt-in time-throttle for legacy per-slice state emission #1036

Closed
Anatolii Yatsuk (tolik0) wants to merge 2 commits into main from tolik0/cdk/legacy-stream-state-throttle

Conversation

@tolik0 Anatolii Yatsuk (tolik0) commented May 26, 2026

Summary

  • Adds `Stream.state_emission_throttle_seconds: Optional[float] = None`.
  • When set, per-slice state emissions inside `Stream.read()` are suppressed if they would fire within the throttle window of the previous emit.
  • A final state message is force-emitted at end-of-stream if any per-slice emit was suppressed, so destinations always see the latest cursor.
  • Default `None` keeps existing behaviour bit-for-bit; this is fully opt-in.
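
The behaviour described in the bullets above can be sketched as a standalone generator. This is a minimal illustration under stated assumptions, not the CDK code: `throttled_states` and the `(cursor, finished_at)` tuples are hypothetical names, timestamps are passed in rather than read from a clock, and the first state is always emitted here, which is only one possible cold-start choice.

```python
from typing import Iterable, Iterator, Optional, Tuple


def throttled_states(
    slices: Iterable[Tuple[str, float]],
    throttle_seconds: Optional[float],
) -> Iterator[str]:
    """Yield one state per slice, or at most one per throttle window.

    Each input item is (cursor_value, completion_time_in_seconds); both are
    hypothetical stand-ins for a checkpoint and a clock reading.
    """
    last_emit_at: Optional[float] = None
    pending: Optional[str] = None  # latest suppressed state, if any

    for cursor, finished_at in slices:
        inside_window = (
            throttle_seconds is not None
            and last_emit_at is not None
            and finished_at - last_emit_at < throttle_seconds
        )
        if inside_window:
            pending = cursor  # suppress, but remember the latest cursor
        else:
            last_emit_at = finished_at
            pending = None
            yield cursor

    if pending is not None:
        yield pending  # forced final emit so the latest cursor is not lost


slices = [
    ("file-1", 0.0),
    ("file-2", 10.0),
    ("file-3", 20.0),
    ("file-4", 700.0),
    ("file-5", 710.0),
]
unthrottled = list(throttled_states(slices, None))   # one state per slice
throttled = list(throttled_states(slices, 600.0))    # ["file-1", "file-4", "file-5"]
```

With a 600 s window, `file-2` and `file-3` are suppressed, `file-4` falls outside the window and is emitted, and `file-5` is emitted only by the forced final step.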

Why

Legacy file-based connectors (e.g. `source-s3` on the `DefaultFileBasedCursor` path) emit a state message after every slice. Each slice is one file, and the state payload carries the full file-history dict, so on streams with thousands of small files the connector emits thousands of growing state messages. Each message is buffered by the platform/orchestrator until the destination ACKs it, which pins memory and OOMs the replication pod, the same failure shape as oncall #7856.
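
To make the growth concrete, here is a back-of-the-envelope sketch. It assumes, purely for illustration, that every state message carries one history entry per file synced so far and ignores any cap the cursor may apply to the history size. `total_history_entries` is a hypothetical helper, not part of the CDK.

```python
def total_history_entries(num_files: int) -> int:
    """Sum of history sizes over all per-slice state messages.

    Message k (1-based) carries k entries, so the total is 1 + 2 + ... + n.
    """
    return num_files * (num_files + 1) // 2


# One state per file: history entries summed over every emitted message.
per_slice_total = total_history_entries(5_000)  # 12_502_500

# A single final state carries the history exactly once.
single_final_total = 5_000
```

Under these assumptions the buffered volume grows quadratically with the file count when a state is emitted per slice, and only linearly when intermediate emissions are suppressed.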

This is a non-concurrent-cursor alternative to the `FileBasedConcurrentCursor` migration in PR #1032. Connectors that don't want to migrate to the concurrent file-based path (e.g. to keep the legacy single-threaded read path's ~360 MB peak RSS profile) can adopt this opt-in instead. Targets oncall #12663.

Test plan

  • `unit_tests/sources/streams/` — 765 passed, 1 skipped.
  • New `test_state_emission_throttle_suppresses_per_slice_emit_and_forces_final` — exercises the throttled path (cold-start emit, suppressed mid-stream emits, force-emit at end with the latest cursor).
  • New `test_state_emission_throttle_default_none_keeps_per_slice_emits` — guards the default-off behaviour.

Risk

Low. Single class attribute with default `None`. All existing streams behave identically; the throttle code path is only reached when a subclass explicitly opts in.

Summary by CodeRabbit

  • New Features

    • Added optional state emission throttling to control checkpoint emission frequency during incremental stream operations, with guaranteed final state emission at stream end.
  • Tests

    • Added comprehensive test coverage for state emission throttling behavior.

…am attribute

Adds `Stream.state_emission_throttle_seconds: Optional[float] = None`. When
set, the per-slice state message emitted from `Stream.read()` is suppressed
for any slice that completes within `state_emission_throttle_seconds` of
the previous emit. A final state message is force-emitted at end-of-stream
if any throttled emit was suppressed, so the platform/destination always
sees the latest state for the sync.

Default `None` preserves the historical behaviour exactly (emit a state
message after every slice). Streams that produce many slices with growing
state payloads (e.g. file-based sources whose state carries a history dict
that grows by one entry per file synced) can opt in to bound the
platform-side state buffer that otherwise pins memory until the destination
ACKs each message.

Targets the same OOM mode as `ConcurrentPerPartitionCursor`'s state
throttle (oncall #7856), but for the legacy non-concurrent file-based
read path (oncall #12663 alternative).

Unit-tested:
* default (None) keeps one state per slice, no final duplicate.
* throttle=600s with sub-window slices suppresses per-slice emits and
  force-emits a single final state carrying the latest cursor value.
Copilot AI review requested due to automatic review settings May 26, 2026 09:22

Anatolii Yatsuk (tolik0) commented May 26, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/26443827536

@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@tolik0/cdk/legacy-stream-state-throttle#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch tolik0/cdk/legacy-stream-state-throttle

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment

@coderabbitai

coderabbitai Bot commented May 26, 2026

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 64d88bba-a656-4730-8a7d-321c790bcbe8

📥 Commits

Reviewing files that changed from the base of the PR and between 33b5947 and 6e3c5d4.

📒 Files selected for processing (1)
  • airbyte_cdk/sources/file_based/file_based_source.py
Walkthrough

This PR adds optional state emission throttling to Airbyte CDK streams. When state_emission_throttle_seconds is set, per-slice STATE messages are suppressed if they occur within the throttle window; a final STATE is always emitted at stream end with the latest checkpoint. Default behavior (None) preserves existing per-slice emission.

Changes

State Emission Throttling

| Layer / File(s) | Summary |
|---|---|
| Throttle configuration and imports: `airbyte_cdk/sources/streams/core.py` | Adds `import time` and the public `Stream.state_emission_throttle_seconds: Optional[float] = None` configuration field with documentation describing throttled vs. historical per-slice emission behavior. |
| Throttling implementation and state emission logic: `airbyte_cdk/sources/streams/core.py` | Introduces internal per-read throttle bookkeeping (last emission timestamp, cached checkpoint snapshot, pending-suppression flag) and reworks per-slice checkpoint-to-STATE emission to conditionally suppress intermediate messages based on elapsed time, with forced final STATE emission at stream end. |
| Test coverage for throttled and default state emission: `unit_tests/sources/streams/test_stream_read.py` | Two tests validate behavior: one confirms throttling suppresses per-slice emissions and forces exactly one final STATE with the latest cursor value; another verifies default (`None`) behavior emits one STATE per slice without redundant final emission. |

🎯 3 (Moderate) | ⏱️ ~20 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 42.86% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly summarizes the main change: adding an optional, time-based throttle mechanism for per-slice state emissions in streams. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (1)
unit_tests/sources/streams/test_stream_read.py (1)

831-833: 💤 Low value

The comment seems a bit contradictory — mind clarifying?

The phrase "All under 600 except the first" suggests the first is NOT under 600, but then "(cold start, 0 - 0 < 600 so suppressed)" says it IS under 600 and IS suppressed.

Perhaps something like: "All deltas are under 600 (including the cold-start where 0 - 0 = 0), so all per-slice emits are suppressed." wdyt?
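
The cold-start case discussed in this comment can be reproduced in isolation. The sketch below copies only the comparison from the diff (`now - last_state_emit_at < throttle_seconds`, with `last_state_emit_at` starting at `0.0`); `first_emit_is_suppressed` is a hypothetical helper written for this illustration.

```python
def first_emit_is_suppressed(now: float, throttle_seconds: float) -> bool:
    """Apply the throttle comparison to the very first slice of a read."""
    last_state_emit_at = 0.0  # initial value used in the PR
    return now - last_state_emit_at < throttle_seconds


# Mocked clock starting at 0: the delta is 0, so the first emit is suppressed.
mocked = first_emit_is_suppressed(now=0.0, throttle_seconds=600)

# An epoch-style timestamp is far larger than the window, so the first emit fires.
wall_clock = first_emit_is_suppressed(now=1_700_000_000.0, throttle_seconds=600)
```

If this reading of the diff is right, the mocked timeline in the test and a run against the real clock differ in whether the first per-slice state is emitted, which may be the source of the contradictory comment.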

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@unit_tests/sources/streams/test_stream_read.py` around lines 831 - 833, The
comment above mock_time.side_effect is contradictory; update the inline comment
near mock_time.side_effect = [0, 10, 20, 30, 40, 50] to clearly state that all
deltas are under 600 (including the cold-start delta of 0), so all per-slice
emits are suppressed — e.g., replace the existing sentence with a single clear
line like "All deltas are < 600 (cold-start 0 - 0 = 0 included), so per-slice
emits are suppressed." and ensure it references the mock_time side_effect
behavior.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: ca0b5bcf-6141-4909-89b2-2f1a42242125

📥 Commits

Reviewing files that changed from the base of the PR and between 57110be and 33b5947.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/streams/core.py
  • unit_tests/sources/streams/test_stream_read.py

Comment on lines 260 to +295
             if should_checkpoint and checkpoint_state is not None:
-                airbyte_state_message = self._checkpoint_state(
-                    checkpoint_state, state_manager=state_manager
-                )
-                yield airbyte_state_message
+                last_observed_checkpoint = checkpoint_state
+                if throttle_seconds is not None:
+                    now = time.time()
+                    if now - last_state_emit_at < throttle_seconds:
+                        # Suppress this per-slice state emission; the final-
+                        # state emit below will catch up if no other emit
+                        # fires before the stream ends.
+                        throttled_pending_emit = True
+                    else:
+                        last_state_emit_at = now
+                        throttled_pending_emit = False
+                        yield self._checkpoint_state(
+                            checkpoint_state, state_manager=state_manager
+                        )
+                else:
+                    yield self._checkpoint_state(
+                        checkpoint_state, state_manager=state_manager
+                    )

             next_slice = checkpoint_reader.next()

         checkpoint = checkpoint_reader.get_checkpoint()
         if should_checkpoint and checkpoint is not None:
             airbyte_state_message = self._checkpoint_state(checkpoint, state_manager=state_manager)
             yield airbyte_state_message
+        elif (
+            should_checkpoint
+            and throttle_seconds is not None
+            and throttled_pending_emit
+            and last_observed_checkpoint is not None
+        ):
+            # Throttling suppressed the final per-slice emit. Force a final
+            # state message so the platform/destination always sees the
+            # latest stream state for this sync.
+            yield self._checkpoint_state(last_observed_checkpoint, state_manager=state_manager)

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Pipeline is failing due to formatting — could you run poetry run ruff format . to fix this?

The ruff format check flagged lines 269-276 as needing reformatting. The logic itself looks solid, wdyt?

🧰 Tools
🪛 GitHub Actions: Linters / 0_Ruff Format Check.txt

[error] 269-279: ruff format --diff reported formatting changes needed (code reformatted to a single-line call). Run 'poetry run ruff format .' to apply formatting.

🪛 GitHub Actions: Linters / Ruff Format Check

[error] 269-276: ruff format --diff reported formatting changes were needed. It would reformat 2 files (this file). Run 'poetry run ruff format .' or use 'ruff format --write' to apply formatting.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@airbyte_cdk/sources/streams/core.py` around lines 260 - 295, The failing
formatting is in the checkpoint emission block around the use of
should_checkpoint/throttle_seconds/last_state_emit_at/throttled_pending_emit and
calls to checkpoint_reader.next() and self._checkpoint_state; run the
auto-formatter (poetry run ruff format .) to reflow and fix the indentation/line
breaks in that section so lines 269-276 comply with ruff rules, then verify the
logic around yielding self._checkpoint_state(checkpoint_state,
state_manager=state_manager) and the final conditional that yields
last_observed_checkpoint remains unchanged.

Comment on lines +846 to +848
state_messages = [
    m for m in actual_records if getattr(m, "type", None) == MessageType.STATE
]

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Pipeline failing on formatting — ruff wants this list comprehension on a single line.

Could you run poetry run ruff format . to auto-fix this?

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@unit_tests/sources/streams/test_stream_read.py` around lines 846 - 848, The
list comprehension assigning state_messages is split across lines and fails ruff
formatting; reformat it to a single line (e.g., assign state_messages = [m for m
in actual_records if getattr(m, "type", None) == MessageType.STATE]) or run the
project formatter (poetry run ruff format .) to auto-fix the expression
referencing actual_records and MessageType.STATE in test_stream_read.py.

Comment on lines +899 to +901
state_messages = [
    m for m in actual_records if getattr(m, "type", None) == MessageType.STATE
]

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Same formatting fix needed here for the list comprehension.

Running poetry run ruff format . will fix both list comprehensions in these new tests.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@unit_tests/sources/streams/test_stream_read.py` around lines 899 - 901, The
list comprehension that builds state_messages (using actual_records and
MessageType.STATE) needs the same formatting fix as the other test: reformat the
comprehension to follow the project's ruff style (wrap/line-break the brackets
and comprehensions consistently). Update the comprehension formatting for
state_messages to match the other corrected case (keep the expression:
state_messages = [m for m in actual_records if getattr(m, "type", None) ==
MessageType.STATE]) and run `poetry run ruff format .` to apply the canonical
formatting.

…rce to streams

Adds `FileBasedSource._stream_state_emission_throttle_seconds: Optional[float] = None`
that, when set, is applied to every DefaultFileBasedStream the source
creates in `_make_default_stream`. Connectors only need to set the
attribute on their FileBasedSource subclass — no per-stream subclassing
required.

Pairs with the `Stream.state_emission_throttle_seconds` opt-in added in
the previous commit. Default behaviour unchanged when the attribute is
left at its None default.
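
A rough sketch of the propagation this commit message describes. The classes below are simplified stand-ins defined for the example, not the CDK's `FileBasedSource` or `DefaultFileBasedStream`; only the two attribute names and the `_make_default_stream` method name come from the PR, and the method signature here is an assumption.

```python
from typing import List, Optional


class SketchStream:
    """Stand-in for a stream with the opt-in throttle attribute."""

    state_emission_throttle_seconds: Optional[float] = None

    def __init__(self, name: str) -> None:
        self.name = name


class SketchSource:
    """Stand-in for a source that builds its own streams."""

    _stream_state_emission_throttle_seconds: Optional[float] = None

    def _make_default_stream(self, name: str) -> SketchStream:
        stream = SketchStream(name)
        if self._stream_state_emission_throttle_seconds is not None:
            # Copy the source-level setting onto the stream instance.
            stream.state_emission_throttle_seconds = (
                self._stream_state_emission_throttle_seconds
            )
        return stream

    def streams(self, names: List[str]) -> List[SketchStream]:
        return [self._make_default_stream(n) for n in names]


class ThrottledSource(SketchSource):
    # A connector opts in by overriding one class attribute.
    _stream_state_emission_throttle_seconds = 600.0


default_streams = SketchSource().streams(["a", "b"])
throttled_streams = ThrottledSource().streams(["a", "b"])
```

In this sketch the default source leaves every stream at `None`, while the subclass applies `600.0` to each stream it creates without any per-stream subclassing.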

Copilot AI left a comment

Pull request overview

Adds an opt-in, time-based throttle for per-slice state emission in Stream.read() to reduce excessive state message volume (and associated orchestrator buffering/memory pressure) for legacy slice-heavy streams, while preserving default behavior when unset.

Changes:

  • Introduces Stream.state_emission_throttle_seconds: Optional[float] = None and throttling logic for per-slice checkpoint emits.
  • Adds “force final state” emission when throttling suppressed the last per-slice emit and the checkpoint reader would otherwise return None at end-of-stream.
  • Adds unit tests covering throttled vs default behavior.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.

| File | Description |
|---|---|
| `airbyte_cdk/sources/streams/core.py` | Implements opt-in per-slice state emission throttling in `Stream.read()`. |
| `unit_tests/sources/streams/test_stream_read.py` | Adds tests for throttled state emission and default behavior. |

Comment on lines +200 to +203
throttle_seconds = self.state_emission_throttle_seconds
last_state_emit_at: float = 0.0
last_observed_checkpoint: Optional[Mapping[str, Any]] = None
throttled_pending_emit = False
Comment on lines +849 to +854
# Cold-start emit is suppressed (delta 0 < 600); subsequent ones suppressed;
# final-state force emit fires once at end of stream.
assert len(state_messages) == 1
# Final state must carry the latest observed cursor.
final = state_messages[-1]
assert final.state.stream.stream_state.created_at == timestamp
Comment on lines +826 to +833
# 600 s throttle. We mock time.time so the first emit fires, the next two
# fall inside the throttle window, and the loop ends needing a final emit.
stream.state_emission_throttle_seconds = 600
mock_time = mocker.patch("airbyte_cdk.sources.streams.core.time.time")
# First call inside throttle check -> 0s; second -> 10s; third -> 20s;
# fourth -> 30s. All under 600 except the first (cold start, 0 - 0 < 600
# so suppressed... see logic).
mock_time.side_effect = [0, 10, 20, 30, 40, 50]
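
For readers unfamiliar with the mocking pattern in this snippet: when `side_effect` is a list, each call to the patched function returns the next item. The following is a minimal standalone sketch using `unittest.mock.patch` from the standard library; the test under review instead uses the `mocker` fixture and patches the `time` reference inside `core.py`.

```python
import time
from unittest.mock import patch

# Each call to the patched time.time() returns the next value in the list.
with patch("time.time", side_effect=[0, 10, 20]) as mock_time:
    readings = [time.time(), time.time(), time.time()]
    call_count = mock_time.call_count
```

A call made after the list is exhausted raises `StopIteration`, which is presumably why the test supplies more values than it expects to consume.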

Anatolii Yatsuk (tolik0) commented May 26, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/26444084111

@github-actions

PyTest Results (Fast)

4 073 tests  +2   4 062 ✅ +2   7m 13s ⏱️ -2s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 6e3c5d4. ± Comparison against base commit 57110be.

@github-actions

PyTest Results (Full)

4 076 tests  +2   4 064 ✅ +2   11m 34s ⏱️ +7s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 6e3c5d4. ± Comparison against base commit 57110be.

@tolik0

Closing in favor of an in-connector throttle in source-s3 — keeps the change scoped to the affected connector rather than the CDK base classes. See airbytehq/airbyte#78421.
