fix: lazy-evaluate getWorldHandlers() in step-handler #1486
gmathieu wants to merge 2 commits into vercel:main from
fix: lazy-evaluate getWorldHandlers() in step-handler #1486
Conversation
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
🦋 Changeset detected. Latest commit: 29551d8. The changes in this PR will be included in the next version bump. This PR includes changesets to release 16 packages.
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
|
@gmathieu is attempting to deploy a commit to the Vercel Labs Team on Vercel. A member of the Team first needs to authorize it. |
Signed-off-by: Guillaume Mathieu <gmathieu@users.noreply.github.com>
| const stepHandler = getWorldHandlers().createQueueHandler( | ||
| '__wkf_step_', | ||
| async (message_, metadata) => { | ||
| // Check if this is a health check message | ||
| // NOTE: Health check messages are intentionally unauthenticated for monitoring purposes. | ||
| // They only write a simple status response to a stream and do not expose sensitive data. | ||
| // The stream name includes a unique correlationId that must be known by the caller. | ||
| const healthCheck = parseHealthCheckPayload(message_); | ||
| if (healthCheck) { | ||
| await handleHealthCheckMessage(healthCheck, 'step'); | ||
| return; | ||
| } | ||
|
|
||
| const { | ||
| workflowName, | ||
| workflowRunId, | ||
| workflowStartedAt, | ||
| stepId, | ||
| traceCarrier: traceContext, | ||
| requestedAt, | ||
| } = StepInvokePayloadSchema.parse(message_); | ||
| const { requestId } = metadata; | ||
| const spanLinks = await linkToCurrentContext(); | ||
| // Execute step within the propagated trace context | ||
| return await withTraceContext(traceContext, async () => { | ||
| // Extract the step name from the topic name | ||
| const stepName = metadata.queueName.slice('__wkf_step_'.length); | ||
| const world = getWorld(); | ||
| const isVercel = process.env.VERCEL_URL !== undefined; | ||
|
|
||
| // Resolve local async values concurrently before entering the trace span | ||
| const [port, spanKind] = await Promise.all([ | ||
| isVercel ? undefined : getPort(), | ||
| getSpanKind('CONSUMER'), | ||
| ]); | ||
|
|
||
| return trace( | ||
| `STEP ${stepName}`, | ||
| { kind: spanKind, links: spanLinks }, | ||
| async (span) => { | ||
| span?.setAttributes({ | ||
| ...Attribute.StepName(stepName), | ||
| ...Attribute.StepAttempt(metadata.attempt), | ||
| // Standard OTEL messaging conventions | ||
| ...Attribute.MessagingSystem('vercel-queue'), | ||
| ...Attribute.MessagingDestinationName(metadata.queueName), | ||
| ...Attribute.MessagingMessageId(metadata.messageId), | ||
| ...Attribute.MessagingOperationType('process'), | ||
| ...getQueueOverhead({ requestedAt }), | ||
| }); | ||
|
|
||
| const stepFn = getStepFunction(stepName); | ||
| if (!stepFn) { | ||
| throw new Error(`Step "${stepName}" not found`); | ||
| } | ||
| if (typeof stepFn !== 'function') { | ||
| throw new Error( | ||
| `Step "${stepName}" is not a function (got ${typeof stepFn})` | ||
| ); | ||
| } | ||
|
|
||
| const maxRetries = stepFn.maxRetries ?? DEFAULT_STEP_MAX_RETRIES; | ||
|
|
||
| span?.setAttributes({ | ||
| ...Attribute.WorkflowName(workflowName), | ||
| ...Attribute.WorkflowRunId(workflowRunId), | ||
| ...Attribute.StepId(stepId), | ||
| ...Attribute.StepMaxRetries(maxRetries), | ||
| ...Attribute.StepTracePropagated(!!traceContext), | ||
| }); | ||
|
|
||
| // step_started validates state and returns the step entity, so no separate | ||
| // world.steps.get() call is needed. The server checks: | ||
| // - Step not in terminal state (returns 409) | ||
| // - retryAfter timestamp reached (returns 425 with Retry-After header) | ||
| // - Workflow still active (returns 410 if completed) | ||
| let step; | ||
| try { | ||
| const startResult = await world.events.create( | ||
| workflowRunId, | ||
| { | ||
| eventType: 'step_started', | ||
| specVersion: SPEC_VERSION_CURRENT, | ||
| correlationId: stepId, | ||
| }, | ||
| { requestId } | ||
| ); | ||
|
|
||
| if (!startResult.step) { | ||
| throw new WorkflowRuntimeError( | ||
| `step_started event for "${stepId}" did not return step entity` | ||
| ); | ||
| } | ||
| step = startResult.step; | ||
| } catch (err) { | ||
| if (ThrottleError.is(err)) { | ||
| const retryRetryAfter = Math.max( | ||
| 1, | ||
| typeof err.retryAfter === 'number' ? err.retryAfter : 1 | ||
| ); | ||
| runtimeLogger.info( | ||
| 'Throttled again on retry, deferring to queue', | ||
| { | ||
| retryAfterSeconds: retryRetryAfter, | ||
| } | ||
| ); | ||
| return { timeoutSeconds: retryRetryAfter }; | ||
| } | ||
| if (RunExpiredError.is(err)) { | ||
| runtimeLogger.info( | ||
| `Workflow run "${workflowRunId}" has already completed, skipping step "${stepId}": ${err.message}` | ||
| ); | ||
| return; | ||
| } | ||
| if (EntityConflictError.is(err)) { | ||
| runtimeLogger.debug( | ||
| 'Step in terminal state, re-enqueuing workflow', | ||
| { | ||
| stepName, | ||
| stepId, | ||
| workflowRunId, | ||
| error: err.message, | ||
| } | ||
| ); | ||
| span?.setAttributes({ | ||
| ...Attribute.StepSkipped(true), | ||
| ...Attribute.StepSkipReason('completed'), | ||
| }); | ||
| span?.addEvent?.('step.skipped', { | ||
| 'skip.reason': 'terminal_state', | ||
| 'step.name': stepName, | ||
| 'step.id': stepId, | ||
| }); | ||
| await queueMessage(world, getWorkflowQueueName(workflowName), { | ||
| runId: workflowRunId, | ||
| traceCarrier: await serializeTraceCarrier(), | ||
| requestedAt: new Date(), | ||
| }); | ||
| return; | ||
| } | ||
| let stepHandler: ((req: Request) => Promise<Response>) | undefined; | ||
| function getStepHandler(): (req: Request) => Promise<Response> { | ||
| if (!stepHandler) { |
There was a problem hiding this comment.
Highlight what changed; everything else is formatting.
| } | ||
| ); | ||
| return stepHandler; | ||
| } |
| /* @__PURE__ */ withHealthCheck(stepHandler); | ||
| withHealthCheck((req: Request) => getStepHandler()(req)); |
|
Talking about this internally right now. Without digging deeply into the code, my quick note is that we'd have to do the same for flow handlers, not just step handlers if we do this, right? |
|
@pranaygp, sorry, I'm not familiar with flow handlers. I only see two instances of `getWorldHandlers()`. The first: workflow/packages/core/src/runtime.ts, lines 88 to 91 in 329cdb3
I added more details to the issue. Hope this helps. |
|
Fixed by #942 |
Lazy-evaluate `getWorldHandlers()` in step-handler

Fixes #825

Problem

`getWorldHandlers()` is called at module load time in `step-handler.ts`. When deploying to AWS Lambda via OpenNext, this runs before `instrumentation.ts`'s `register()` can call `setWorld()`, causing `createWorld()` to fall through to a dynamic `require(WORKFLOW_TARGET_WORLD)` that bundlers can't trace — resulting in `Cannot find module '@workflow/world-postgres'`.

Fix

Defer the `getWorldHandlers().createQueueHandler(...)` call from module scope to first invocation, so `setWorld()` has a chance to run first via `register()`.