From 05eb6d2e6790a80294163baae09cd01ebd8eb6a7 Mon Sep 17 00:00:00 2001 From: Martin PAUCOT Date: Tue, 24 Mar 2026 15:06:42 +0100 Subject: [PATCH] feat(dispatcher): add ability to wait for job completion --- src/contracts/adapter.ts | 8 ++- src/drivers/knex_adapter.ts | 95 ++++++++++++++++-------------------- src/exceptions.ts | 10 ++++ src/job.ts | 11 +++-- src/job_dispatcher.ts | 41 ++++++++++++++-- src/job_runtime.ts | 4 +- src/services/queue_schema.ts | 6 +-- src/types/main.ts | 2 + src/worker.ts | 53 ++++++++++++++++---- 9 files changed, 153 insertions(+), 77 deletions(-) diff --git a/src/contracts/adapter.ts b/src/contracts/adapter.ts index e05067e..3ebf37f 100644 --- a/src/contracts/adapter.ts +++ b/src/contracts/adapter.ts @@ -84,8 +84,14 @@ export interface Adapter { * @param jobId - The job ID to complete * @param queue - The queue the job belongs to * @param removeOnComplete - Optional retention policy for completed jobs + * @param output - Optional output returned by the job */ - completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise + completeJob( + jobId: string, + queue: string, + removeOnComplete?: JobRetention, + output?: any + ): Promise /** * Mark a job as failed permanently and remove it from the queue. diff --git a/src/drivers/knex_adapter.ts b/src/drivers/knex_adapter.ts index 56bcd34..2e720b3 100644 --- a/src/drivers/knex_adapter.ts +++ b/src/drivers/knex_adapter.ts @@ -117,9 +117,7 @@ export class KnexAdapter implements Adapter { // Update job to active status // For SQLite (no SKIP LOCKED), add status='pending' guard to prevent double-claim - const updateQuery = trx(this.#jobsTable) - .where('id', job.id) - .where('queue', queue) + const updateQuery = trx(this.#jobsTable).where('id', job.id).where('queue', queue) if (!this.#supportsSkipLocked()) { updateQuery.where('status', 'pending') @@ -178,19 +176,21 @@ export class KnexAdapter implements Adapter { const priority = jobData.priority ?? DEFAULT_PRIORITY const score = calculateScore(priority, now) - await trx(this.#jobsTable) - .where('id', job.id) - .where('queue', queue) - .update({ - status: 'pending', - score, - execute_at: null, - }) + await trx(this.#jobsTable).where('id', job.id).where('queue', queue).update({ + status: 'pending', + score, + execute_at: null, + }) } }) } - async completeJob(jobId: string, queue: string, removeOnComplete?: JobRetention): Promise { + async completeJob( + jobId: string, + queue: string, + removeOnComplete?: JobRetention, + output?: any + ): Promise { const { keep, maxAge, maxCount } = resolveRetention(removeOnComplete) if (!keep) { @@ -213,6 +213,7 @@ export class KnexAdapter implements Adapter { worker_id: null, acquired_at: null, finished_at: now, + output: output ? JSON.stringify(output) : null, }) if (!updated) { @@ -276,6 +277,7 @@ export class KnexAdapter implements Adapter { status: row.status as JobStatus, data: jobData, finishedAt: row.finished_at ? Number(row.finished_at) : undefined, + output: row.output ? JSON.parse(row.output) : undefined, error: row.error || undefined, } } @@ -331,33 +333,27 @@ export class KnexAdapter implements Adapter { if (retryAt && retryAt.getTime() > now) { // Move to delayed - await this.#connection(this.#jobsTable) - .where('id', jobId) - .where('queue', queue) - .update({ - status: 'delayed', - data: updatedData, - worker_id: null, - acquired_at: null, - score: null, - execute_at: retryAt.getTime(), - }) + await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({ + status: 'delayed', + data: updatedData, + worker_id: null, + acquired_at: null, + score: null, + execute_at: retryAt.getTime(), + }) } else { // Move back to pending const priority = jobData.priority ?? DEFAULT_PRIORITY const score = calculateScore(priority, now) - await this.#connection(this.#jobsTable) - .where('id', jobId) - .where('queue', queue) - .update({ - status: 'pending', - data: updatedData, - worker_id: null, - acquired_at: null, - score, - execute_at: null, - }) + await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({ + status: 'pending', + data: updatedData, + worker_id: null, + acquired_at: null, + score, + execute_at: null, + }) } } @@ -458,10 +454,7 @@ export class KnexAdapter implements Adapter { if (currentStalledCount >= maxStalledCount) { // Fail permanently - remove the job - await trx(this.#jobsTable) - .where('id', row.id) - .where('queue', queue) - .delete() + await trx(this.#jobsTable).where('id', row.id).where('queue', queue).delete() } else { // Recover: increment stalledCount and put back in pending jobData.stalledCount = currentStalledCount + 1 @@ -534,9 +527,9 @@ export class KnexAdapter implements Adapter { } async getSchedule(id: string): Promise { - const row = (await this.#connection(this.#schedulesTable) - .where('id', id) - .first()) as ScheduleRow | undefined + const row = (await this.#connection(this.#schedulesTable).where('id', id).first()) as + | ScheduleRow + | undefined if (!row) return null return this.#rowToScheduleData(row) @@ -565,16 +558,12 @@ export class KnexAdapter implements Adapter { if (updates.runCount !== undefined) data.run_count = updates.runCount if (Object.keys(data).length > 0) { - await this.#connection(this.#schedulesTable) - .where('id', id) - .update(data) + await this.#connection(this.#schedulesTable).where('id', id).update(data) } } async deleteSchedule(id: string): Promise { - await this.#connection(this.#schedulesTable) - .where('id', id) - .delete() + await this.#connection(this.#schedulesTable).where('id', id).delete() } async claimDueSchedule(): Promise { @@ -629,13 +618,11 @@ export class KnexAdapter implements Adapter { } // Update atomically - await trx(this.#schedulesTable) - .where('id', row.id) - .update({ - next_run_at: nextRunAt, - last_run_at: now, - run_count: newRunCount, - }) + await trx(this.#schedulesTable).where('id', row.id).update({ + next_run_at: nextRunAt, + last_run_at: now, + run_count: newRunCount, + }) // Return schedule data (before update state for payload) return this.#rowToScheduleData(row) diff --git a/src/exceptions.ts b/src/exceptions.ts index 6700e35..4ccacaa 100644 --- a/src/exceptions.ts +++ b/src/exceptions.ts @@ -35,6 +35,16 @@ export const E_JOB_NOT_FOUND = createError<[jobName: string]>( 'E_JOB_NOT_FOUND' ) +export const E_JOB_EXECUTION_NOT_FOUND = createError<[jobId: string]>( + 'The job execution "%s" could not be found', + 'E_JOB_EXECUTION_NOT_FOUND' +) + +export const E_JOB_EXECUTION_FAILED = createError<[jobId: string]>( + 'The job execution "%s" failed', + 'E_JOB_EXECUTION_FAILED' +) + export const E_JOB_MAX_ATTEMPTS_REACHED = createError<[jobName: string]>( 'The job "%s" has reached the maximum number of retry attempts', 'E_JOB_MAX_ATTEMPTS_REACHED' diff --git a/src/job.ts b/src/job.ts index edff380..2dc5de1 100644 --- a/src/job.ts +++ b/src/job.ts @@ -46,7 +46,7 @@ import type { JobContext, JobOptions } from './types/main.js' * } * ``` */ -export abstract class Job { +export abstract class Job { #payload!: Payload #context!: JobContext #signal?: AbortSignal @@ -173,12 +173,15 @@ export abstract class Job { static dispatch( this: abstract new (...args: any[]) => T, payload: T extends Job ? P : never - ): JobDispatcher ? P : never> { + ): JobDispatcher ? P : never, T extends Job ? O : never> { const jobClass = this as unknown as { options?: JobOptions; name: string } const options = jobClass.options || {} const jobName = options.name || this.name - const dispatcher = new JobDispatcher ? P : never>(jobName, payload) + const dispatcher = new JobDispatcher< + T extends Job ? P : never, + T extends Job ? O : never + >(jobName, payload) if (options.queue) { dispatcher.toQueue(options.queue) @@ -305,7 +308,7 @@ export abstract class Job { * } * ``` */ - abstract execute(): Promise + abstract execute(): Promise /** * Called when the job has permanently failed (after all retries exhausted). diff --git a/src/job_dispatcher.ts b/src/job_dispatcher.ts index 89ada1c..66c5af5 100644 --- a/src/job_dispatcher.ts +++ b/src/job_dispatcher.ts @@ -5,7 +5,9 @@ import { dispatchChannel } from './tracing_channels.js' import type { Adapter } from './contracts/adapter.js' import type { DispatchResult, Duration } from './types/main.js' import type { JobDispatchMessage } from './types/tracing_channels.js' +import { setTimeout } from 'node:timers/promises' import { parse } from './utils.js' +import { E_JOB_EXECUTION_FAILED, E_JOB_EXECUTION_NOT_FOUND } from './exceptions.ts' /** * Fluent builder for dispatching jobs to the queue. @@ -39,9 +41,9 @@ import { parse } from './utils.js' * await ReminderJob.dispatch({ userId: 123 }).in('24h') * ``` */ -export class JobDispatcher { +export class JobDispatcher { readonly #name: string - readonly #payload: T + readonly #payload: TPayload #queue: string = 'default' #adapter?: string | (() => Adapter) #delay?: Duration @@ -54,7 +56,7 @@ export class JobDispatcher { * @param name - The job class name (used to locate the class at runtime) * @param payload - The data to pass to the job */ - constructor(name: string, payload: T) { + constructor(name: string, payload: TPayload) { this.#name = name this.#payload = payload } @@ -211,6 +213,39 @@ export class JobDispatcher { return { jobId: id } } + /** + * Dispatch the job to the queue and + * await for job to complete or fail. + * + * @param pollingInterval - Interval between each check + * @param signal - Optional signal to abort waiting + * @returns The job output + */ + async wait(pollingInterval: Duration = 2000, signal?: AbortSignal): Promise { + const adapter = this.#getAdapterInstance() + const dispatchResult = await this.run() + + while (true) { + signal?.throwIfAborted() + + await setTimeout(parse(pollingInterval)) + + const job = await adapter.getJob(dispatchResult.jobId, this.#queue) + + if (!job) { + throw new E_JOB_EXECUTION_NOT_FOUND([dispatchResult.jobId]) + } + + if (job.status === 'completed') { + return job.output + } + + if (job.status === 'failed') { + throw new E_JOB_EXECUTION_FAILED([dispatchResult.jobId], { cause: job.error }) + } + } + } + /** * Thenable implementation for auto-dispatch when awaited. * diff --git a/src/job_runtime.ts b/src/job_runtime.ts index 8359ea5..56fd83e 100644 --- a/src/job_runtime.ts +++ b/src/job_runtime.ts @@ -75,7 +75,7 @@ export class JobExecutionRuntime { /** * Execute a hydrated job instance and enforce the configured timeout. */ - async execute(instance: Job, payload: unknown, context: JobContext): Promise { + async execute(instance: Job, payload: unknown, context: JobContext): Promise { if (this.#timeout === undefined) { instance.$hydrate(payload, context) return instance.execute() @@ -90,7 +90,7 @@ export class JobExecutionRuntime { ) try { - await Promise.race([instance.execute(), abortPromise]) + return Promise.race([instance.execute(), abortPromise]) } finally { cleanupAbortListener() } diff --git a/src/services/queue_schema.ts b/src/services/queue_schema.ts index ed53eef..16c838a 100644 --- a/src/services/queue_schema.ts +++ b/src/services/queue_schema.ts @@ -20,6 +20,7 @@ export class QueueSchemaService { table.string('queue', 255).notNullable() table.enu('status', ['pending', 'active', 'delayed', 'completed', 'failed']).notNullable() table.text('data').notNullable() + table.text('output').nullable() table.bigint('score').unsigned().nullable() table.string('worker_id', 255).nullable() table.bigint('acquired_at').unsigned().nullable() @@ -57,10 +58,7 @@ export class QueueSchemaService { table.integer('run_count').unsigned().notNullable().defaultTo(0) table.timestamp('next_run_at').nullable() table.timestamp('last_run_at').nullable() - table - .timestamp('created_at') - .notNullable() - .defaultTo(this.#connection.fn.now()) + table.timestamp('created_at').notNullable().defaultTo(this.#connection.fn.now()) table.index(['status', 'next_run_at']) extend?.(table) diff --git a/src/types/main.ts b/src/types/main.ts index 4494669..3088141 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -141,6 +141,8 @@ export interface JobRecord { finishedAt?: number /** Error message (for failed jobs) */ error?: string + /** Serialized job output */ + output?: any } /** diff --git a/src/worker.ts b/src/worker.ts index 5316d00..086bb6a 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -7,7 +7,13 @@ import { JobPool } from './job_pool.js' import { JobExecutionRuntime } from './job_runtime.js' import { dispatchChannel, executeChannel } from './tracing_channels.js' import type { Adapter, AcquiredJob } from './contracts/adapter.js' -import type { JobContext, JobOptions, JobRetention, QueueManagerConfig, WorkerCycle } from './types/main.js' +import type { + JobContext, + JobOptions, + JobRetention, + QueueManagerConfig, + WorkerCycle, +} from './types/main.js' import type { JobDispatchMessage, JobExecuteMessage } from './types/tracing_channels.js' import { Locator } from './locator.js' import { DEFAULT_PRIORITY } from './constants.js' @@ -346,12 +352,27 @@ export class Worker { const run = () => { return executeChannel.tracePromise(async () => { try { - await runtime.execute(instance, payload, context) - await this.#wrapInternal(() => this.#adapter.completeJob(job.id, queue, retention.removeOnComplete)) + const output = await runtime.execute(instance, payload, context) + await this.#wrapInternal(() => + this.#adapter.completeJob(job.id, queue, retention.removeOnComplete, output) + ) executeMessage.status = 'completed' - debug('worker %s: successfully executed job %s in %dms', this.#id, job.id, (performance.now() - startTime).toFixed(2)) + debug( + 'worker %s: successfully executed job %s in %dms', + this.#id, + job.id, + (performance.now() - startTime).toFixed(2) + ) } catch (e) { - await this.#handleExecutionFailure({ error: e as Error, job, queue, instance, runtime, retention, executeMessage }) + await this.#handleExecutionFailure({ + error: e as Error, + job, + queue, + instance, + runtime, + retention, + executeMessage, + }) } executeMessage.duration = Number((performance.now() - startTime).toFixed(2)) @@ -377,7 +398,12 @@ export class Worker { if (outcome.type === 'failed') { options.executeMessage.status = 'failed' await this.#wrapInternal(() => - this.#adapter.failJob(options.job.id, options.queue, outcome.storageError, options.retention.removeOnFail) + this.#adapter.failJob( + options.job.id, + options.queue, + outcome.storageError, + options.retention.removeOnFail + ) ) await options.instance.failed?.(outcome.hookError) return @@ -389,8 +415,15 @@ export class Worker { options.executeMessage.nextRetryAt = outcome.retryAt if (outcome.retryAt) { - debug('worker %s: job %s will retry at %s', this.#id, options.job.id, outcome.retryAt.toISOString()) - await this.#wrapInternal(() => this.#adapter.retryJob(options.job.id, options.queue, outcome.retryAt)) + debug( + 'worker %s: job %s will retry at %s', + this.#id, + options.job.id, + outcome.retryAt.toISOString() + ) + await this.#wrapInternal(() => + this.#adapter.retryJob(options.job.id, options.queue, outcome.retryAt) + ) } else { await this.#wrapInternal(() => this.#adapter.retryJob(options.job.id, options.queue)) } @@ -426,7 +459,9 @@ export class Worker { } catch (error) { debug('worker %s: failed to initialize job %s (%s)', this.#id, job.id, job.name) const retention = QueueManager.getConfigResolver().resolveJobOptions(queue) - await this.#wrapInternal(() => this.#adapter.failJob(job.id, queue, error as Error, retention.removeOnFail)) + await this.#wrapInternal(() => + this.#adapter.failJob(job.id, queue, error as Error, retention.removeOnFail) + ) throw error } }