What would you like?
`max_concurrency` in map/parallel doesn't throttle async operations like `invoke`
Problem Statement
When using `context.map()` or `context.parallel()` with `max_concurrency` set, you'd expect it to limit the number of concurrent operations in progress. However, the current implementation only limits the number of active threads, not the number of in-flight async operations.
This means that when map/parallel iterations contain async operations like `context.invoke()`, `context.wait()`, or `context.wait_for_condition()`, all items can be started nearly simultaneously regardless of the `max_concurrency` setting. While this keeps throughput high, it is surprising given the `max_concurrency` semantics.
Current Behavior
How it works today
- All tasks are submitted to a `ThreadPoolExecutor` immediately
- The thread pool has `max_workers = max_concurrency` threads
- When a task calls an async operation like `context.invoke()`:
  - It creates a checkpoint
  - Raises `SuspendExecution`
  - The thread completes and returns to the pool
- The next queued task starts immediately
- Result: all async operations are initiated as fast as threads can cycle through items (typically milliseconds per item)
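The effect can be reproduced outside the SDK with a plain `ThreadPoolExecutor` (a minimal sketch, not the SDK's actual code): each task initiates its operation and then "suspends" by returning immediately, so five workers churn through all 100 items almost instantly.

```python
from concurrent.futures import ThreadPoolExecutor

started = []

def task(i):
    started.append(i)   # the async operation is initiated here...
    return "SUSPENDED"  # ...then the thread is released back to the pool

with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(task, i) for i in range(100)]

# Every operation was initiated despite max_workers=5
print(len(started))  # 100
```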
Example
```python
# User expects: Only 5 Lambda invocations in-flight at once
# Actual behavior: All 100 Lambda invocations start within ~1 second
result = context.map(
    items=range(100),
    func=lambda ctx, item, idx, items: ctx.invoke("MyFunction", item),
    config=MapConfig(max_concurrency=5),
)
```
With `max_concurrency=5` and 100 items:
- Tasks 0-4 start in 5 threads
- Each calls `invoke()` and suspends (releasing its thread)
- Tasks 5-9 start immediately in the freed threads
- This continues rapidly until all 100 invocations are started
- All 100 Lambda functions are invoked concurrently
Expected Behavior
`max_concurrency` should limit the number of in-flight operations (including suspended ones), not just active threads.
Desired behavior
- Start `max_concurrency` tasks initially
- When a task completes (reaches SUCCEEDED or FAILED state), start the next pending task
- Suspended tasks remain "in-flight" and count against the concurrency limit
- If all in-flight tasks are suspended, the parent suspends too
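The scheduling rule above can be sketched as a small simulation (illustrative only; `simulate` and its bookkeeping are not SDK names). Because slots free up only on terminal completions, the number of in-flight items never exceeds the limit:

```python
from collections import deque

def simulate(n_tasks, max_concurrency):
    """Return the peak number of in-flight tasks under dynamic submission."""
    pending = deque(range(n_tasks))
    in_flight = set()  # PENDING + RUNNING + SUSPENDED all count here
    peak = 0
    while pending or in_flight:
        # top up to the concurrency limit
        while pending and len(in_flight) < max_concurrency:
            in_flight.add(pending.popleft())
        peak = max(peak, len(in_flight))
        # one task reaches a terminal state, freeing a slot
        in_flight.pop()
    return peak

print(simulate(100, 5))  # 5
```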
Example with expected behavior
result = context.map(
items=range(100),
func=lambda ctx, item, idx, items: ctx.invoke("MyFunction", item),
config=MapConfig(max_concurrency=5)
)
With `max_concurrency=5` and 100 items:
- Tasks 0-4 start and invoke Lambda functions
- Parent suspends waiting for responses
- When Lambda 0 completes, task 5 starts
- When Lambda 1 completes, task 6 starts
- At most 5 Lambda invocations are in-flight at any time
Impact
Current workarounds
Users manually pre-batch their arrays:
```python
# Manual batching to achieve at most 5 concurrent invocations
def batch_items(items, batch_size):
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

batches = batch_items(items, batch_size=5)
for batch in batches:
    for i, item in enumerate(batch):
        context.invoke(f"batch_{i}", "MyFunction", item)
```
This works, but it is verbose and pushes scheduling logic the SDK should handle onto every caller.
Related Discussion
See Discussion #278 for the initial report.
Proposed Solution
Implement dynamic task submission where:
- Track "in-flight" tasks (PENDING + RUNNING + SUSPENDED states)
- Only submit new tasks when `in_flight_count < max_concurrency`
- Decrement `in_flight_count` only when tasks reach terminal states (COMPLETED or FAILED)
- Suspended tasks remain in-flight and block new submissions
Acceptance Criteria
- `max_concurrency` limits in-flight operations, not just active threads
Additional Context
Code locations
- `src/aws_durable_execution_sdk_python/concurrency/executor.py`: `ConcurrentExecutor.execute()`
- `src/aws_durable_execution_sdk_python/operation/map.py`: Map operation
- `src/aws_durable_execution_sdk_python/operation/parallel.py`: Parallel operation
Key insight
The issue stems from submitting all tasks upfront:
```python
futures = [submit_task(exe_state) for exe_state in self.executables_with_state]
```
Replace this with dynamic submission based on in-flight count.
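A generic shape for that replacement using only the stdlib (a sketch under assumptions: `run_all` and `worker` are hypothetical names, and a real fix would additionally keep SUSPENDED iterations counted as in-flight rather than treating every finished future as terminal):

```python
import itertools
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def run_all(items, worker, max_concurrency):
    """Keep at most max_concurrency futures outstanding; top up as tasks finish."""
    results = []
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        queue = iter(items)
        # prime the first max_concurrency tasks instead of submitting everything
        in_flight = {pool.submit(worker, item)
                     for item in itertools.islice(queue, max_concurrency)}
        while in_flight:
            done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
            for future in done:
                results.append(future.result())
                # a terminal completion frees a slot for the next pending item
                nxt = next(queue, None)
                if nxt is not None:
                    in_flight.add(pool.submit(worker, nxt))
    return results
```

Note that results arrive in completion order here; the real executor would also need to preserve item indices.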
Possible Implementation
No response
Is this a breaking change?
No
Does this require an RFC?
No
Additional Context
No response