fix(file-based): respect _concurrency_level for thread pool sizing#1035
Draft
devin-ai-integration[bot] wants to merge 2 commits into
Draft
fix(file-based): respect _concurrency_level for thread pool sizing#1035devin-ai-integration[bot] wants to merge 2 commits into
devin-ai-integration[bot] wants to merge 2 commits into
Conversation
FileBasedSource.__init__ always created the ConcurrentSource with MAX_CONCURRENCY (100) workers regardless of the subclass's _concurrency_level setting. This meant connectors that set _concurrency_level to a lower value (e.g. 20) still got 100 concurrent file readers, causing OOM on large S3 streams within 2 Gi container limits. Use _concurrency_level (capped at MAX_CONCURRENCY) to size the thread pool and initial partition count. When _concurrency_level is None (default), the existing MAX_CONCURRENCY is used. Co-Authored-By: bot_apk <apk@cognition.ai>
Contributor
Author
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1779723955-fix-file-based-concurrency-level#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1779723955-fix-file-based-concurrency-levelPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
Co-Authored-By: bot_apk <apk@cognition.ai>
1 task
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
FileBasedSource.__init__always created theConcurrentSourcewithMAX_CONCURRENCY(100) workers regardless of the subclass's_concurrency_levelsetting. This meant connectors that set_concurrency_levelto a lower value (e.g. 20) still got 100 concurrent file readers, causing OOM on large S3 streams within 2 Gi container limits.This fix uses
_concurrency_level(capped atMAX_CONCURRENCY) to size the thread pool and initial partition count. When_concurrency_levelisNone(default), the existingMAX_CONCURRENCYbehavior is preserved.Root cause: The concurrent file-based cursor migration (airbytehq/airbyte#78325) for source-s3 triggered OOM failures on large streams because the thread pool always ran 100 concurrent file readers regardless of the connector's declared
_concurrency_level. See airbytehq/oncall#12714 for the incident.Related to https://github.com/airbytehq/oncall/issues/12714:
Review & Testing Checklist for Human
_concurrency_levelis correctly used to size the thread pool (checkConcurrentSource.createreceives the expectednum_workersandinitial_number_of_partitions_to_generate)_concurrency_level = None(the default) still getMAX_CONCURRENCYworkers — no regression for existing connectors_concurrency_level = 20against a large S3 stream to verify memory stays under 2 GiNotes
The companion change in source-s3 (setting
_concurrency_level = 20) will be submitted as a separate PR in the monorepo once this CDK change is available.Link to Devin session: https://app.devin.ai/sessions/ff881275041b469f9a6aed60a6af0fe2