14 changes: 7 additions & 7 deletions README.md
@@ -16,9 +16,9 @@ Render Workflows support both Python and TypeScript. This repo contains Python e

| Example | Use Case | Key Patterns | Extra Dependencies |
|---------|----------|--------------|-------------------|
-| [**Hello World**](./hello-world/) | Learn workflow basics with simple number processing | Task definition, subtask calling with `await`, basic orchestration | None |
-| [**ETL Job**](./etl-job/) | Process CSV data with validation and statistics | Subtasks, sequential processing, batch operations, data validation | None |
-| [**OpenAI Agent**](./openai-agent/) | AI customer support agent with tool calling | Tool calling, nested subtasks (3 levels deep), stateful workflows, dynamic orchestration | `openai` |
+| [**Hello World**](./hello-world/) | Learn workflow basics with simple number processing | Task definition, task chaining with `await`, basic orchestration | None |
+| [**ETL Job**](./etl-job/) | Process CSV data with validation and statistics | Task chaining, sequential processing, batch operations, data validation | None |
+| [**OpenAI Agent**](./openai-agent/) | AI customer support agent with tool calling | Tool calling, nested task chains (3 levels deep), stateful workflows, dynamic orchestration | `openai` |
| [**File Processing**](./file-processing/) | Batch process multiple file formats in parallel | Parallel execution with `asyncio.gather()`, multi-format handling, aggregation | None |
| [**Data Pipeline**](./data-pipeline/) | Multi-source customer analytics pipeline | Parallel extraction, data enrichment, combining parallel + sequential patterns | `httpx` |
| [**File Analyzer**](./file-analyzer/) | API service calling workflow tasks for file analysis | Client SDK + Task SDK, workflow slugs, service separation, FastAPI integration | `fastapi`, `uvicorn` |
@@ -28,8 +28,8 @@ Render Workflows support both Python and TypeScript. This repo contains Python e
The simplest possible workflow — learn the fundamentals through simple number processing.

- Ultra-simple task definitions
-- Clear subtask calling examples
-- Subtasks in loops demonstration
+- Clear task-chaining examples
+- Task chaining in loops demonstration
- Multi-step workflow orchestration
- Heavily commented code explaining every pattern

@@ -43,7 +43,7 @@ Complete Extract, Transform, Load pipeline — process customer data from CSV fi

- CSV data extraction with retry logic
- Record validation and error tracking
-- Batch processing with subtasks
+- Batch processing with task chaining
- Statistical aggregation
- Comprehensive error handling

@@ -56,7 +56,7 @@ Intelligent conversational agent — a customer support bot that can answer ques
- Multi-turn conversations with context
- Dynamic tool/function calling
- Stateful workflow management
-- Integration with OpenAI GPT-4
+- Integration with OpenAI models
- Extensible tool framework

[View OpenAI Agent Example →](./openai-agent/)
36 changes: 17 additions & 19 deletions data-pipeline/README.md
@@ -189,11 +189,9 @@ task_run = await render.workflows.run_task(
    "data-pipeline-workflows/run_data_pipeline",
    {"user_ids": ["user_1", "user_2", "user_3", "user_4"]}
)
-
-result = await task_run
-print(f"Pipeline status: {result.results['status']}")
-print(f"Total revenue: ${result.results['insights']['revenue']['total']}")
-print(f"Segment distribution: {result.results['segment_distribution']}")
+print(f"Pipeline status: {task_run.results['status']}")
+print(f"Total revenue: ${task_run.results['insights']['revenue']['total']}")
+print(f"Segment distribution: {task_run.results['insights']['segment_distribution']}")
```

## Pipeline Stages
@@ -212,18 +210,18 @@ Using `asyncio.gather()` ensures all sources are fetched in parallel for maximum

### Stage 2: Transform

-**`transform_user_data`**: Combines data from all sources and enriches each user by calling subtasks:
+**`transform_user_data`**: Combines data from all sources and enriches each user by chaining task runs:
**Author comment:** The vast majority of edits in this PR are reframing "subtasks" as "chaining tasks/runs" per docs.

```python
for user in users:
-    # SUBTASK CALL: Calculate metrics for this user
+    # TASK CHAINING: Calculate metrics for this user
    user_metrics = await calculate_user_metrics(user, transactions, engagement)

-    # SUBTASK CALL: Enrich with geographic data
+    # TASK CHAINING: Enrich with geographic data
    geo_data = await enrich_with_geo_data(user['email'])

    enriched_users.append({**user_metrics, 'geo': geo_data})
```
-This demonstrates **sequential subtask calls per item** in a transformation loop.
+This demonstrates **sequential task chaining per item** in a transformation loop.

**`calculate_user_metrics`**: Calculates per-user metrics:
- Total spent and refunded
@@ -279,40 +277,40 @@ This demonstrates **sequential subtask calls per item** in a transformation loop

## Key Patterns Demonstrated

-### Parallel Extraction with Subtasks
+### Parallel Extraction with Task Chaining

```python
-# SUBTASK PATTERN: Launch multiple subtasks in parallel
+# TASK CHAINING PATTERN: Launch multiple task runs in parallel
user_task = fetch_user_data(user_ids)
transaction_task = fetch_transaction_data(user_ids)
engagement_task = fetch_engagement_data(user_ids)

-# SUBTASK CALLS: Wait for all three subtasks to complete
+# CHAINED RUNS: Wait for all three task runs to complete
user_data, transaction_data, engagement_data = await asyncio.gather(
    user_task,
    transaction_task,
    engagement_task
)
```

-This demonstrates **parallel subtask execution** - all three data sources are fetched simultaneously.
+This demonstrates **parallel task execution** - all three data sources are fetched simultaneously.
This reduces total extraction time from sum(A+B+C) to max(A,B,C).
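The sum-vs-max claim can be sanity-checked with plain `asyncio` outside any workflow. This is an illustrative sketch only: the `fetch` helper is a hypothetical stand-in for the three fetch tasks, not the render-sdk API.

```python
import asyncio
import time

async def fetch(source: str, delay: float) -> str:
    # Hypothetical stand-in for one slow data-source call
    await asyncio.sleep(delay)
    return f"{source}-data"

async def extract_all() -> float:
    start = time.monotonic()
    # All three coroutines run concurrently under one event loop;
    # gather() preserves argument order in its results
    results = await asyncio.gather(
        fetch("users", 0.05),
        fetch("transactions", 0.08),
        fetch("engagement", 0.03),
    )
    assert results == ["users-data", "transactions-data", "engagement-data"]
    return time.monotonic() - start

elapsed = asyncio.run(extract_all())
# elapsed tracks the slowest call (max), not the sum of all three
```

Running the same three calls with back-to-back `await`s instead would take roughly the sum of the delays.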

-### Data Enrichment with Subtasks
+### Data Enrichment with Task Chaining

-Each user is enriched by calling multiple subtasks:
+Each user is enriched by chaining multiple task runs:
```python
for user in users:
-    # SUBTASK CALL: Calculate user-specific metrics
+    # TASK CHAINING: Calculate user-specific metrics
    metrics = await calculate_user_metrics(user, transactions, engagement)

-    # SUBTASK CALL: Enrich with geographic data
+    # TASK CHAINING: Enrich with geographic data
    geo = await enrich_with_geo_data(user['email'])

    enriched_users.append({**metrics, 'geo': geo})
```

-This shows **sequential subtask calls** for per-item enrichment.
+This shows **sequential task chaining** for per-item enrichment.

### User Segmentation

@@ -375,7 +373,7 @@ async def send_pipeline_notification(result: dict) -> dict:

## Important Notes

-- **Python-only**: Workflows are only supported in Python via render-sdk
+- **SDK languages**: Workflows support Python and TypeScript; this repo's examples are Python.
- **No Blueprint Support**: Workflows don't support render.yaml blueprint configuration
- **Mock Data**: Example uses simulated data; replace with real API calls in production
- **Idempotency**: Design pipeline to be safely re-runnable
2 changes: 1 addition & 1 deletion data-pipeline/requirements.txt
@@ -1,2 +1,2 @@
-render-sdk>=0.5.0
+render-sdk>=0.6.0
**Author comment:** Bumping all the min sdk versions to 0.6.0 while here.

httpx>=0.27.0
2 changes: 1 addition & 1 deletion data-pipeline/template.yaml
@@ -13,7 +13,7 @@ renderStartCommand: python main.py
nextSteps:
- label: Enter your project directory
command: cd {{dir}}
-- label: Start your local task server
+- label: Start your local workflow service
command: render workflows dev -- {{startCommand}}
hint: This runs your workflow service locally, allowing you to view and run tasks without deploying to Render.
- label: Run the pipeline locally (in another terminal)
21 changes: 9 additions & 12 deletions etl-job/README.md
@@ -13,7 +13,7 @@ Process customer signup data from CSV files with validation, cleaning, and stati

## Features

-- **Subtask Execution**: Demonstrates calling tasks from other tasks using `await`
+- **Task Chaining**: Demonstrates calling tasks from other tasks using `await`
- **Extract**: Read data from CSV files (extensible to APIs, databases)
- **Transform**: Validate records with comprehensive error tracking
- **Load**: Compute statistics and prepare aggregated insights
@@ -136,11 +136,8 @@ task_run = await render.workflows.run_task(
    "etl-job-workflows/run_etl_pipeline",
    {"source_file": "sample_data.csv"}
)
-
-# Wait for completion
-result = await task_run
-print(f"Pipeline status: {result.results['status']}")
-print(f"Valid records: {result.results['transform']['valid_count']}")
+print(f"Pipeline status: {task_run.results['status']}")
+print(f"Valid records: {task_run.results['transform']['valid_count']}")
```

## Sample Data
@@ -163,25 +160,25 @@ This demonstrates how the pipeline handles data quality issues.
- Validates age range (0-120)
- Returns cleaned data with error tracking

-**`transform_batch`**: Processes all records by calling `validate_record` as a subtask for each one:
+**`transform_batch`**: Processes all records by chaining `validate_record` for each one:
```python
for record in records:
-    # Call validate_record as a subtask
+    # Chain validate_record for each record
    validated = await validate_record(record)
```
-This demonstrates **calling subtasks in a loop** for batch processing.
+This demonstrates **task chaining in a loop** for batch processing.
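Stripped of the workflow machinery, the loop pattern reduces to plain Python. The validation rules below are illustrative assumptions, not the exact rules in `main.py`:

```python
def validate_record(record: dict) -> dict:
    # Hypothetical validation rules, for illustration only
    errors = []
    if "@" not in record.get("email", ""):
        errors.append("invalid email")
    age = record.get("age")
    if not isinstance(age, int) or not 0 <= age <= 120:
        errors.append("age out of range")
    return {**record, "is_valid": not errors, "errors": errors}

def transform_batch(records: list[dict]) -> dict:
    valid, invalid = [], []
    # Each record passes through validation in turn, mirroring the task loop
    for record in records:
        result = validate_record(record)
        (valid if result["is_valid"] else invalid).append(result)
    return {
        "valid_records": valid,
        "invalid_records": invalid,
        "success_rate": len(valid) / len(records) if records else 0.0,
    }

summary = transform_batch([
    {"email": "a@example.com", "age": 30},
    {"email": "not-an-email", "age": 200},
])
# summary["success_rate"] == 0.5
```

In the workflow version each `validate_record` call is its own tracked task run rather than a plain function call, but the control flow is identical.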

**`compute_statistics`**: Aggregates valid records to produce:
- Country distribution
- Age statistics (min, max, average)
- Data quality metrics

-**`run_etl_pipeline`**: Main orchestrator that calls three subtasks sequentially:
+**`run_etl_pipeline`**: Main orchestrator that chains three task runs sequentially:
1. `await extract_csv_data(source_file)` - Extract data
2. `await transform_batch(raw_records)` - Validate records (which calls `validate_record` for each)
3. `await compute_statistics(valid_records)` - Generate insights

-This demonstrates **sequential subtask orchestration** for multi-stage pipelines.
+This demonstrates **sequential task chaining** for multi-stage pipelines.
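The three-stage hand-off can be sketched with plain coroutines, where each `await` must complete before the next stage starts. The stage bodies are hypothetical stand-ins, not the SDK tasks:

```python
import asyncio

async def extract(source: str) -> list[int]:
    # Stand-in for CSV extraction
    return [1, 2, 3, -4]

async def transform(rows: list[int]) -> list[int]:
    # Stand-in for validation: keep only positive values
    return [r for r in rows if r > 0]

async def load(rows: list[int]) -> dict:
    # Stand-in for statistics
    return {"count": len(rows), "total": sum(rows)}

async def run_pipeline(source: str) -> dict:
    raw = await extract(source)    # stage 1: extract
    clean = await transform(raw)   # stage 2: transform (uses stage 1's output)
    return await load(clean)       # stage 3: load (uses stage 2's output)

stats = asyncio.run(run_pipeline("sample.csv"))
# stats == {"count": 3, "total": 6}
```

Because each stage consumes the previous stage's output, these awaits cannot be parallelized with `asyncio.gather()` the way the independent extraction calls can.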

## Extending This Example

@@ -220,6 +217,6 @@ async def transform_batch_parallel(records: list[dict]) -> dict:

## Important Notes

-- **Python-only**: Workflows are only supported in Python via render-sdk
+- **SDK languages**: Workflows support Python and TypeScript; this repo's examples are Python.
- **No Blueprint Support**: Workflows don't support render.yaml blueprint configuration
- **Service Type**: Deploy as a Workflow service on Render (not Background Worker or Web Service)
16 changes: 8 additions & 8 deletions etl-job/main.py
@@ -147,7 +147,7 @@ async def transform_batch(records: list[dict]) -> dict:
"""
Transform a batch of records by validating each one.

-This demonstrates subtask execution in a loop, processing multiple
+This demonstrates task chaining in a loop, processing multiple
records individually while maintaining error tracking.

Args:
@@ -161,11 +161,11 @@ async def transform_batch(records: list[dict]) -> dict:
valid_records = []
invalid_records = []

-# Process each record through validation subtask
-# KEY PATTERN: Calling subtasks in a loop
+# Process each record by chaining validate_record runs
+# KEY PATTERN: Task chaining in a loop
for i, record in enumerate(records, 1):
logger.info(f"[TRANSFORM] Processing record {i}/{len(records)}")
-# SUBTASK CALL: Each record is validated by calling validate_record as a subtask
+# TASK CHAINING: Each record is validated by chaining validate_record
validated = await validate_record(record)

if validated['is_valid']:
@@ -264,7 +264,7 @@ async def run_etl_pipeline(source_file: str) -> dict:
2. Transform: Validate and clean records
3. Load: Compute statistics and prepare for storage

-This demonstrates a full workflow with multiple subtask calls and
+This demonstrates a full workflow with multiple chained task runs and
comprehensive error handling.

Args:
@@ -281,20 +281,20 @@ async def run_etl_pipeline(source_file: str) -> dict:
try:
# Stage 1: Extract
logger.info("[PIPELINE] Stage 1/3: EXTRACT")
-# SUBTASK CALL: Extract data from CSV
+# TASK CHAINING: Extract data from CSV
raw_records = await extract_csv_data(source_file)
logger.info(f"[PIPELINE] Extracted {len(raw_records)} records")

# Stage 2: Transform
logger.info("[PIPELINE] Stage 2/3: TRANSFORM")
-# SUBTASK CALL: Transform calls validate_record for each record
+# TASK CHAINING: Transform chains validate_record for each record
transform_result = await transform_batch(raw_records)
logger.info(f"[PIPELINE] Transformation complete: "
f"{transform_result['success_rate']:.1%} success rate")

# Stage 3: Load (compute statistics)
logger.info("[PIPELINE] Stage 3/3: LOAD")
-# SUBTASK CALL: Compute final statistics
+# TASK CHAINING: Compute final statistics
statistics = await compute_statistics(transform_result['valid_records'])
logger.info("[PIPELINE] Statistics computed")

2 changes: 1 addition & 1 deletion etl-job/requirements.txt
@@ -1 +1 @@
-render-sdk>=0.5.0
+render-sdk>=0.6.0