Skip to content

fix: lazy-evaluate getWorldHandlers() in step-handler#1486

Closed
gmathieu wants to merge 2 commits intovercel:mainfrom
gmathieu:fix/lazy-evaluate-get-world-handlers
Closed

fix: lazy-evaluate getWorldHandlers() in step-handler#1486
gmathieu wants to merge 2 commits intovercel:mainfrom
gmathieu:fix/lazy-evaluate-get-world-handlers

Conversation

@gmathieu
Copy link
Copy Markdown

Lazy-evaluate getWorldHandlers() in step-handler

Fixes #825

Problem

getWorldHandlers() is called at module load time in step-handler.ts. When deploying to AWS Lambda via OpenNext, this runs before instrumentation.ts's register() can call setWorld(), causing createWorld() to fall through to a dynamic require(WORKFLOW_TARGET_WORLD) that bundlers can't trace — resulting in Cannot find module '@workflow/world-postgres'.

Fix

Defer the getWorldHandlers().createQueueHandler(...) call from module scope to first invocation, so setWorld() has a chance to run first via register().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@changeset-bot
Copy link
Copy Markdown

changeset-bot Bot commented Mar 23, 2026

🦋 Changeset detected

Latest commit: 29551d8

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 16 packages
Name Type
@workflow/core Patch
@workflow/builders Patch
@workflow/cli Patch
@workflow/next Patch
@workflow/nitro Patch
@workflow/vitest Patch
@workflow/web-shared Patch
workflow Patch
@workflow/world-testing Patch
@workflow/astro Patch
@workflow/nest Patch
@workflow/rollup Patch
@workflow/sveltekit Patch
@workflow/vite Patch
@workflow/nuxt Patch
@workflow/ai Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@vercel
Copy link
Copy Markdown
Contributor

vercel Bot commented Mar 23, 2026

@gmathieu is attempting to deploy a commit to the Vercel Labs Team on Vercel.

A member of the Team first needs to authorize it.

Signed-off-by: Guillaume Mathieu <gmathieu@users.noreply.github.com>
@gmathieu gmathieu marked this pull request as ready for review March 23, 2026 18:33
@gmathieu gmathieu requested a review from a team as a code owner March 23, 2026 18:33
Comment on lines -48 to +50
const stepHandler = getWorldHandlers().createQueueHandler(
'__wkf_step_',
async (message_, metadata) => {
// Check if this is a health check message
// NOTE: Health check messages are intentionally unauthenticated for monitoring purposes.
// They only write a simple status response to a stream and do not expose sensitive data.
// The stream name includes a unique correlationId that must be known by the caller.
const healthCheck = parseHealthCheckPayload(message_);
if (healthCheck) {
await handleHealthCheckMessage(healthCheck, 'step');
return;
}

const {
workflowName,
workflowRunId,
workflowStartedAt,
stepId,
traceCarrier: traceContext,
requestedAt,
} = StepInvokePayloadSchema.parse(message_);
const { requestId } = metadata;
const spanLinks = await linkToCurrentContext();
// Execute step within the propagated trace context
return await withTraceContext(traceContext, async () => {
// Extract the step name from the topic name
const stepName = metadata.queueName.slice('__wkf_step_'.length);
const world = getWorld();
const isVercel = process.env.VERCEL_URL !== undefined;

// Resolve local async values concurrently before entering the trace span
const [port, spanKind] = await Promise.all([
isVercel ? undefined : getPort(),
getSpanKind('CONSUMER'),
]);

return trace(
`STEP ${stepName}`,
{ kind: spanKind, links: spanLinks },
async (span) => {
span?.setAttributes({
...Attribute.StepName(stepName),
...Attribute.StepAttempt(metadata.attempt),
// Standard OTEL messaging conventions
...Attribute.MessagingSystem('vercel-queue'),
...Attribute.MessagingDestinationName(metadata.queueName),
...Attribute.MessagingMessageId(metadata.messageId),
...Attribute.MessagingOperationType('process'),
...getQueueOverhead({ requestedAt }),
});

const stepFn = getStepFunction(stepName);
if (!stepFn) {
throw new Error(`Step "${stepName}" not found`);
}
if (typeof stepFn !== 'function') {
throw new Error(
`Step "${stepName}" is not a function (got ${typeof stepFn})`
);
}

const maxRetries = stepFn.maxRetries ?? DEFAULT_STEP_MAX_RETRIES;

span?.setAttributes({
...Attribute.WorkflowName(workflowName),
...Attribute.WorkflowRunId(workflowRunId),
...Attribute.StepId(stepId),
...Attribute.StepMaxRetries(maxRetries),
...Attribute.StepTracePropagated(!!traceContext),
});

// step_started validates state and returns the step entity, so no separate
// world.steps.get() call is needed. The server checks:
// - Step not in terminal state (returns 409)
// - retryAfter timestamp reached (returns 425 with Retry-After header)
// - Workflow still active (returns 410 if completed)
let step;
try {
const startResult = await world.events.create(
workflowRunId,
{
eventType: 'step_started',
specVersion: SPEC_VERSION_CURRENT,
correlationId: stepId,
},
{ requestId }
);

if (!startResult.step) {
throw new WorkflowRuntimeError(
`step_started event for "${stepId}" did not return step entity`
);
}
step = startResult.step;
} catch (err) {
if (ThrottleError.is(err)) {
const retryRetryAfter = Math.max(
1,
typeof err.retryAfter === 'number' ? err.retryAfter : 1
);
runtimeLogger.info(
'Throttled again on retry, deferring to queue',
{
retryAfterSeconds: retryRetryAfter,
}
);
return { timeoutSeconds: retryRetryAfter };
}
if (RunExpiredError.is(err)) {
runtimeLogger.info(
`Workflow run "${workflowRunId}" has already completed, skipping step "${stepId}": ${err.message}`
);
return;
}
if (EntityConflictError.is(err)) {
runtimeLogger.debug(
'Step in terminal state, re-enqueuing workflow',
{
stepName,
stepId,
workflowRunId,
error: err.message,
}
);
span?.setAttributes({
...Attribute.StepSkipped(true),
...Attribute.StepSkipReason('completed'),
});
span?.addEvent?.('step.skipped', {
'skip.reason': 'terminal_state',
'step.name': stepName,
'step.id': stepId,
});
await queueMessage(world, getWorkflowQueueName(workflowName), {
runId: workflowRunId,
traceCarrier: await serializeTraceCarrier(),
requestedAt: new Date(),
});
return;
}
let stepHandler: ((req: Request) => Promise<Response>) | undefined;
function getStepHandler(): (req: Request) => Promise<Response> {
if (!stepHandler) {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Highlight what changed; everything else is formatting.

Comment on lines 769 to +771
}
);
return stepHandler;
}
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed

Comment on lines -748 to +779
/* @__PURE__ */ withHealthCheck(stepHandler);
withHealthCheck((req: Request) => getStepHandler()(req));
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed

@pranaygp
Copy link
Copy Markdown
Contributor

Talking about this internally right now. Without digging deeply into the code, my quick note is that we'd have to do the same for flow handlers, not just step handlers if we do this, right?

@gmathieu
Copy link
Copy Markdown
Author

@pranaygp, sorry, I'm not familiar with flow handlers. I only see two instances of getWorldHandlers() so I don't think the rest of the system is impacted.

The first getWorldHandlers() lives inside a function, which is good (no side effects):

export function workflowEntrypoint(
workflowCode: string
): (req: Request) => Promise<Response> {
const handler = getWorldHandlers().createQueueHandler(

step-handler.ts calls getWorldHandlers() as soon as anyone imports @workflow/core/runtime or @workflow/runtime, which creates side effects:

const stepHandler = getWorldHandlers().createQueueHandler(

I added more details to the issue. Hope this helps.

@gmathieu
Copy link
Copy Markdown
Author

gmathieu commented Apr 9, 2026

Fixed by #942

@gmathieu gmathieu closed this Apr 9, 2026
@gmathieu gmathieu deleted the fix/lazy-evaluate-get-world-handlers branch April 9, 2026 21:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

getWorldHandlers() called at module load time breaks OpenNext/Lambda deployments with custom World

2 participants