diff --git a/.changeset/fix-local-build-load.md b/.changeset/fix-local-build-load.md new file mode 100644 index 00000000000..13f91da9d6a --- /dev/null +++ b/.changeset/fix-local-build-load.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +Fix `--load` flag being silently ignored on local/self-hosted builds. diff --git a/.claude/skills/span-timeline-events/SKILL.md b/.claude/skills/span-timeline-events/SKILL.md new file mode 100644 index 00000000000..122f49912d7 --- /dev/null +++ b/.claude/skills/span-timeline-events/SKILL.md @@ -0,0 +1,78 @@ +--- +name: span-timeline-events +description: Use when adding, modifying, or debugging OTel span timeline events in the trace view. Covers event structure, ClickHouse storage constraints, rendering in SpanTimeline component, admin visibility, and the step-by-step process for adding new events. +allowed-tools: Read, Write, Edit, Glob, Grep, Bash +--- + +# Span Timeline Events + +The trace view's right panel shows a timeline of events for the selected span. These are OTel span events rendered by `app/utils/timelineSpanEvents.ts` and the `SpanTimeline` component. + +## How They Work + +1. **Span events** in OTel are attached to a parent span. In ClickHouse, they're stored as separate rows with `kind: "SPAN_EVENT"` sharing the parent span's `span_id`. The `#mergeRecordsIntoSpanDetail` method reassembles them into the span's `events` array at query time. +2. The timeline only renders events whose `name` starts with `trigger.dev/` - all others are silently filtered out. +3. The **display name** comes from `properties.event` (not the span event name), mapped through `getFriendlyNameForEvent()`. +4. Events are shown on the **span they belong to** - events on one span don't appear in another span's timeline. + +## ClickHouse Storage Constraint + +When events are written to ClickHouse, `spanEventsToTaskEventV1Input()` filters out events whose `start_time` is not greater than the parent span's `startTime`. 
Events at or before the span start are silently dropped. This means span events must have timestamps strictly after the span's own `startTimeUnixNano`. + +## Timeline Rendering (SpanTimeline component) + +The `SpanTimeline` component in `app/components/run/RunTimeline.tsx` renders: + +1. **Events** (thin 1px line with hollow dots) - all events from `createTimelineSpanEventsFromSpanEvents()` +2. **"Started"** marker (thick cap) - at the span's `startTime` +3. **Duration bar** (thick 7px line) - from "Started" to "Finished" +4. **"Finished"** marker (thick cap) - at `startTime + duration` + +The thin line before "Started" only appears when there are events with timestamps between the span start and the first child span. For the Attempt span this works well (Dequeued -> Pod scheduled -> Launched -> etc. all happen before execution starts). Events all get `lineVariant: "light"` (thin) while the execution bar gets `variant: "normal"` (thick). + +## Trace View Sort Order + +Sibling spans (same parent) are sorted by `start_time ASC` from the ClickHouse query. The `createTreeFromFlatItems` function preserves this order. Event timestamps don't affect sort order - only the span's own `start_time`. + +## Event Structure + +```typescript +// OTel span event format +{ + name: "trigger.dev/run", // Must start with "trigger.dev/" to render + timeUnixNano: "1711200000000000000", + attributes: [ + { key: "event", value: { stringValue: "dequeue" } }, // The actual event type + { key: "duration", value: { intValue: 150 } }, // Optional: duration in ms + ] +} +``` + +## Admin-Only Events + +`getAdminOnlyForEvent()` controls visibility. Events default to **admin-only** (`true`). 
+ +| Event | Admin-only | Friendly name | +|-------|-----------|---------------| +| `dequeue` | No | Dequeued | +| `fork` | No | Launched | +| `import` | No (if no fork event) | Importing task file | +| `create_attempt` | Yes | Attempt created | +| `lazy_payload` | Yes | Lazy attempt initialized | +| `pod_scheduled` | Yes | Pod scheduled | +| (default) | Yes | (raw event name) | + +## Adding New Timeline Events + +1. Add OTLP span event with `name: "trigger.dev/<event-name>"` and `properties.event: "<event-name>"` +2. Event timestamp must be strictly after the parent span's `startTimeUnixNano` (ClickHouse drops earlier events) +3. Add friendly name in `getFriendlyNameForEvent()` in `app/utils/timelineSpanEvents.ts` +4. Set admin visibility in `getAdminOnlyForEvent()` +5. Optionally add help text in `getHelpTextForEvent()` + +## Key Files + +- `app/utils/timelineSpanEvents.ts` - filtering, naming, admin logic +- `app/components/run/RunTimeline.tsx` - `SpanTimeline` component (thin line + thick bar rendering) +- `app/presenters/v3/SpanPresenter.server.ts` - loads span data including events +- `app/v3/eventRepository/clickhouseEventRepository.server.ts` - `spanEventsToTaskEventV1Input()` (storage filter), `#mergeRecordsIntoSpanDetail` (reassembly) diff --git a/CLAUDE.md b/CLAUDE.md index 9e53955b092..0a54cced672 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -80,6 +80,10 @@ pnpm run changeset:add When modifying only server components (`apps/webapp/`, `apps/supervisor/`, etc.) with no package changes, add a `.server-changes/` file instead. See `.server-changes/README.md` for format and documentation. +## Dependency Pinning + +Zod is pinned to a single version across the entire monorepo (currently `3.25.76`). When adding zod to a new or existing package, use the **exact same version** as the rest of the repo - never a different version or a range. Mismatched zod versions cause runtime type incompatibilities (e.g., schemas from one package can't be used as body validators in another). 
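As a minimal sketch, a new package following the pinning rule above would declare zod like this in its `package.json` (the package name here is hypothetical):

```json
{
  "name": "@internal/example-package",
  "dependencies": {
    "zod": "3.25.76"
  }
}
```

Note the exact version with no `^` or `~` range, so the package resolves to the same zod instance as the rest of the repo.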
+ ## Architecture Overview ### Request Flow diff --git a/apps/supervisor/package.json b/apps/supervisor/package.json index e9609bf1541..7456d421850 100644 --- a/apps/supervisor/package.json +++ b/apps/supervisor/package.json @@ -14,9 +14,11 @@ }, "dependencies": { "@aws-sdk/client-ecr": "^3.839.0", + "@internal/compute": "workspace:*", "@kubernetes/client-node": "^1.0.0", "@trigger.dev/core": "workspace:*", "dockerode": "^4.0.6", + "p-limit": "^6.2.0", "prom-client": "^15.1.0", "socket.io": "4.7.4", "std-env": "^3.8.0", diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 9eb4aead840..fa5b23efbb1 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -3,153 +3,188 @@ import { env as stdEnv } from "std-env"; import { z } from "zod"; import { AdditionalEnvVars, BoolEnv } from "./envUtil.js"; -const Env = z.object({ - // This will come from `spec.nodeName` in k8s - TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()), - TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), - - // Required settings - TRIGGER_API_URL: z.string().url(), - TRIGGER_WORKER_TOKEN: z.string(), // accepts file:// path to read from a file - MANAGED_WORKER_SECRET: z.string(), - OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), // set on the runners - - // Workload API settings (coordinator mode) - the workload API is what the run controller connects to - TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default(true), - TRIGGER_WORKLOAD_API_PROTOCOL: z - .string() - .transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase())) - .default("http"), - TRIGGER_WORKLOAD_API_DOMAIN: z.string().optional(), // If unset, will use orchestrator-specific default - TRIGGER_WORKLOAD_API_HOST_INTERNAL: z.string().default("0.0.0.0"), - TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on - TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port 
passed to the run controller - - // Runner settings - RUNNER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().optional(), - RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(), - RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv) - RUNNER_PRETTY_LOGS: BoolEnv.default(false), - - // Dequeue settings (provider mode) - TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true), - TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250), - TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000), - TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(1), - TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT: z.coerce.number().int().default(1), - TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(10), - TRIGGER_DEQUEUE_SCALING_STRATEGY: z.enum(["none", "smooth", "aggressive"]).default("none"), - TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS: z.coerce.number().int().default(5000), // 5 seconds - TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS: z.coerce.number().int().default(30000), // 30 seconds - TRIGGER_DEQUEUE_SCALING_TARGET_RATIO: z.coerce.number().default(1.0), // Target ratio of queue items to consumers (1.0 = 1 item per consumer) - TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA: z.coerce.number().min(0).max(1).default(0.3), // Smooths queue length measurements (0=historical, 1=current) - TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms) - TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR: z.coerce.number().min(0).max(1).default(0.7), // Smooths consumer count changes after EWMA (0=no scaling, 1=immediate) - - // Optional services - TRIGGER_WARM_START_URL: z.string().optional(), - TRIGGER_CHECKPOINT_URL: z.string().optional(), - TRIGGER_METADATA_URL: z.string().optional(), - - // Used by the resource monitor - RESOURCE_MONITOR_ENABLED: BoolEnv.default(false), - RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), - 
RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(), - - // Docker settings - DOCKER_API_VERSION: z.string().optional(), - DOCKER_PLATFORM: z.string().optional(), // e.g. linux/amd64, linux/arm64 - DOCKER_STRIP_IMAGE_DIGEST: BoolEnv.default(true), - DOCKER_REGISTRY_USERNAME: z.string().optional(), - DOCKER_REGISTRY_PASSWORD: z.string().optional(), - DOCKER_REGISTRY_URL: z.string().optional(), // e.g. https://index.docker.io/v1 - DOCKER_ENFORCE_MACHINE_PRESETS: BoolEnv.default(true), - DOCKER_AUTOREMOVE_EXITED_CONTAINERS: BoolEnv.default(true), - /** - * Network mode to use for all runners. Supported standard values are: `bridge`, `host`, `none`, and `container:`. - * Any other value is taken as a custom network's name to which all runners should connect to. - * - * Accepts a list of comma-separated values to attach to multiple networks. Additional networks are interpreted as network names and will be attached after container creation. - * - * **WARNING**: Specifying multiple networks will slightly increase startup times. 
- * - * @default "host" - */ - DOCKER_RUNNER_NETWORKS: z.string().default("host"), - - // Kubernetes settings - KUBERNETES_FORCE_ENABLED: BoolEnv.default(false), - KUBERNETES_NAMESPACE: z.string().default("default"), - KUBERNETES_WORKER_NODETYPE_LABEL: z.string().default("v4-worker"), - KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv - KUBERNETES_EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"), - KUBERNETES_EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"), - KUBERNETES_STRIP_IMAGE_DIGEST: BoolEnv.default(false), - KUBERNETES_CPU_REQUEST_MIN_CORES: z.coerce.number().min(0).default(0), - KUBERNETES_CPU_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(0.75), // Ratio of CPU limit, so 0.75 = 75% of CPU limit - KUBERNETES_MEMORY_REQUEST_MIN_GB: z.coerce.number().min(0).default(0), - KUBERNETES_MEMORY_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(1), // Ratio of memory limit, so 1 = 100% of memory limit - - // Per-preset overrides of the global KUBERNETES_CPU_REQUEST_RATIO - KUBERNETES_CPU_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_CPU_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), - - // Per-preset overrides of the global KUBERNETES_MEMORY_REQUEST_RATIO - KUBERNETES_MEMORY_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), - 
KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), - KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), - - KUBERNETES_MEMORY_OVERHEAD_GB: z.coerce.number().min(0).optional(), // Optional memory overhead to add to the limit in GB - KUBERNETES_SCHEDULER_NAME: z.string().optional(), // Custom scheduler name for pods - // Large machine affinity settings - large-* presets prefer a dedicated pool - KUBERNETES_LARGE_MACHINE_AFFINITY_ENABLED: BoolEnv.default(false), - KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"), - KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("large-machines"), - KUBERNETES_LARGE_MACHINE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(100), - - // Project affinity settings - pods from the same project prefer the same node - KUBERNETES_PROJECT_AFFINITY_ENABLED: BoolEnv.default(false), - KUBERNETES_PROJECT_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(50), - KUBERNETES_PROJECT_AFFINITY_TOPOLOGY_KEY: z.string().trim().min(1).default("kubernetes.io/hostname"), - - // Schedule affinity settings - runs from schedule trees prefer a dedicated pool - KUBERNETES_SCHEDULE_AFFINITY_ENABLED: BoolEnv.default(false), - KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"), - KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("scheduled-runs"), - KUBERNETES_SCHEDULE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(80), - KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(20), - - // Placement tags settings - PLACEMENT_TAGS_ENABLED: 
BoolEnv.default(false), - PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"), - - // Metrics - METRICS_ENABLED: BoolEnv.default(true), - METRICS_COLLECT_DEFAULTS: BoolEnv.default(true), - METRICS_HOST: z.string().default("127.0.0.1"), - METRICS_PORT: z.coerce.number().int().default(9090), - - // Pod cleaner - POD_CLEANER_ENABLED: BoolEnv.default(true), - POD_CLEANER_INTERVAL_MS: z.coerce.number().int().default(10000), - POD_CLEANER_BATCH_SIZE: z.coerce.number().int().default(500), - - // Failed pod handler - FAILED_POD_HANDLER_ENABLED: BoolEnv.default(true), - FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS: z.coerce.number().int().default(1000), - - // Debug - DEBUG: BoolEnv.default(false), - SEND_RUN_DEBUG_LOGS: BoolEnv.default(false), -}); +const Env = z + .object({ + // This will come from `spec.nodeName` in k8s + TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()), + TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30), + + // Required settings + TRIGGER_API_URL: z.string().url(), + TRIGGER_WORKER_TOKEN: z.string(), // accepts file:// path to read from a file + MANAGED_WORKER_SECRET: z.string(), + OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), // set on the runners + + // Workload API settings (coordinator mode) - the workload API is what the run controller connects to + TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default(true), + TRIGGER_WORKLOAD_API_PROTOCOL: z + .string() + .transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase())) + .default("http"), + TRIGGER_WORKLOAD_API_DOMAIN: z.string().optional(), // If unset, will use orchestrator-specific default + TRIGGER_WORKLOAD_API_HOST_INTERNAL: z.string().default("0.0.0.0"), + TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on + TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller + + // Runner settings + 
RUNNER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().optional(), + RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(), + RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv) + RUNNER_PRETTY_LOGS: BoolEnv.default(false), + + // Dequeue settings (provider mode) + TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true), + TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250), + TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000), + TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(1), + TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT: z.coerce.number().int().default(1), + TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(10), + TRIGGER_DEQUEUE_SCALING_STRATEGY: z.enum(["none", "smooth", "aggressive"]).default("none"), + TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS: z.coerce.number().int().default(5000), // 5 seconds + TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS: z.coerce.number().int().default(30000), // 30 seconds + TRIGGER_DEQUEUE_SCALING_TARGET_RATIO: z.coerce.number().default(1.0), // Target ratio of queue items to consumers (1.0 = 1 item per consumer) + TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA: z.coerce.number().min(0).max(1).default(0.3), // Smooths queue length measurements (0=historical, 1=current) + TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms) + TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR: z.coerce.number().min(0).max(1).default(0.7), // Smooths consumer count changes after EWMA (0=no scaling, 1=immediate) + + // Optional services + TRIGGER_WARM_START_URL: z.string().optional(), + TRIGGER_CHECKPOINT_URL: z.string().optional(), + TRIGGER_METADATA_URL: z.string().optional(), + + // Used by the resource monitor + RESOURCE_MONITOR_ENABLED: BoolEnv.default(false), + RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), + RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(), + + // Docker 
settings + DOCKER_API_VERSION: z.string().optional(), + DOCKER_PLATFORM: z.string().optional(), // e.g. linux/amd64, linux/arm64 + DOCKER_STRIP_IMAGE_DIGEST: BoolEnv.default(true), + DOCKER_REGISTRY_USERNAME: z.string().optional(), + DOCKER_REGISTRY_PASSWORD: z.string().optional(), + DOCKER_REGISTRY_URL: z.string().optional(), // e.g. https://index.docker.io/v1 + DOCKER_ENFORCE_MACHINE_PRESETS: BoolEnv.default(true), + DOCKER_AUTOREMOVE_EXITED_CONTAINERS: BoolEnv.default(true), + /** + * Network mode to use for all runners. Supported standard values are: `bridge`, `host`, `none`, and `container:`. + * Any other value is taken as a custom network's name to which all runners should connect to. + * + * Accepts a list of comma-separated values to attach to multiple networks. Additional networks are interpreted as network names and will be attached after container creation. + * + * **WARNING**: Specifying multiple networks will slightly increase startup times. + * + * @default "host" + */ + DOCKER_RUNNER_NETWORKS: z.string().default("host"), + + // Compute settings + COMPUTE_GATEWAY_URL: z.string().url().optional(), + COMPUTE_GATEWAY_AUTH_TOKEN: z.string().optional(), + COMPUTE_GATEWAY_TIMEOUT_MS: z.coerce.number().int().default(30_000), + COMPUTE_SNAPSHOTS_ENABLED: BoolEnv.default(false), + COMPUTE_TRACE_SPANS_ENABLED: BoolEnv.default(true), + COMPUTE_TRACE_OTLP_ENDPOINT: z.string().url().optional(), // Override for span export (derived from TRIGGER_API_URL if unset) + COMPUTE_SNAPSHOT_DELAY_MS: z.coerce.number().int().min(0).max(60_000).default(5_000), + COMPUTE_SNAPSHOT_DISPATCH_LIMIT: z.coerce.number().int().min(1).max(100).default(10), + + // Kubernetes settings + KUBERNETES_FORCE_ENABLED: BoolEnv.default(false), + KUBERNETES_NAMESPACE: z.string().default("default"), + KUBERNETES_WORKER_NODETYPE_LABEL: z.string().default("v4-worker"), + KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv + KUBERNETES_EPHEMERAL_STORAGE_SIZE_LIMIT: 
z.string().default("10Gi"), + KUBERNETES_EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"), + KUBERNETES_STRIP_IMAGE_DIGEST: BoolEnv.default(false), + KUBERNETES_CPU_REQUEST_MIN_CORES: z.coerce.number().min(0).default(0), + KUBERNETES_CPU_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(0.75), // Ratio of CPU limit, so 0.75 = 75% of CPU limit + KUBERNETES_MEMORY_REQUEST_MIN_GB: z.coerce.number().min(0).default(0), + KUBERNETES_MEMORY_REQUEST_RATIO: z.coerce.number().min(0).max(1).default(1), // Ratio of memory limit, so 1 = 100% of memory limit + + // Per-preset overrides of the global KUBERNETES_CPU_REQUEST_RATIO + KUBERNETES_CPU_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_CPU_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), + + // Per-preset overrides of the global KUBERNETES_MEMORY_REQUEST_RATIO + KUBERNETES_MEMORY_REQUEST_RATIO_MICRO: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_SMALL_2X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_MEDIUM_2X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_1X: z.coerce.number().min(0).max(1).optional(), + KUBERNETES_MEMORY_REQUEST_RATIO_LARGE_2X: z.coerce.number().min(0).max(1).optional(), + + KUBERNETES_MEMORY_OVERHEAD_GB: z.coerce.number().min(0).optional(), // 
Optional memory overhead to add to the limit in GB + KUBERNETES_SCHEDULER_NAME: z.string().optional(), // Custom scheduler name for pods + // Large machine affinity settings - large-* presets prefer a dedicated pool + KUBERNETES_LARGE_MACHINE_AFFINITY_ENABLED: BoolEnv.default(false), + KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"), + KUBERNETES_LARGE_MACHINE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("large-machines"), + KUBERNETES_LARGE_MACHINE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(100), + + // Project affinity settings - pods from the same project prefer the same node + KUBERNETES_PROJECT_AFFINITY_ENABLED: BoolEnv.default(false), + KUBERNETES_PROJECT_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(50), + KUBERNETES_PROJECT_AFFINITY_TOPOLOGY_KEY: z + .string() + .trim() + .min(1) + .default("kubernetes.io/hostname"), + + // Schedule affinity settings - runs from schedule trees prefer a dedicated pool + KUBERNETES_SCHEDULE_AFFINITY_ENABLED: BoolEnv.default(false), + KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_KEY: z.string().trim().min(1).default("node.cluster.x-k8s.io/machinepool"), + KUBERNETES_SCHEDULE_AFFINITY_POOL_LABEL_VALUE: z.string().trim().min(1).default("scheduled-runs"), + KUBERNETES_SCHEDULE_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(80), + KUBERNETES_SCHEDULE_ANTI_AFFINITY_WEIGHT: z.coerce.number().int().min(1).max(100).default(20), + + // Placement tags settings + PLACEMENT_TAGS_ENABLED: BoolEnv.default(false), + PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io"), + + // Metrics + METRICS_ENABLED: BoolEnv.default(true), + METRICS_COLLECT_DEFAULTS: BoolEnv.default(true), + METRICS_HOST: z.string().default("127.0.0.1"), + METRICS_PORT: z.coerce.number().int().default(9090), + + // Pod cleaner + POD_CLEANER_ENABLED: BoolEnv.default(true), + POD_CLEANER_INTERVAL_MS: 
z.coerce.number().int().default(10000), + POD_CLEANER_BATCH_SIZE: z.coerce.number().int().default(500), + + // Failed pod handler + FAILED_POD_HANDLER_ENABLED: BoolEnv.default(true), + FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS: z.coerce.number().int().default(1000), + + // Debug + DEBUG: BoolEnv.default(false), + SEND_RUN_DEBUG_LOGS: BoolEnv.default(false), + }) + .superRefine((data, ctx) => { + if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_METADATA_URL) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: "TRIGGER_METADATA_URL is required when COMPUTE_SNAPSHOTS_ENABLED is true", + path: ["TRIGGER_METADATA_URL"], + }); + } + if (data.COMPUTE_SNAPSHOTS_ENABLED && !data.TRIGGER_WORKLOAD_API_DOMAIN) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + message: "TRIGGER_WORKLOAD_API_DOMAIN is required when COMPUTE_SNAPSHOTS_ENABLED is true", + path: ["TRIGGER_WORKLOAD_API_DOMAIN"], + }); + } + }) + .transform((data) => ({ + ...data, + COMPUTE_TRACE_OTLP_ENDPOINT: data.COMPUTE_TRACE_OTLP_ENDPOINT ?? 
`${data.TRIGGER_API_URL}/otel`, + })); export const env = Env.parse(stdEnv); diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 1de788b1e46..6f5913c47ca 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -14,6 +14,7 @@ import { } from "./resourceMonitor.js"; import { KubernetesWorkloadManager } from "./workloadManager/kubernetes.js"; import { DockerWorkloadManager } from "./workloadManager/docker.js"; +import { ComputeWorkloadManager } from "./workloadManager/compute.js"; import { HttpServer, CheckpointClient, @@ -25,6 +26,8 @@ import { register } from "./metrics.js"; import { PodCleaner } from "./services/podCleaner.js"; import { FailedPodHandler } from "./services/failedPodHandler.js"; import { getWorkerToken } from "./workerToken.js"; +import { OtlpTraceService } from "./services/otlpTraceService.js"; +import { extractTraceparent, getRestoreRunnerId } from "./util.js"; if (env.METRICS_COLLECT_DEFAULTS) { collectDefaultMetrics({ register }); @@ -35,18 +38,25 @@ class ManagedSupervisor { private readonly metricsServer?: HttpServer; private readonly workloadServer: WorkloadServer; private readonly workloadManager: WorkloadManager; + private readonly computeManager?: ComputeWorkloadManager; private readonly logger = new SimpleStructuredLogger("managed-supervisor"); private readonly resourceMonitor: ResourceMonitor; private readonly checkpointClient?: CheckpointClient; private readonly podCleaner?: PodCleaner; private readonly failedPodHandler?: FailedPodHandler; + private readonly tracing?: OtlpTraceService; private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED); private readonly warmStartUrl = env.TRIGGER_WARM_START_URL; constructor() { - const { TRIGGER_WORKER_TOKEN, MANAGED_WORKER_SECRET, ...envWithoutSecrets } = env; + const { + TRIGGER_WORKER_TOKEN, + MANAGED_WORKER_SECRET, + COMPUTE_GATEWAY_AUTH_TOKEN, + ...envWithoutSecrets + } = env; if (env.DEBUG) { 
this.logger.debug("Starting up", { envWithoutSecrets }); @@ -77,9 +87,46 @@ class ManagedSupervisor { : new DockerResourceMonitor(new Docker()) : new NoopResourceMonitor(); - this.workloadManager = this.isKubernetes - ? new KubernetesWorkloadManager(workloadManagerOptions) - : new DockerWorkloadManager(workloadManagerOptions); + if (env.COMPUTE_GATEWAY_URL) { + if (!env.TRIGGER_WORKLOAD_API_DOMAIN) { + throw new Error("TRIGGER_WORKLOAD_API_DOMAIN is not set, cannot create compute manager"); + } + + const callbackUrl = `${env.TRIGGER_WORKLOAD_API_PROTOCOL}://${env.TRIGGER_WORKLOAD_API_DOMAIN}:${env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL}/api/v1/compute/snapshot-complete`; + + if (env.COMPUTE_TRACE_SPANS_ENABLED) { + this.tracing = new OtlpTraceService({ + endpointUrl: env.COMPUTE_TRACE_OTLP_ENDPOINT, + }); + } + + const computeManager = new ComputeWorkloadManager({ + ...workloadManagerOptions, + gateway: { + url: env.COMPUTE_GATEWAY_URL, + authToken: env.COMPUTE_GATEWAY_AUTH_TOKEN, + timeoutMs: env.COMPUTE_GATEWAY_TIMEOUT_MS, + }, + snapshots: { + enabled: env.COMPUTE_SNAPSHOTS_ENABLED, + delayMs: env.COMPUTE_SNAPSHOT_DELAY_MS, + dispatchLimit: env.COMPUTE_SNAPSHOT_DISPATCH_LIMIT, + callbackUrl, + }, + tracing: this.tracing, + runner: { + instanceName: env.TRIGGER_WORKER_INSTANCE_NAME, + otelEndpoint: env.OTEL_EXPORTER_OTLP_ENDPOINT, + prettyLogs: env.RUNNER_PRETTY_LOGS, + }, + }); + this.computeManager = computeManager; + this.workloadManager = computeManager; + } else { + this.workloadManager = this.isKubernetes + ? 
new KubernetesWorkloadManager(workloadManagerOptions) + : new DockerWorkloadManager(workloadManagerOptions); + } if (this.isKubernetes) { if (env.POD_CLEANER_ENABLED) { @@ -182,104 +229,161 @@ class ManagedSupervisor { } this.workerSession.on("runNotification", async ({ time, run }) => { - this.logger.log("runNotification", { time, run }); + this.logger.verbose("runNotification", { time, run }); this.workloadServer.notifyRun({ run }); }); - this.workerSession.on("runQueueMessage", async ({ time, message }) => { - this.logger.log(`Received message with timestamp ${time.toLocaleString()}`, message); + this.workerSession.on( + "runQueueMessage", + async ({ time, message, dequeueResponseMs, pollingIntervalMs }) => { + this.logger.verbose(`Received message with timestamp ${time.toLocaleString()}`, message); - if (message.completedWaitpoints.length > 0) { - this.logger.debug("Run has completed waitpoints", { - runId: message.run.id, - completedWaitpoints: message.completedWaitpoints.length, - }); - } - - if (!message.image) { - this.logger.error("Run has no image", { runId: message.run.id }); - return; - } - - const { checkpoint, ...rest } = message; - - if (checkpoint) { - this.logger.log("Restoring run", { runId: message.run.id }); + if (message.completedWaitpoints.length > 0) { + this.logger.debug("Run has completed waitpoints", { + runId: message.run.id, + completedWaitpoints: message.completedWaitpoints.length, + }); + } - if (!this.checkpointClient) { - this.logger.error("No checkpoint client", { runId: message.run.id }); + if (!message.image) { + this.logger.error("Run has no image", { runId: message.run.id }); return; } - try { - const didRestore = await this.checkpointClient.restoreRun({ - runFriendlyId: message.run.friendlyId, - snapshotFriendlyId: message.snapshot.friendlyId, - body: { - ...rest, - checkpoint, - }, - }); - - if (didRestore) { - this.logger.log("Restore successful", { runId: message.run.id }); - } else { - this.logger.error("Restore failed", { 
runId: message.run.id }); + const { checkpoint, ...rest } = message; + + // Register trace context early so snapshot spans work for all paths + // (cold create, restore, warm start). Re-registration on restore is safe + // since dequeue always provides fresh context. + if (this.computeManager?.traceSpansEnabled) { + const traceparent = extractTraceparent(message.run.traceContext); + + if (traceparent) { + this.workloadServer.registerRunTraceContext(message.run.friendlyId, { + traceparent, + envId: message.environment.id, + orgId: message.organization.id, + projectId: message.project.id, + }); } - } catch (error) { - this.logger.error("Failed to restore run", { error }); } - return; - } + if (checkpoint) { + this.logger.debug("Restoring run", { runId: message.run.id }); + + if (this.computeManager) { + try { + const runnerId = getRestoreRunnerId(message.run.friendlyId, checkpoint.id); + + const didRestore = await this.computeManager.restore({ + snapshotId: checkpoint.location, + runnerId, + runFriendlyId: message.run.friendlyId, + snapshotFriendlyId: message.snapshot.friendlyId, + machine: message.run.machine, + traceContext: message.run.traceContext, + envId: message.environment.id, + orgId: message.organization.id, + projectId: message.project.id, + dequeuedAt: message.dequeuedAt, + }); + + if (didRestore) { + this.logger.debug("Compute restore successful", { + runId: message.run.id, + runnerId, + }); + } else { + this.logger.error("Compute restore failed", { runId: message.run.id, runnerId }); + } + } catch (error) { + this.logger.error("Failed to restore run (compute)", { error }); + } + + return; + } + + if (!this.checkpointClient) { + this.logger.error("No checkpoint client", { runId: message.run.id }); + return; + } + + try { + const didRestore = await this.checkpointClient.restoreRun({ + runFriendlyId: message.run.friendlyId, + snapshotFriendlyId: message.snapshot.friendlyId, + body: { + ...rest, + checkpoint, + }, + }); + + if (didRestore) { + 
this.logger.debug("Restore successful", { runId: message.run.id }); + } else { + this.logger.error("Restore failed", { runId: message.run.id }); + } + } catch (error) { + this.logger.error("Failed to restore run", { error }); + } - this.logger.log("Scheduling run", { runId: message.run.id }); + return; + } - const didWarmStart = await this.tryWarmStart(message); + this.logger.debug("Scheduling run", { runId: message.run.id }); - if (didWarmStart) { - this.logger.log("Warm start successful", { runId: message.run.id }); - return; - } + const warmStartStart = performance.now(); + const didWarmStart = await this.tryWarmStart(message); + const warmStartCheckMs = Math.round(performance.now() - warmStartStart); - try { - if (!message.deployment.friendlyId) { - // mostly a type guard, deployments always exists for deployed environments - // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments. - throw new Error("Deployment is missing"); + if (didWarmStart) { + this.logger.debug("Warm start successful", { runId: message.run.id }); + return; } - await this.workloadManager.create({ - dequeuedAt: message.dequeuedAt, - envId: message.environment.id, - envType: message.environment.type, - image: message.image, - machine: message.run.machine, - orgId: message.organization.id, - projectId: message.project.id, - deploymentFriendlyId: message.deployment.friendlyId, - deploymentVersion: message.backgroundWorker.version, - runId: message.run.id, - runFriendlyId: message.run.friendlyId, - version: message.version, - nextAttemptNumber: message.run.attemptNumber, - snapshotId: message.snapshot.id, - snapshotFriendlyId: message.snapshot.friendlyId, - placementTags: message.placementTags, - annotations: message.run.annotations, - hasPrivateLink: message.organization.hasPrivateLink, - }); + try { + if (!message.deployment.friendlyId) { + // mostly a type guard, deployments always exists for deployed environments 
+ // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments. + throw new Error("Deployment is missing"); + } - // Disabled for now - // this.resourceMonitor.blockResources({ - // cpu: message.run.machine.cpu, - // memory: message.run.machine.memory, - // }); - } catch (error) { - this.logger.error("Failed to create workload", { error }); + await this.workloadManager.create({ + dequeuedAt: message.dequeuedAt, + dequeueResponseMs, + pollingIntervalMs, + warmStartCheckMs, + envId: message.environment.id, + envType: message.environment.type, + image: message.image, + machine: message.run.machine, + orgId: message.organization.id, + projectId: message.project.id, + deploymentFriendlyId: message.deployment.friendlyId, + deploymentVersion: message.backgroundWorker.version, + runId: message.run.id, + runFriendlyId: message.run.friendlyId, + version: message.version, + nextAttemptNumber: message.run.attemptNumber, + snapshotId: message.snapshot.id, + snapshotFriendlyId: message.snapshot.friendlyId, + placementTags: message.placementTags, + traceContext: message.run.traceContext, + annotations: message.run.annotations, + hasPrivateLink: message.organization.hasPrivateLink, + }); + + // Disabled for now + // this.resourceMonitor.blockResources({ + // cpu: message.run.machine.cpu, + // memory: message.run.machine.memory, + // }); + } catch (error) { + this.logger.error("Failed to create workload", { error }); + } } - }); + ); if (env.METRICS_ENABLED) { this.metricsServer = new HttpServer({ @@ -298,6 +402,8 @@ class ManagedSupervisor { host: env.TRIGGER_WORKLOAD_API_HOST_INTERNAL, workerClient: this.workerSession.httpClient, checkpointClient: this.checkpointClient, + computeManager: this.computeManager, + tracing: this.tracing, }); this.workloadServer.on("runConnected", this.onRunConnected.bind(this)); @@ -382,6 +488,7 @@ class ManagedSupervisor { async stop() { this.logger.log("Shutting down"); + 
await this.workloadServer.stop(); + await this.workerSession.stop(); // Optional services diff --git a/apps/supervisor/src/services/computeSnapshotService.ts b/apps/supervisor/src/services/computeSnapshotService.ts new file mode 100644 index 00000000000..7206f57fb73 --- /dev/null +++ b/apps/supervisor/src/services/computeSnapshotService.ts @@ -0,0 +1,240 @@ +import pLimit from "p-limit"; +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; +import { parseTraceparent } from "@trigger.dev/core/v3/isomorphic"; +import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers"; +import { type SnapshotCallbackPayload } from "@internal/compute"; +import type { ComputeWorkloadManager } from "../workloadManager/compute.js"; +import { TimerWheel } from "./timerWheel.js"; +import type { OtlpTraceService } from "./otlpTraceService.js"; + +type DelayedSnapshot = { + runnerId: string; + runFriendlyId: string; + snapshotFriendlyId: string; +}; + +export type RunTraceContext = { + traceparent: string; + envId: string; + orgId: string; + projectId: string; +}; + +export type ComputeSnapshotServiceOptions = { + computeManager: ComputeWorkloadManager; + workerClient: SupervisorHttpClient; + tracing?: OtlpTraceService; +}; + +export class ComputeSnapshotService { + private readonly logger = new SimpleStructuredLogger("compute-snapshot-service"); + + private static readonly MAX_TRACE_CONTEXTS = 10_000; + private readonly runTraceContexts = new Map<string, RunTraceContext>(); + private readonly timerWheel: TimerWheel<DelayedSnapshot>; + private readonly dispatchLimit: ReturnType<typeof pLimit>; + + private readonly computeManager: ComputeWorkloadManager; + private readonly workerClient: SupervisorHttpClient; + private readonly tracing?: OtlpTraceService; + + constructor(opts: ComputeSnapshotServiceOptions) { + this.computeManager = opts.computeManager; + this.workerClient = opts.workerClient; + this.tracing = opts.tracing; + + this.dispatchLimit = pLimit(this.computeManager.snapshotDispatchLimit);
this.timerWheel = new TimerWheel<DelayedSnapshot>({ + delayMs: this.computeManager.snapshotDelayMs, + onExpire: (item) => { + this.dispatchLimit(() => this.dispatch(item.data)).catch((error) => { + this.logger.error("Snapshot dispatch failed", { + runId: item.data.runFriendlyId, + runnerId: item.data.runnerId, + error, + }); + }); + }, + }); + this.timerWheel.start(); + } + + /** Schedule a delayed snapshot for a run. Replaces any pending snapshot for the same run. */ + schedule(runFriendlyId: string, data: DelayedSnapshot) { + this.timerWheel.submit(runFriendlyId, data); + this.logger.debug("Snapshot scheduled", { + runFriendlyId, + snapshotFriendlyId: data.snapshotFriendlyId, + delayMs: this.computeManager.snapshotDelayMs, + }); + } + + /** Cancel a pending delayed snapshot. Returns true if one was cancelled. */ + cancel(runFriendlyId: string): boolean { + const cancelled = this.timerWheel.cancel(runFriendlyId); + if (cancelled) { + this.logger.debug("Snapshot cancelled", { runFriendlyId }); + } + return cancelled; + } + + /** Handle the callback from the gateway after a snapshot completes or fails.
*/ + async handleCallback(body: SnapshotCallbackPayload) { + this.logger.debug("Snapshot callback", { + snapshotId: body.snapshot_id, + instanceId: body.instance_id, + status: body.status, + error: body.error, + metadata: body.metadata, + durationMs: body.duration_ms, + }); + + const runId = body.metadata?.runId; + const snapshotFriendlyId = body.metadata?.snapshotFriendlyId; + + if (!runId || !snapshotFriendlyId) { + this.logger.error("Snapshot callback missing metadata", { body }); + return { ok: false as const, status: 400 }; + } + + this.#emitSnapshotSpan(runId, body.duration_ms, body.snapshot_id); + + if (body.status === "completed") { + const result = await this.workerClient.submitSuspendCompletion({ + runId, + snapshotId: snapshotFriendlyId, + body: { + success: true, + checkpoint: { + type: "COMPUTE", + location: body.snapshot_id, + }, + }, + }); + + if (result.success) { + this.logger.debug("Suspend completion submitted", { + runId, + instanceId: body.instance_id, + snapshotId: body.snapshot_id, + }); + } else { + this.logger.error("Failed to submit suspend completion", { + runId, + snapshotFriendlyId, + error: result.error, + }); + } + } else { + const result = await this.workerClient.submitSuspendCompletion({ + runId, + snapshotId: snapshotFriendlyId, + body: { + success: false, + error: body.error ?? "Snapshot failed", + }, + }); + + if (!result.success) { + this.logger.error("Failed to submit suspend failure", { + runId, + snapshotFriendlyId, + error: result.error, + }); + } + } + + return { ok: true as const, status: 200 }; + } + + registerTraceContext(runFriendlyId: string, ctx: RunTraceContext) { + // Evict oldest entries if we've hit the cap. This is best-effort: on a busy + // supervisor, entries for long-lived runs may be evicted before their snapshot + // callback arrives, causing those snapshot spans to be silently dropped. + // That's acceptable - trace spans are observability sugar, not correctness. 
+ if (this.runTraceContexts.size >= ComputeSnapshotService.MAX_TRACE_CONTEXTS) { + const firstKey = this.runTraceContexts.keys().next().value; + if (firstKey) { + this.runTraceContexts.delete(firstKey); + } + } + + this.runTraceContexts.set(runFriendlyId, ctx); + } + + /** Stop the timer wheel, dropping pending snapshots. */ + stop(): string[] { + // Intentionally drop pending snapshots rather than dispatching them. The supervisor + // is shutting down, so our callback URL will be dead by the time the gateway responds. + // Runners detect the supervisor is gone and reconnect to a new instance, which + // re-triggers the snapshot workflow. Snapshots are an optimization, not a correctness + // requirement - runs continue fine without them. + const remaining = this.timerWheel.stop(); + const droppedRuns = remaining.map((item) => item.key); + + if (droppedRuns.length > 0) { + this.logger.info("Stopped, dropped pending snapshots", { count: droppedRuns.length }); + this.logger.debug("Dropped snapshot details", { runs: droppedRuns }); + } + + return droppedRuns; + } + + /** Dispatch a snapshot request to the gateway. */ + private async dispatch(snapshot: DelayedSnapshot): Promise<void> { + const result = await this.computeManager.snapshot({ + runnerId: snapshot.runnerId, + metadata: { + runId: snapshot.runFriendlyId, + snapshotFriendlyId: snapshot.snapshotFriendlyId, + }, + }); + + if (!result) { + this.logger.error("Failed to request snapshot", { + runId: snapshot.runFriendlyId, + runnerId: snapshot.runnerId, + }); + } + } + + #emitSnapshotSpan(runFriendlyId: string, durationMs?: number, snapshotId?: string) { + if (!this.tracing) return; + + const ctx = this.runTraceContexts.get(runFriendlyId); + if (!ctx) return; + + const parsed = parseTraceparent(ctx.traceparent); + if (!parsed) return; + + const endEpochMs = Date.now(); + const startEpochMs = durationMs ?
endEpochMs - durationMs : endEpochMs; + + const spanAttributes: Record<string, string | number | boolean> = { + "compute.type": "snapshot", + }; + + if (durationMs !== undefined) { + spanAttributes["compute.total_ms"] = durationMs; + } + + if (snapshotId) { + spanAttributes["compute.snapshot_id"] = snapshotId; + } + + this.tracing.emit({ + traceId: parsed.traceId, + parentSpanId: parsed.spanId, + spanName: "compute.snapshot", + startTimeMs: startEpochMs, + endTimeMs: endEpochMs, + resourceAttributes: { + "ctx.environment.id": ctx.envId, + "ctx.organization.id": ctx.orgId, + "ctx.project.id": ctx.projectId, + "ctx.run.id": runFriendlyId, + }, + spanAttributes, + }); + } +} diff --git a/apps/supervisor/src/services/failedPodHandler.test.ts b/apps/supervisor/src/services/failedPodHandler.test.ts index d05783288ed..4dbfda16f43 100644 --- a/apps/supervisor/src/services/failedPodHandler.test.ts +++ b/apps/supervisor/src/services/failedPodHandler.test.ts @@ -1,10 +1,11 @@ import { describe, it, expect, beforeAll, afterEach } from "vitest"; import { FailedPodHandler } from "./failedPodHandler.js"; -import { K8sApi, createK8sApi } from "../clients/kubernetes.js"; +import { type K8sApi, createK8sApi } from "../clients/kubernetes.js"; import { Registry } from "prom-client"; import { setTimeout } from "timers/promises"; -describe("FailedPodHandler Integration Tests", () => { +// These tests require live K8s cluster credentials - skip by default +describe.skipIf(!process.env.K8S_INTEGRATION_TESTS)("FailedPodHandler Integration Tests", () => { const k8s = createK8sApi(); const namespace = "integration-test"; const register = new Registry(); diff --git a/apps/supervisor/src/services/failedPodHandler.ts b/apps/supervisor/src/services/failedPodHandler.ts index 07217243769..3d56c92b213 100644 --- a/apps/supervisor/src/services/failedPodHandler.ts +++ b/apps/supervisor/src/services/failedPodHandler.ts @@ -151,7 +151,7 @@ export class FailedPodHandler { } private async onPodCompleted(pod: V1Pod) { -
this.logger.info("pod-completed", this.podSummary(pod)); + this.logger.debug("pod-completed", this.podSummary(pod)); this.informerEventsTotal.inc({ namespace: this.namespace, verb: "add" }); if (!pod.metadata?.name) { @@ -165,7 +165,7 @@ export class FailedPodHandler { } if (pod.metadata?.deletionTimestamp) { - this.logger.info("pod-completed: pod is being deleted", this.podSummary(pod)); + this.logger.verbose("pod-completed: pod is being deleted", this.podSummary(pod)); return; } @@ -188,7 +188,7 @@ export class FailedPodHandler { } private async onPodSucceeded(pod: V1Pod) { - this.logger.info("pod-succeeded", this.podSummary(pod)); + this.logger.debug("pod-succeeded", this.podSummary(pod)); this.processedPodsTotal.inc({ namespace: this.namespace, status: this.podStatus(pod), @@ -196,7 +196,7 @@ export class FailedPodHandler { } private async onPodFailed(pod: V1Pod) { - this.logger.info("pod-failed", this.podSummary(pod)); + this.logger.debug("pod-failed", this.podSummary(pod)); try { await this.processFailedPod(pod); @@ -208,7 +208,7 @@ export class FailedPodHandler { } private async processFailedPod(pod: V1Pod) { - this.logger.info("pod-failed: processing pod", this.podSummary(pod)); + this.logger.verbose("pod-failed: processing pod", this.podSummary(pod)); const mainContainer = pod.status?.containerStatuses?.find((c) => c.name === "run-controller"); @@ -231,7 +231,7 @@ export class FailedPodHandler { } private async deletePod(pod: V1Pod) { - this.logger.info("pod-failed: deleting pod", this.podSummary(pod)); + this.logger.verbose("pod-failed: deleting pod", this.podSummary(pod)); try { await this.k8s.core.deleteNamespacedPod({ name: pod.metadata!.name!, diff --git a/apps/supervisor/src/services/otlpTraceService.test.ts b/apps/supervisor/src/services/otlpTraceService.test.ts new file mode 100644 index 00000000000..b9a5398cb11 --- /dev/null +++ b/apps/supervisor/src/services/otlpTraceService.test.ts @@ -0,0 +1,114 @@ +import { describe, it, expect } from 
"vitest"; +import { buildPayload } from "./otlpTraceService.js"; + +describe("buildPayload", () => { + it("builds valid OTLP JSON with timing attributes", () => { + const payload = buildPayload({ + traceId: "abcd1234abcd1234abcd1234abcd1234", + parentSpanId: "1234567890abcdef", + spanName: "compute.provision", + startTimeMs: 1000, + endTimeMs: 1250, + resourceAttributes: { + "ctx.environment.id": "env_123", + "ctx.organization.id": "org_456", + "ctx.project.id": "proj_789", + "ctx.run.id": "run_abc", + }, + spanAttributes: { + "compute.total_ms": 250, + "compute.gateway.schedule_ms": 1, + "compute.cache.image_cached": true, + }, + }); + + expect(payload.resourceSpans).toHaveLength(1); + + const resourceSpan = payload.resourceSpans[0]!; + + // $trigger=true so the webapp accepts it + const triggerAttr = resourceSpan.resource.attributes.find((a) => a.key === "$trigger"); + expect(triggerAttr).toEqual({ key: "$trigger", value: { boolValue: true } }); + + // Resource attributes + const envAttr = resourceSpan.resource.attributes.find( + (a) => a.key === "ctx.environment.id" + ); + expect(envAttr).toEqual({ + key: "ctx.environment.id", + value: { stringValue: "env_123" }, + }); + + // Span basics + const span = resourceSpan.scopeSpans[0]!.spans[0]!; + expect(span.name).toBe("compute.provision"); + expect(span.traceId).toBe("abcd1234abcd1234abcd1234abcd1234"); + expect(span.parentSpanId).toBe("1234567890abcdef"); + + // Integer attribute + const totalMs = span.attributes.find((a) => a.key === "compute.total_ms"); + expect(totalMs).toEqual({ key: "compute.total_ms", value: { intValue: 250 } }); + + // Boolean attribute + const cached = span.attributes.find((a) => a.key === "compute.cache.image_cached"); + expect(cached).toEqual({ key: "compute.cache.image_cached", value: { boolValue: true } }); + }); + + it("generates a valid 16-char hex span ID", () => { + const payload = buildPayload({ + traceId: "abcd1234abcd1234abcd1234abcd1234", + spanName: "test", + startTimeMs: 
1000, + endTimeMs: 1001, + resourceAttributes: {}, + spanAttributes: {}, + }); + + const span = payload.resourceSpans[0]!.scopeSpans[0]!.spans[0]!; + expect(span.spanId).toMatch(/^[0-9a-f]{16}$/); + }); + + it("converts timestamps to nanoseconds", () => { + const payload = buildPayload({ + traceId: "abcd1234abcd1234abcd1234abcd1234", + spanName: "test", + startTimeMs: 1000, + endTimeMs: 1250, + resourceAttributes: {}, + spanAttributes: {}, + }); + + const span = payload.resourceSpans[0]!.scopeSpans[0]!.spans[0]!; + expect(span.startTimeUnixNano).toBe("1000000000"); + expect(span.endTimeUnixNano).toBe("1250000000"); + }); + + it("omits parentSpanId when not provided", () => { + const payload = buildPayload({ + traceId: "abcd1234abcd1234abcd1234abcd1234", + spanName: "test", + startTimeMs: 1000, + endTimeMs: 1001, + resourceAttributes: {}, + spanAttributes: {}, + }); + + const span = payload.resourceSpans[0]!.scopeSpans[0]!.spans[0]!; + expect(span.parentSpanId).toBeUndefined(); + }); + + it("handles double values for non-integer numbers", () => { + const payload = buildPayload({ + traceId: "abcd1234abcd1234abcd1234abcd1234", + spanName: "test", + startTimeMs: 1000, + endTimeMs: 1001, + resourceAttributes: {}, + spanAttributes: { "compute.cpu": 0.25 }, + }); + + const span = payload.resourceSpans[0]!.scopeSpans[0]!.spans[0]!; + const cpu = span.attributes.find((a) => a.key === "compute.cpu"); + expect(cpu).toEqual({ key: "compute.cpu", value: { doubleValue: 0.25 } }); + }); +}); diff --git a/apps/supervisor/src/services/otlpTraceService.ts b/apps/supervisor/src/services/otlpTraceService.ts new file mode 100644 index 00000000000..59eaefc8322 --- /dev/null +++ b/apps/supervisor/src/services/otlpTraceService.ts @@ -0,0 +1,93 @@ +import { randomBytes } from "crypto"; +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; + +export type OtlpTraceServiceOptions = { + endpointUrl: string; + timeoutMs?: number; +}; + +export type OtlpTraceSpan 
= { + traceId: string; + parentSpanId?: string; + spanName: string; + startTimeMs: number; + endTimeMs: number; + resourceAttributes: Record<string, string | number | boolean>; + spanAttributes: Record<string, string | number | boolean>; +}; + +export class OtlpTraceService { + private readonly logger = new SimpleStructuredLogger("otlp-trace"); + + constructor(private opts: OtlpTraceServiceOptions) {} + + /** Fire-and-forget: build payload and send to the configured OTLP endpoint */ + emit(span: OtlpTraceSpan): void { + const payload = buildPayload(span); + + fetch(`${this.opts.endpointUrl}/v1/traces`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(payload), + signal: AbortSignal.timeout(this.opts.timeoutMs ?? 5_000), + }).catch((err) => { + this.logger.warn("failed to send compute trace span", { + error: err instanceof Error ? err.message : String(err), + }); + }); + } +} + +// ── Payload builder (internal) ─────────────────────────────────────────────── + +/** @internal Exported for tests only */ +export function buildPayload(span: OtlpTraceSpan) { + const spanId = randomBytes(8).toString("hex"); + + return { + resourceSpans: [ + { + resource: { + attributes: [ + { key: "$trigger", value: { boolValue: true } }, + ...toOtlpAttributes(span.resourceAttributes), + ], + }, + scopeSpans: [ + { + scope: { name: "supervisor.compute" }, + spans: [ + { + traceId: span.traceId, + spanId, + parentSpanId: span.parentSpanId, + name: span.spanName, + kind: 3, // SPAN_KIND_CLIENT + startTimeUnixNano: String(span.startTimeMs * 1_000_000), + endTimeUnixNano: String(span.endTimeMs * 1_000_000), + attributes: toOtlpAttributes(span.spanAttributes), + status: { code: 1 }, // STATUS_CODE_OK + }, + ], + }, + ], + }, + ], + }; +} + +function toOtlpAttributes( + attrs: Record<string, string | number | boolean> +): Array<{ key: string; value: Record<string, unknown> }> { + return Object.entries(attrs).map(([key, value]) => ({ + key, + value: toOtlpValue(value), + })); +} + +function toOtlpValue(value: string | number | boolean): Record<string, unknown> { + if (typeof value ===
"string") return { stringValue: value }; + if (typeof value === "boolean") return { boolValue: value }; + if (Number.isInteger(value)) return { intValue: value }; + return { doubleValue: value }; +} diff --git a/apps/supervisor/src/services/podCleaner.test.ts b/apps/supervisor/src/services/podCleaner.test.ts index 36bb5de6d1f..d6ed2bb737f 100644 --- a/apps/supervisor/src/services/podCleaner.test.ts +++ b/apps/supervisor/src/services/podCleaner.test.ts @@ -1,10 +1,11 @@ import { PodCleaner } from "./podCleaner.js"; -import { K8sApi, createK8sApi } from "../clients/kubernetes.js"; +import { type K8sApi, createK8sApi } from "../clients/kubernetes.js"; import { setTimeout } from "timers/promises"; import { describe, it, expect, beforeAll, afterEach } from "vitest"; import { Registry } from "prom-client"; -describe("PodCleaner Integration Tests", () => { +// These tests require live K8s cluster credentials - skip by default +describe.skipIf(!process.env.K8S_INTEGRATION_TESTS)("PodCleaner Integration Tests", () => { const k8s = createK8sApi(); const namespace = "integration-test"; const register = new Registry(); diff --git a/apps/supervisor/src/services/podCleaner.ts b/apps/supervisor/src/services/podCleaner.ts index 56eaaeb88af..3ac5da293df 100644 --- a/apps/supervisor/src/services/podCleaner.ts +++ b/apps/supervisor/src/services/podCleaner.ts @@ -90,7 +90,7 @@ export class PodCleaner { status: "succeeded", }); - this.logger.info("Deleted batch of pods", { continuationToken }); + this.logger.debug("Deleted batch of pods", { continuationToken }); } catch (err) { this.logger.error("Failed to delete batch of pods", { err: err instanceof Error ? 
err.message : String(err), diff --git a/apps/supervisor/src/services/timerWheel.test.ts b/apps/supervisor/src/services/timerWheel.test.ts new file mode 100644 index 00000000000..3f6bb9aa19b --- /dev/null +++ b/apps/supervisor/src/services/timerWheel.test.ts @@ -0,0 +1,254 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { TimerWheel } from "./timerWheel.js"; + +describe("TimerWheel", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("dispatches item after delay", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + wheel.submit("run-1", "snapshot-data"); + + // Not yet + vi.advanceTimersByTime(2900); + expect(dispatched).toEqual([]); + + // After delay + vi.advanceTimersByTime(200); + expect(dispatched).toEqual(["run-1"]); + + wheel.stop(); + }); + + it("cancels item before it fires", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + wheel.submit("run-1", "data"); + + vi.advanceTimersByTime(1000); + expect(wheel.cancel("run-1")).toBe(true); + + vi.advanceTimersByTime(5000); + expect(dispatched).toEqual([]); + expect(wheel.size).toBe(0); + + wheel.stop(); + }); + + it("cancel returns false for unknown key", () => { + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: () => {}, + }); + expect(wheel.cancel("nonexistent")).toBe(false); + }); + + it("deduplicates: resubmitting same key replaces the entry", () => { + const dispatched: { key: string; data: string }[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push({ key: item.key, data: item.data }), + }); + + wheel.start(); + wheel.submit("run-1", "old-data"); + + vi.advanceTimersByTime(1000); + wheel.submit("run-1", "new-data"); + + // 
Original would have fired at t=3000, but was replaced + // New one fires at t=1000+3000=4000 + vi.advanceTimersByTime(2100); + expect(dispatched).toEqual([]); + + vi.advanceTimersByTime(1000); + expect(dispatched).toEqual([{ key: "run-1", data: "new-data" }]); + + wheel.stop(); + }); + + it("handles many concurrent items", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + + for (let i = 0; i < 1000; i++) { + wheel.submit(`run-${i}`, `data-${i}`); + } + expect(wheel.size).toBe(1000); + + vi.advanceTimersByTime(3100); + expect(dispatched.length).toBe(1000); + expect(wheel.size).toBe(0); + + wheel.stop(); + }); + + it("handles items submitted at different times", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + + wheel.submit("run-1", "data"); + vi.advanceTimersByTime(1000); + wheel.submit("run-2", "data"); + vi.advanceTimersByTime(1000); + wheel.submit("run-3", "data"); + + // t=2000: nothing yet + expect(dispatched).toEqual([]); + + // t=3100: run-1 fires + vi.advanceTimersByTime(1100); + expect(dispatched).toEqual(["run-1"]); + + // t=4100: run-2 fires + vi.advanceTimersByTime(1000); + expect(dispatched).toEqual(["run-1", "run-2"]); + + // t=5100: run-3 fires + vi.advanceTimersByTime(1000); + expect(dispatched).toEqual(["run-1", "run-2", "run-3"]); + + wheel.stop(); + }); + + it("setDelay changes delay for new items only", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + + wheel.submit("run-1", "data"); // 3s delay + + vi.advanceTimersByTime(500); + wheel.setDelay(1000); + wheel.submit("run-2", "data"); // 1s delay + + // t=1500: run-2 should have fired (submitted at t=500 with 1s delay) + 
vi.advanceTimersByTime(1100); + expect(dispatched).toEqual(["run-2"]); + + // t=3100: run-1 fires at its original 3s delay + vi.advanceTimersByTime(1500); + expect(dispatched).toEqual(["run-2", "run-1"]); + + wheel.stop(); + }); + + it("stop returns unprocessed items", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + wheel.submit("run-1", "data-1"); + wheel.submit("run-2", "data-2"); + wheel.submit("run-3", "data-3"); + + const remaining = wheel.stop(); + expect(dispatched).toEqual([]); + expect(wheel.size).toBe(0); + expect(remaining.length).toBe(3); + expect(remaining.map((r) => r.key).sort()).toEqual(["run-1", "run-2", "run-3"]); + expect(remaining.find((r) => r.key === "run-1")?.data).toBe("data-1"); + }); + + it("after stop, new submissions are silently dropped", () => { + const dispatched: string[] = []; + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + wheel.stop(); + + wheel.submit("run-late", "data"); + expect(dispatched).toEqual([]); + expect(wheel.size).toBe(0); + }); + + it("tracks size correctly through submit/cancel/dispatch", () => { + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: () => {}, + }); + + wheel.start(); + + wheel.submit("a", "data"); + wheel.submit("b", "data"); + expect(wheel.size).toBe(2); + + wheel.cancel("a"); + expect(wheel.size).toBe(1); + + vi.advanceTimersByTime(3100); + expect(wheel.size).toBe(0); + + wheel.stop(); + }); + + it("clamps delay to valid range", () => { + const dispatched: string[] = []; + + // Very small delay (should be at least 1 tick = 100ms) + const wheel = new TimerWheel({ + delayMs: 0, + onExpire: (item) => dispatched.push(item.key), + }); + + wheel.start(); + wheel.submit("run-1", "data"); + + vi.advanceTimersByTime(200); + expect(dispatched).toEqual(["run-1"]); + + wheel.stop(); + }); + + it("multiple 
cancel calls are safe", () => { + const wheel = new TimerWheel({ + delayMs: 3000, + onExpire: () => {}, + }); + + wheel.start(); + wheel.submit("run-1", "data"); + + expect(wheel.cancel("run-1")).toBe(true); + expect(wheel.cancel("run-1")).toBe(false); + + wheel.stop(); + }); +}); diff --git a/apps/supervisor/src/services/timerWheel.ts b/apps/supervisor/src/services/timerWheel.ts new file mode 100644 index 00000000000..9584423824d --- /dev/null +++ b/apps/supervisor/src/services/timerWheel.ts @@ -0,0 +1,160 @@ +/** + * TimerWheel implements a hashed timer wheel for efficiently managing large numbers + * of delayed operations with O(1) submit, cancel, and per-item dispatch. + * + * Used by the supervisor to delay snapshot requests so that short-lived waitpoints + * (e.g. triggerAndWait that resolves in <3s) skip the snapshot entirely. + * + * The wheel is a ring buffer of slots. A single setInterval advances a cursor. + * When the cursor reaches a slot, all items in that slot are dispatched. + * + * Fixed capacity: 600 slots at 100ms tick = 60s max delay. + */ + +const TICK_MS = 100; +const NUM_SLOTS = 600; // 60s max delay at 100ms tick + +export type TimerWheelItem<T> = { + key: string; + data: T; +}; + +export type TimerWheelOptions<T> = { + /** Called when an item's delay expires. */ + onExpire: (item: TimerWheelItem<T>) => void; + /** Delay in milliseconds before items fire. Clamped to [100, 60000].
*/ + delayMs: number; +}; + +type Entry<T> = { + key: string; + data: T; + slotIndex: number; +}; + +export class TimerWheel<T> { + private slots: Set<string>[]; + private entries: Map<string, Entry<T>>; + private cursor: number; + private intervalId: ReturnType<typeof setInterval> | null; + private onExpire: (item: TimerWheelItem<T>) => void; + private delaySlots: number; + + constructor(opts: TimerWheelOptions<T>) { + this.slots = Array.from({ length: NUM_SLOTS }, () => new Set<string>()); + this.entries = new Map(); + this.cursor = 0; + this.intervalId = null; + this.onExpire = opts.onExpire; + this.delaySlots = Math.max(1, Math.min(NUM_SLOTS, Math.ceil(opts.delayMs / TICK_MS))); + } + + /** Start the timer wheel. Must be called before submitting items. */ + start(): void { + if (this.intervalId) return; + this.intervalId = setInterval(() => this.tick(), TICK_MS); + // Don't hold the process open just for the timer wheel + if (this.intervalId && typeof this.intervalId === "object" && "unref" in this.intervalId) { + this.intervalId.unref(); + } + } + + /** + * Stop the timer wheel and return all unprocessed items. + * The wheel keeps running normally during graceful shutdown - call stop() + * only when you're ready to tear down. Caller decides what to do with leftovers. + */ + stop(): TimerWheelItem<T>[] { + if (this.intervalId) { + clearInterval(this.intervalId); + this.intervalId = null; + } + + const remaining: TimerWheelItem<T>[] = []; + for (const [key, entry] of this.entries) { + remaining.push({ key, data: entry.data }); + } + + for (const slot of this.slots) { + slot.clear(); + } + this.entries.clear(); + + return remaining; + } + + /** + * Update the delay for future submissions. Already-queued items keep their original timing. + * Clamped to [TICK_MS, 60000ms]. + */ + setDelay(delayMs: number): void { + this.delaySlots = Math.max(1, Math.min(NUM_SLOTS, Math.ceil(delayMs / TICK_MS))); + } + + /** + * Submit an item to be dispatched after the configured delay.
* If an item with the same key already exists, it is replaced (dedup). + * No-op if the wheel is stopped. + */ + submit(key: string, data: T): void { + if (!this.intervalId) return; + + // Dedup: remove existing entry for this key + this.cancel(key); + + const slotIndex = (this.cursor + this.delaySlots) % NUM_SLOTS; + const entry: Entry<T> = { key, data, slotIndex }; + + this.entries.set(key, entry); + this.slot(slotIndex).add(key); + } + + /** + * Cancel a pending item. Returns true if the item was found and removed. + */ + cancel(key: string): boolean { + const entry = this.entries.get(key); + if (!entry) return false; + + this.slot(entry.slotIndex).delete(key); + this.entries.delete(key); + return true; + } + + /** Number of pending items in the wheel. */ + get size(): number { + return this.entries.size; + } + + /** Whether the wheel is running. */ + get running(): boolean { + return this.intervalId !== null; + } + + /** Get a slot by index. The array is fully initialized so this always returns a Set. */ + private slot(index: number): Set<string> { + const s = this.slots[index]; + if (!s) throw new Error(`TimerWheel: invalid slot index ${index}`); + return s; + } + + /** Advance the cursor and dispatch all items in the current slot. */ + private tick(): void { + this.cursor = (this.cursor + 1) % NUM_SLOTS; + const slot = this.slot(this.cursor); + + if (slot.size === 0) return; + + // Collect items to dispatch (copy keys since we mutate during iteration) + const keys = [...slot]; + slot.clear(); + + for (const key of keys) { + const entry = this.entries.get(key); + if (!entry) continue; + + this.entries.delete(key); + this.onExpire({ key, data: entry.data }); + } + } +} diff --git a/apps/supervisor/src/util.ts b/apps/supervisor/src/util.ts index 4fcda27b2aa..d14dd99bfe1 100644 --- a/apps/supervisor/src/util.ts +++ b/apps/supervisor/src/util.ts @@ -14,6 +14,18 @@ export function getDockerHostDomain() { return isMacOS || isWindows ?
"host.docker.internal" : "localhost"; } +/** Extract the W3C traceparent string from an untyped trace context record */ +export function extractTraceparent(traceContext?: Record): string | undefined { + if ( + traceContext && + "traceparent" in traceContext && + typeof traceContext.traceparent === "string" + ) { + return traceContext.traceparent; + } + return undefined; +} + export function getRunnerId(runId: string, attemptNumber?: number) { const parts = ["runner", runId.replace("run_", "")]; @@ -23,3 +35,10 @@ export function getRunnerId(runId: string, attemptNumber?: number) { return parts.join("-"); } + +/** Derive a unique runnerId for a restore cycle using the checkpoint suffix */ +export function getRestoreRunnerId(runFriendlyId: string, checkpointId: string) { + const runIdShort = runFriendlyId.replace("run_", ""); + const checkpointSuffix = checkpointId.slice(-8); + return `runner-${runIdShort}-${checkpointSuffix}`; +} diff --git a/apps/supervisor/src/workloadManager/compute.ts b/apps/supervisor/src/workloadManager/compute.ts new file mode 100644 index 00000000000..1c00f33aad3 --- /dev/null +++ b/apps/supervisor/src/workloadManager/compute.ts @@ -0,0 +1,374 @@ +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; +import { parseTraceparent } from "@trigger.dev/core/v3/isomorphic"; +import { flattenAttributes } from "@trigger.dev/core/v3/utils/flattenAttributes"; +import { + type WorkloadManager, + type WorkloadManagerCreateOptions, + type WorkloadManagerOptions, +} from "./types.js"; +import { ComputeClient, stripImageDigest } from "@internal/compute"; +import { extractTraceparent, getRunnerId } from "../util.js"; +import type { OtlpTraceService } from "../services/otlpTraceService.js"; +import { tryCatch } from "@trigger.dev/core"; + +type ComputeWorkloadManagerOptions = WorkloadManagerOptions & { + gateway: { + url: string; + authToken?: string; + timeoutMs: number; + }; + snapshots: { + enabled: boolean; + delayMs: 
number; + dispatchLimit: number; + callbackUrl: string; + }; + tracing?: OtlpTraceService; + runner: { + instanceName: string; + otelEndpoint: string; + prettyLogs: boolean; + }; +}; + +export class ComputeWorkloadManager implements WorkloadManager { + private readonly logger = new SimpleStructuredLogger("compute-workload-manager"); + private readonly compute: ComputeClient; + + constructor(private opts: ComputeWorkloadManagerOptions) { + if (opts.workloadApiDomain) { + this.logger.warn("⚠️ Custom workload API domain", { + domain: opts.workloadApiDomain, + }); + } + + this.compute = new ComputeClient({ + gatewayUrl: opts.gateway.url, + authToken: opts.gateway.authToken, + timeoutMs: opts.gateway.timeoutMs, + }); + } + + get snapshotsEnabled(): boolean { + return this.opts.snapshots.enabled; + } + + get snapshotDelayMs(): number { + return this.opts.snapshots.delayMs; + } + + get snapshotDispatchLimit(): number { + return this.opts.snapshots.dispatchLimit; + } + + get traceSpansEnabled(): boolean { + return !!this.opts.tracing; + } + + async create(opts: WorkloadManagerCreateOptions) { + const runnerId = getRunnerId(opts.runFriendlyId, opts.nextAttemptNumber); + + const envVars: Record = { + OTEL_EXPORTER_OTLP_ENDPOINT: this.opts.runner.otelEndpoint, + TRIGGER_DEQUEUED_AT_MS: String(opts.dequeuedAt.getTime()), + TRIGGER_POD_SCHEDULED_AT_MS: String(Date.now()), + TRIGGER_ENV_ID: opts.envId, + TRIGGER_DEPLOYMENT_ID: opts.deploymentFriendlyId, + TRIGGER_DEPLOYMENT_VERSION: opts.deploymentVersion, + TRIGGER_RUN_ID: opts.runFriendlyId, + TRIGGER_SNAPSHOT_ID: opts.snapshotFriendlyId, + TRIGGER_SUPERVISOR_API_PROTOCOL: this.opts.workloadApiProtocol, + TRIGGER_SUPERVISOR_API_PORT: String(this.opts.workloadApiPort), + TRIGGER_SUPERVISOR_API_DOMAIN: this.opts.workloadApiDomain ?? 
"", + TRIGGER_WORKER_INSTANCE_NAME: this.opts.runner.instanceName, + TRIGGER_RUNNER_ID: runnerId, + TRIGGER_MACHINE_CPU: String(opts.machine.cpu), + TRIGGER_MACHINE_MEMORY: String(opts.machine.memory), + PRETTY_LOGS: String(this.opts.runner.prettyLogs), + }; + + if (this.opts.warmStartUrl) { + envVars.TRIGGER_WARM_START_URL = this.opts.warmStartUrl; + } + + if (this.snapshotsEnabled && this.opts.metadataUrl) { + envVars.TRIGGER_METADATA_URL = this.opts.metadataUrl; + } + + if (this.opts.heartbeatIntervalSeconds) { + envVars.TRIGGER_HEARTBEAT_INTERVAL_SECONDS = String(this.opts.heartbeatIntervalSeconds); + } + + if (this.opts.snapshotPollIntervalSeconds) { + envVars.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS = String( + this.opts.snapshotPollIntervalSeconds + ); + } + + if (this.opts.additionalEnvVars) { + Object.assign(envVars, this.opts.additionalEnvVars); + } + + // Strip image digest - resolve by tag, not digest + const imageRef = stripImageDigest(opts.image); + + // Wide event: single canonical log line emitted in finally + const event: Record = { + // High-cardinality identifiers + runId: opts.runFriendlyId, + runnerId, + envId: opts.envId, + envType: opts.envType, + orgId: opts.orgId, + projectId: opts.projectId, + deploymentVersion: opts.deploymentVersion, + machine: opts.machine.name, + // Environment + instanceName: this.opts.runner.instanceName, + // Supervisor timing + dequeueResponseMs: opts.dequeueResponseMs, + pollingIntervalMs: opts.pollingIntervalMs, + warmStartCheckMs: opts.warmStartCheckMs, + // Request + image: imageRef, + }; + + const startMs = performance.now(); + + try { + const [error, data] = await tryCatch( + this.compute.instances.create({ + name: runnerId, + image: imageRef, + env: envVars, + cpu: opts.machine.cpu, + memory_gb: opts.machine.memory, + metadata: { + runId: opts.runFriendlyId, + envId: opts.envId, + envType: opts.envType, + orgId: opts.orgId, + projectId: opts.projectId, + deploymentVersion: opts.deploymentVersion, + machine: 
opts.machine.name, + }, + }) + ); + + if (error) { + event.error = error instanceof Error ? error.message : String(error); + event.errorType = + error instanceof DOMException && error.name === "TimeoutError" ? "timeout" : "fetch"; + // Intentional: errors are captured in the wide event, not thrown. This matches + // the Docker/K8s managers. The run will eventually time out if scheduling fails. + return; + } + + event.instanceId = data.id; + event.ok = true; + + // Parse timing data from compute response (optional - requires gateway timing flag) + if (data._timing) { + event.timing = data._timing; + } + + this.#emitProvisionSpan(opts, startMs, data._timing); + } finally { + event.durationMs = Math.round(performance.now() - startMs); + event.ok ??= false; + this.logger.debug("create instance", event); + } + } + + async snapshot(opts: { runnerId: string; metadata: Record }): Promise { + const [error] = await tryCatch( + this.compute.instances.snapshot(opts.runnerId, { + callback: { + url: this.opts.snapshots.callbackUrl, + metadata: opts.metadata, + }, + }) + ); + + if (error) { + this.logger.error("snapshot request failed", { + runnerId: opts.runnerId, + error: error instanceof Error ? error.message : String(error), + }); + return false; + } + + this.logger.debug("snapshot request accepted", { runnerId: opts.runnerId }); + return true; + } + + async deleteInstance(runnerId: string): Promise { + const [error] = await tryCatch(this.compute.instances.delete(runnerId)); + + if (error) { + this.logger.error("delete instance failed", { + runnerId, + error: error instanceof Error ? 
error.message : String(error), + }); + return false; + } + + this.logger.debug("delete instance success", { runnerId }); + return true; + } + + #emitProvisionSpan(opts: WorkloadManagerCreateOptions, startMs: number, timing?: unknown) { + if (!this.traceSpansEnabled) return; + + const parsed = parseTraceparent(extractTraceparent(opts.traceContext)); + if (!parsed) return; + + const endMs = performance.now(); + const now = Date.now(); + const provisionStartEpochMs = now - (endMs - startMs); + const endEpochMs = now; + + // Span starts at dequeue time so events (dequeue) render in the thin-line section + // before "Started". The actual provision call time is in provisionStartEpochMs. + // Subtract 1ms so compute span always sorts before the attempt span (same dequeue time) + const startEpochMs = opts.dequeuedAt.getTime() - 1; + + const spanAttributes: Record = { + "compute.type": "create", + "compute.provision_start_ms": provisionStartEpochMs, + ...(timing + ? (flattenAttributes(timing, "compute") as Record) + : {}), + }; + + if (opts.dequeueResponseMs !== undefined) { + spanAttributes["supervisor.dequeue_response_ms"] = opts.dequeueResponseMs; + } + if (opts.warmStartCheckMs !== undefined) { + spanAttributes["supervisor.warm_start_check_ms"] = opts.warmStartCheckMs; + } + + // Use the platform API URL, not the runner OTLP endpoint (which may be a VM gateway IP) + this.opts.tracing?.emit({ + traceId: parsed.traceId, + parentSpanId: parsed.spanId, + spanName: "compute.provision", + startTimeMs: startEpochMs, + endTimeMs: endEpochMs, + resourceAttributes: { + "ctx.environment.id": opts.envId, + "ctx.organization.id": opts.orgId, + "ctx.project.id": opts.projectId, + "ctx.run.id": opts.runFriendlyId, + }, + spanAttributes, + }); + } + + async restore(opts: { + snapshotId: string; + runnerId: string; + runFriendlyId: string; + snapshotFriendlyId: string; + machine: { cpu: number; memory: number }; + // Trace context for OTel span emission + traceContext?: Record; + 
envId?: string; + orgId?: string; + projectId?: string; + dequeuedAt?: Date; + }): Promise { + const metadata: Record = { + TRIGGER_RUNNER_ID: opts.runnerId, + TRIGGER_RUN_ID: opts.runFriendlyId, + TRIGGER_SNAPSHOT_ID: opts.snapshotFriendlyId, + TRIGGER_SUPERVISOR_API_PROTOCOL: this.opts.workloadApiProtocol, + TRIGGER_SUPERVISOR_API_PORT: String(this.opts.workloadApiPort), + TRIGGER_SUPERVISOR_API_DOMAIN: this.opts.workloadApiDomain ?? "", + TRIGGER_WORKER_INSTANCE_NAME: this.opts.runner.instanceName, + }; + + this.logger.verbose("restore request body", { + snapshotId: opts.snapshotId, + runnerId: opts.runnerId, + }); + + const startMs = performance.now(); + + const [error] = await tryCatch( + this.compute.snapshots.restore(opts.snapshotId, { + name: opts.runnerId, + metadata, + cpu: opts.machine.cpu, + memory_gb: opts.machine.memory, + }) + ); + + const durationMs = Math.round(performance.now() - startMs); + + if (error) { + this.logger.error("restore request failed", { + snapshotId: opts.snapshotId, + runnerId: opts.runnerId, + error: error instanceof Error ? 
error.message : String(error), + durationMs, + }); + return false; + } + + this.logger.debug("restore request success", { + snapshotId: opts.snapshotId, + runnerId: opts.runnerId, + durationMs, + }); + + this.#emitRestoreSpan(opts, startMs); + + return true; + } + + #emitRestoreSpan( + opts: { + snapshotId: string; + runnerId: string; + runFriendlyId: string; + traceContext?: Record; + envId?: string; + orgId?: string; + projectId?: string; + dequeuedAt?: Date; + }, + startMs: number + ) { + if (!this.traceSpansEnabled) return; + + const parsed = parseTraceparent(extractTraceparent(opts.traceContext)); + if (!parsed || !opts.envId || !opts.orgId || !opts.projectId) return; + + const endMs = performance.now(); + const now = Date.now(); + const restoreStartEpochMs = now - (endMs - startMs); + const endEpochMs = now; + + // Subtract 1ms so restore span always sorts before the attempt span + const startEpochMs = (opts.dequeuedAt?.getTime() ?? restoreStartEpochMs) - 1; + + this.opts.tracing?.emit({ + traceId: parsed.traceId, + parentSpanId: parsed.spanId, + spanName: "compute.restore", + startTimeMs: startEpochMs, + endTimeMs: endEpochMs, + resourceAttributes: { + "ctx.environment.id": opts.envId, + "ctx.organization.id": opts.orgId, + "ctx.project.id": opts.projectId, + "ctx.run.id": opts.runFriendlyId, + }, + spanAttributes: { + "compute.type": "restore", + "compute.snapshot_id": opts.snapshotId, + }, + }); + } +} diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index d6651d325a2..66405df9ba5 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -62,7 +62,7 @@ export class DockerWorkloadManager implements WorkloadManager { } async create(opts: WorkloadManagerCreateOptions) { - this.logger.log("create()", { opts }); + this.logger.verbose("create()", { opts }); const runnerId = getRunnerId(opts.runFriendlyId, opts.nextAttemptNumber); diff --git 
a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index b2e737383ab..a7402e9a349 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -100,7 +100,7 @@ export class KubernetesWorkloadManager implements WorkloadManager { } async create(opts: WorkloadManagerCreateOptions) { - this.logger.log("[KubernetesWorkloadManager] Creating container", { opts }); + this.logger.verbose("[KubernetesWorkloadManager] Creating container", { opts }); const runnerId = getRunnerId(opts.runFriendlyId, opts.nextAttemptNumber); diff --git a/apps/supervisor/src/workloadManager/types.ts b/apps/supervisor/src/workloadManager/types.ts index 0ab47f0fffa..86199afe469 100644 --- a/apps/supervisor/src/workloadManager/types.ts +++ b/apps/supervisor/src/workloadManager/types.ts @@ -24,6 +24,10 @@ export interface WorkloadManagerCreateOptions { nextAttemptNumber?: number; dequeuedAt: Date; placementTags?: PlacementTag[]; + // Timing context (populated by supervisor handler, included in wide event) + dequeueResponseMs?: number; + pollingIntervalMs?: number; + warmStartCheckMs?: number; // identifiers envId: string; envType: EnvironmentType; @@ -35,6 +39,8 @@ export interface WorkloadManagerCreateOptions { runFriendlyId: string; snapshotId: string; snapshotFriendlyId: string; + // Trace context for OTel span emission (W3C format: { traceparent: "00-...", tracestate?: "..." 
}) + traceContext?: Record; annotations?: RunAnnotations; // private networking hasPrivateLink?: boolean; diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index 35d53d36099..bd38cc8700f 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -24,6 +24,13 @@ import { HttpServer, type CheckpointClient } from "@trigger.dev/core/v3/serverOn import { type IncomingMessage } from "node:http"; import { register } from "../metrics.js"; import { env } from "../env.js"; +import { SnapshotCallbackPayloadSchema } from "@internal/compute"; +import { + ComputeSnapshotService, + type RunTraceContext, +} from "../services/computeSnapshotService.js"; +import type { ComputeWorkloadManager } from "../workloadManager/compute.js"; +import type { OtlpTraceService } from "../services/otlpTraceService.js"; // Use the official export when upgrading to socket.io@4.8.0 interface DefaultEventsMap { @@ -58,10 +65,13 @@ type WorkloadServerOptions = { host?: string; workerClient: SupervisorHttpClient; checkpointClient?: CheckpointClient; + computeManager?: ComputeWorkloadManager; + tracing?: OtlpTraceService; }; export class WorkloadServer extends EventEmitter { private checkpointClient?: CheckpointClient; + private readonly snapshotService?: ComputeSnapshotService; private readonly logger = new SimpleStructuredLogger("workload-server"); @@ -94,6 +104,14 @@ export class WorkloadServer extends EventEmitter { this.workerClient = opts.workerClient; this.checkpointClient = opts.checkpointClient; + if (opts.computeManager?.snapshotsEnabled) { + this.snapshotService = new ComputeSnapshotService({ + computeManager: opts.computeManager, + workerClient: opts.workerClient, + tracing: opts.tracing, + }); + } + this.httpServer = this.createHttpServer({ host, port }); this.websocketServer = this.createWebsocketServer(); } @@ -229,13 +247,28 @@ export class WorkloadServer extends EventEmitter { { 
        paramsSchema: WorkloadActionParams,
        handler: async ({ reply, params, req }) => {
-          this.logger.debug("Suspend request", { params, headers: req.headers });
+          const runnerId = this.runnerIdFromRequest(req);
+          const deploymentVersion = this.deploymentVersionFromRequest(req);
+          const projectRef = this.projectRefFromRequest(req);
 
-          if (!this.checkpointClient) {
+          this.logger.debug("Suspend request", {
+            params,
+            runnerId,
+            deploymentVersion,
+            projectRef,
+          });
+
+          if (!runnerId || !deploymentVersion || !projectRef) {
+            this.logger.error("Invalid headers for suspend request", {
+              ...params,
+              runnerId,
+              deploymentVersion,
+              projectRef,
+            });
             reply.json(
               {
                 ok: false,
-                error: "Checkpoints disabled",
+                error: "Invalid headers",
               } satisfies WorkloadSuspendRunResponseBody,
               false,
               400
@@ -243,19 +276,25 @@ export class WorkloadServer extends EventEmitter {
             return;
           }
 
-          const runnerId = this.runnerIdFromRequest(req);
-          const deploymentVersion = this.deploymentVersionFromRequest(req);
-          const projectRef = this.projectRefFromRequest(req);
+          if (this.snapshotService) {
+            // Compute mode: delay snapshot to avoid wasted work on short-lived waitpoints.
+            // If the run continues before the delay expires, the snapshot is cancelled.
+            reply.json({ ok: true } satisfies WorkloadSuspendRunResponseBody, false, 202);
 
-          if (!runnerId || !deploymentVersion || !projectRef) {
-            this.logger.error("Invalid headers for suspend request", {
-              ...params,
-              headers: req.headers,
+            this.snapshotService.schedule(params.runFriendlyId, {
+              runnerId,
+              runFriendlyId: params.runFriendlyId,
+              snapshotFriendlyId: params.snapshotFriendlyId,
             });
+
+            return;
+          }
+
+          if (!this.checkpointClient) {
             reply.json(
               {
                 ok: false,
-                error: "Invalid headers",
+                error: "Checkpoints disabled",
               } satisfies WorkloadSuspendRunResponseBody,
               false,
               400
@@ -298,6 +337,9 @@ export class WorkloadServer extends EventEmitter {
         handler: async ({ req, reply, params }) => {
           this.logger.debug("Run continuation request", { params });
 
+          // Cancel any pending delayed snapshot for this run
+          this.snapshotService?.cancel(params.runFriendlyId);
+
           const continuationResult = await this.workerClient.continueRunExecution(
             params.runFriendlyId,
             params.snapshotFriendlyId,
@@ -394,6 +436,20 @@ export class WorkloadServer extends EventEmitter {
       });
     }
 
+    // Compute snapshot callback endpoint
+    httpServer.route("/api/v1/compute/snapshot-complete", "POST", {
+      bodySchema: SnapshotCallbackPayloadSchema,
+      handler: async ({ reply, body }) => {
+        if (!this.snapshotService) {
+          reply.empty(404);
+          return;
+        }
+
+        const result = await this.snapshotService.handleCallback(body);
+        reply.empty(result.status);
+      },
+    });
+
     return httpServer;
   }
 
@@ -408,7 +464,7 @@ export class WorkloadServer extends EventEmitter {
     > = io.of("/workload");
 
     websocketServer.on("disconnect", (socket) => {
-      this.logger.log("[WS] disconnect", socket.id);
+      this.logger.verbose("[WS] disconnect", socket.id);
     });
 
     websocketServer.use(async (socket, next) => {
       const setSocketDataFromHeader = (
@@ -490,7 +546,7 @@ export class WorkloadServer extends EventEmitter {
         socket.data.runFriendlyId = undefined;
       };
 
-      socketLogger.log("wsServer socket connected", { ...getSocketMetadata() });
+      socketLogger.debug("wsServer socket connected", { ...getSocketMetadata() });
 
       // FIXME: where does this get set?
       if (socket.data.runFriendlyId) {
@@ -498,7 +554,11 @@ export class WorkloadServer extends EventEmitter {
       }
 
       socket.on("disconnecting", (reason, description) => {
-        socketLogger.log("Socket disconnecting", { ...getSocketMetadata(), reason, description });
+        socketLogger.verbose("Socket disconnecting", {
+          ...getSocketMetadata(),
+          reason,
+          description,
+        });
 
         if (socket.data.runFriendlyId) {
           runDisconnected(socket.data.runFriendlyId);
@@ -506,7 +566,7 @@ export class WorkloadServer extends EventEmitter {
       });
 
       socket.on("disconnect", (reason, description) => {
-        socketLogger.log("Socket disconnected", { ...getSocketMetadata(), reason, description });
+        socketLogger.debug("Socket disconnected", { ...getSocketMetadata(), reason, description });
       });
 
       socket.on("error", (error) => {
@@ -527,7 +587,7 @@ export class WorkloadServer extends EventEmitter {
           ...message,
         });
 
-        log.log("Handling run:start");
+        log.debug("Handling run:start");
 
         try {
           runConnected(message.run.friendlyId);
@@ -543,10 +603,13 @@ export class WorkloadServer extends EventEmitter {
           ...message,
         });
 
-        log.log("Handling run:stop");
+        log.debug("Handling run:stop");
 
         try {
           runDisconnected(message.run.friendlyId);
+          // Don't delete trace context here - run:stop fires after each snapshot/shutdown
+          // but the run may be restored on a new VM and snapshot again. Trace context is
+          // re-populated on dequeue, and entries are small (4 strings per run).
        } catch (error) {
          log.error("run:stop error", { error });
        }
@@ -588,11 +651,16 @@ export class WorkloadServer extends EventEmitter {
     }
   }
 
+  registerRunTraceContext(runFriendlyId: string, ctx: RunTraceContext) {
+    this.snapshotService?.registerTraceContext(runFriendlyId, ctx);
+  }
+
   async start() {
     await this.httpServer.start();
   }
 
   async stop() {
+    this.snapshotService?.stop();
     await this.httpServer.stop();
   }
 }
diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts
index 912887fda1f..a29fe4b9796 100644
--- a/apps/webapp/app/env.server.ts
+++ b/apps/webapp/app/env.server.ts
@@ -333,6 +333,11 @@ const EnvironmentSchema = z
     .optional()
     .transform((v) => v ?? process.env.DEPLOY_REGISTRY_ECR_ASSUME_ROLE_EXTERNAL_ID),
 
+  // Compute gateway (template creation during deploy finalize)
+  COMPUTE_GATEWAY_URL: z.string().optional(),
+  COMPUTE_GATEWAY_AUTH_TOKEN: z.string().optional(),
+  COMPUTE_TEMPLATE_SHADOW_ROLLOUT_PCT: z.string().optional(),
+
   DEPLOY_IMAGE_PLATFORM: z.string().default("linux/amd64"),
   DEPLOY_TIMEOUT_MS: z.coerce
     .number()
diff --git a/apps/webapp/app/v3/featureFlags.server.ts b/apps/webapp/app/v3/featureFlags.server.ts
index f32f34c64b8..8a2e879395d 100644
--- a/apps/webapp/app/v3/featureFlags.server.ts
+++ b/apps/webapp/app/v3/featureFlags.server.ts
@@ -9,6 +9,7 @@ export const FEATURE_FLAG = {
   hasLogsPageAccess: "hasLogsPageAccess",
   hasAiAccess: "hasAiAccess",
   hasAiModelsAccess: "hasAiModelsAccess",
+  hasComputeAccess: "hasComputeAccess",
 } as const;
 
 const FeatureFlagCatalog = {
@@ -19,6 +20,7 @@ const FeatureFlagCatalog = {
   [FEATURE_FLAG.hasLogsPageAccess]: z.coerce.boolean(),
   [FEATURE_FLAG.hasAiAccess]: z.coerce.boolean(),
   [FEATURE_FLAG.hasAiModelsAccess]: z.coerce.boolean(),
+  [FEATURE_FLAG.hasComputeAccess]: z.coerce.boolean(),
 };
 
 type FeatureFlagKey = keyof typeof FeatureFlagCatalog;
diff --git a/apps/webapp/app/v3/services/computeTemplateCreation.server.ts b/apps/webapp/app/v3/services/computeTemplateCreation.server.ts
new file mode 100644
index 00000000000..6c144b13448
--- /dev/null
+++ b/apps/webapp/app/v3/services/computeTemplateCreation.server.ts
@@ -0,0 +1,185 @@
+import { ComputeClient, stripImageDigest } from "@internal/compute";
+import { machinePresetFromName } from "~/v3/machinePresets.server";
+import { env } from "~/env.server";
+import { logger } from "~/services/logger.server";
+import type { PrismaClientOrTransaction } from "~/db.server";
+import { FEATURE_FLAG, makeFlag } from "~/v3/featureFlags.server";
+import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
+import { ServiceValidationError } from "./baseService.server";
+import { FailDeploymentService } from "./failDeployment.server";
+
+type TemplateCreationMode = "required" | "shadow" | "skip";
+
+export class ComputeTemplateCreationService {
+  private client: ComputeClient | undefined;
+
+  constructor() {
+    if (env.COMPUTE_GATEWAY_URL) {
+      this.client = new ComputeClient({
+        gatewayUrl: env.COMPUTE_GATEWAY_URL,
+        authToken: env.COMPUTE_GATEWAY_AUTH_TOKEN,
+        timeoutMs: 5 * 60 * 1000, // 5 minutes
+      });
+    }
+  }
+
+  /**
+   * Handle template creation for a deployment. Call this before setting DEPLOYED.
+   *
+   * - Required mode: creates template synchronously, fails deployment on error
+   * - Shadow mode: fires background template creation (returns immediately)
+   * - Skip: no-op
+   *
+   * Throws ServiceValidationError if required mode fails (caller should stop finalize).
+ */ + async handleDeployTemplate(options: { + projectId: string; + imageReference: string; + deploymentFriendlyId: string; + authenticatedEnv: AuthenticatedEnvironment; + prisma: PrismaClientOrTransaction; + writer?: WritableStreamDefaultWriter; + }): Promise { + const mode = await this.resolveMode(options.projectId, options.prisma); + + if (mode === "skip") { + return; + } + + if (mode === "shadow") { + this.createTemplate(options.imageReference, { background: true }) + .then((result) => { + if (!result.success) { + logger.error("Shadow template creation failed", { + id: options.deploymentFriendlyId, + imageReference: options.imageReference, + error: result.error, + }); + } + }) + .catch((error) => { + logger.error("Shadow template creation threw unexpectedly", { + id: options.deploymentFriendlyId, + imageReference: options.imageReference, + error: error instanceof Error ? error.message : String(error), + }); + }); + return; + } + + // Required mode + if (options.writer) { + try { + await options.writer.write( + `event: log\ndata: ${JSON.stringify({ message: "Building compute template..." 
})}\n\n` + ); + } catch { + // Stream may be closed if client disconnected - continue with template creation + } + } + + logger.info("Creating compute template (required mode)", { + id: options.deploymentFriendlyId, + imageReference: options.imageReference, + }); + + const result = await this.createTemplate(options.imageReference); + + if (!result.success) { + logger.error("Compute template creation failed", { + id: options.deploymentFriendlyId, + imageReference: options.imageReference, + error: result.error, + }); + + const failService = new FailDeploymentService(); + await failService.call(options.authenticatedEnv, options.deploymentFriendlyId, { + error: { + name: "TemplateCreationFailed", + message: `Failed to create compute template: ${result.error}`, + }, + }); + + throw new ServiceValidationError( + `Compute template creation failed: ${result.error}` + ); + } + + logger.info("Compute template created", { + id: options.deploymentFriendlyId, + imageReference: options.imageReference, + }); + } + + async resolveMode( + projectId: string, + prisma: PrismaClientOrTransaction + ): Promise { + if (!this.client) { + return "skip"; + } + + const project = await prisma.project.findFirst({ + where: { id: projectId }, + select: { + defaultWorkerGroup: { + select: { workloadType: true }, + }, + organization: { + select: { featureFlags: true }, + }, + }, + }); + + if (project?.defaultWorkerGroup?.workloadType === "MICROVM") { + return "required"; + } + + const flag = makeFlag(prisma); + const hasComputeAccess = await flag({ + key: FEATURE_FLAG.hasComputeAccess, + defaultValue: false, + overrides: (project?.organization?.featureFlags as Record) ?? {}, + }); + + if (hasComputeAccess) { + return "shadow"; + } + + const rolloutPct = Number(env.COMPUTE_TEMPLATE_SHADOW_ROLLOUT_PCT ?? 
"0"); + if (rolloutPct > 0 && Math.random() * 100 < rolloutPct) { + return "shadow"; + } + + return "skip"; + } + + async createTemplate( + imageReference: string, + options?: { background?: boolean } + ): Promise<{ success: boolean; error?: string }> { + if (!this.client) { + return { success: false, error: "Compute gateway not configured" }; + } + + try { + // Templates are resource-agnostic - these values don't affect template content. + const machine = machinePresetFromName("small-1x"); + + await this.client.templates.create({ + image: stripImageDigest(imageReference), + cpu: machine.cpu, + memory_gb: machine.memory, + background: options?.background, + }); + return { success: true }; + } catch (error) { + const message = error instanceof Error ? error.message : "Unknown error"; + logger.error("Failed to create compute template", { + imageReference, + error: message, + }); + return { success: false, error: message }; + } + } +} diff --git a/apps/webapp/app/v3/services/finalizeDeploymentV2.server.ts b/apps/webapp/app/v3/services/finalizeDeploymentV2.server.ts index 2ad2b7b8258..7ca8a379a3d 100644 --- a/apps/webapp/app/v3/services/finalizeDeploymentV2.server.ts +++ b/apps/webapp/app/v3/services/finalizeDeploymentV2.server.ts @@ -15,6 +15,7 @@ import { remoteBuildsEnabled } from "../remoteImageBuilder.server"; import { getEcrAuthToken, isEcrRegistry } from "../getDeploymentImageRef.server"; import { tryCatch } from "@trigger.dev/core"; import { getRegistryConfig, type RegistryConfig } from "../registryConfig.server"; +import { ComputeTemplateCreationService } from "./computeTemplateCreation.server"; export class FinalizeDeploymentV2Service extends BaseService { public async call( @@ -23,12 +24,6 @@ export class FinalizeDeploymentV2Service extends BaseService { body: FinalizeDeploymentRequestBody, writer?: WritableStreamDefaultWriter ) { - // If remote builds are not enabled, lets just use the v1 finalize deployment service - if (!remoteBuildsEnabled()) { - const 
finalizeService = new FinalizeDeploymentService(); - return finalizeService.call(authenticatedEnv, id, body); - } - const deployment = await this._prisma.workerDeployment.findFirst({ where: { friendlyId: id, @@ -62,7 +57,6 @@ export class FinalizeDeploymentV2Service extends BaseService { if (deployment.status === "DEPLOYED") { logger.debug("Worker deployment is already deployed", { id }); - return deployment; } @@ -73,11 +67,16 @@ export class FinalizeDeploymentV2Service extends BaseService { const finalizeService = new FinalizeDeploymentService(); - if (body.skipPushToRegistry) { - logger.debug("Skipping push to registry during deployment finalization", { - deployment, - }); - return await finalizeService.call(authenticatedEnv, id, body); + // If remote builds are not enabled, skip image push and go straight to template + finalize + if (!remoteBuildsEnabled() || body.skipPushToRegistry) { + if (body.skipPushToRegistry) { + logger.debug("Skipping push to registry during deployment finalization", { + deployment, + }); + } + + await this.#createTemplateIfNeeded(deployment, id, authenticatedEnv, writer); + return finalizeService.call(authenticatedEnv, id, body); } const externalBuildData = deployment.externalBuildData @@ -143,9 +142,29 @@ export class FinalizeDeploymentV2Service extends BaseService { pushedImage: pushResult.image, }); - const finalizedDeployment = await finalizeService.call(authenticatedEnv, id, body); + await this.#createTemplateIfNeeded(deployment, id, authenticatedEnv, writer); + return finalizeService.call(authenticatedEnv, id, body); + } - return finalizedDeployment; + async #createTemplateIfNeeded( + deployment: { imageReference: string | null; worker: { project: { id: string } } | null }, + deploymentFriendlyId: string, + authenticatedEnv: AuthenticatedEnvironment, + writer?: WritableStreamDefaultWriter + ): Promise { + if (!deployment.imageReference || !deployment.worker) { + return; + } + + const templateService = new 
ComputeTemplateCreationService(); + await templateService.handleDeployTemplate({ + projectId: deployment.worker.project.id, + imageReference: deployment.imageReference, + deploymentFriendlyId, + authenticatedEnv, + prisma: this._prisma, + writer, + }); } } diff --git a/apps/webapp/package.json b/apps/webapp/package.json index eaa7ebb42c7..0752ac8fe03 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -57,6 +57,7 @@ "@heroicons/react": "^2.0.12", "@jsonhero/schema-infer": "^0.1.5", "@internal/cache": "workspace:*", + "@internal/compute": "workspace:*", "@internal/llm-model-catalog": "workspace:*", "@internal/redis": "workspace:*", "@internal/run-engine": "workspace:*", diff --git a/internal-packages/compute/package.json b/internal-packages/compute/package.json new file mode 100644 index 00000000000..004d29990f3 --- /dev/null +++ b/internal-packages/compute/package.json @@ -0,0 +1,14 @@ +{ + "name": "@internal/compute", + "private": true, + "version": "0.0.1", + "main": "./src/index.ts", + "types": "./src/index.ts", + "type": "module", + "dependencies": { + "zod": "3.25.76" + }, + "scripts": { + "typecheck": "tsc --noEmit" + } +} diff --git a/internal-packages/compute/src/client.ts b/internal-packages/compute/src/client.ts new file mode 100644 index 00000000000..4f627bd2830 --- /dev/null +++ b/internal-packages/compute/src/client.ts @@ -0,0 +1,151 @@ +import type { + TemplateCreateRequest, + InstanceCreateRequest, + InstanceCreateResponse, + InstanceSnapshotRequest, + SnapshotRestoreRequest, +} from "./types.js"; + +export type ComputeClientOptions = { + gatewayUrl: string; + authToken?: string; + timeoutMs: number; +}; + +export class ComputeClient { + readonly templates: TemplatesNamespace; + readonly instances: InstancesNamespace; + readonly snapshots: SnapshotsNamespace; + + constructor(private opts: ComputeClientOptions) { + const http = new HttpTransport(opts); + this.templates = new TemplatesNamespace(http); + this.instances = new 
InstancesNamespace(http); + this.snapshots = new SnapshotsNamespace(http); + } +} + +// ── HTTP transport (shared plumbing) ───────────────────────────────────────── + +type RequestOptions = { + signal?: AbortSignal; +}; + +class HttpTransport { + constructor(private opts: ComputeClientOptions) {} + + private get headers(): Record<string, string> { + const h: Record<string, string> = { "Content-Type": "application/json" }; + if (this.opts.authToken) { + h["Authorization"] = `Bearer ${this.opts.authToken}`; + } + return h; + } + + private signal(options?: RequestOptions): AbortSignal { + return options?.signal ?? AbortSignal.timeout(this.opts.timeoutMs); + } + + async post<T>(path: string, body: unknown, options?: RequestOptions): Promise<T | undefined> { + const url = `${this.opts.gatewayUrl}${path}`; + + const response = await fetch(url, { + method: "POST", + headers: this.headers, + body: JSON.stringify(body), + signal: this.signal(options), + }); + + if (!response.ok) { + const errorBody = await response.text().catch(() => "unknown error"); + throw new ComputeClientError(response.status, errorBody, url); + } + + // 202 Accepted or 204 No Content - no body to parse + if (response.status === 202 || response.status === 204) { + return undefined; + } + + return (await response.json()) as T; + } + + async delete(path: string, options?: RequestOptions): Promise<void> { + const url = `${this.opts.gatewayUrl}${path}`; + + const response = await fetch(url, { + method: "DELETE", + headers: this.headers, + signal: this.signal(options), + }); + + if (!response.ok) { + const errorBody = await response.text().catch(() => "unknown error"); + throw new ComputeClientError(response.status, errorBody, url); + } + } +} + +// ── Error ──────────────────────────────────────────────────────────────────── + +export class ComputeClientError extends Error { + constructor( + public readonly status: number, + public readonly body: string, + public readonly url: string + ) { + super(`Compute gateway request failed (${status}): ${body}`); + 
this.name = "ComputeClientError"; + } +} + +// ── Namespaces ─────────────────────────────────────────────────────────────── + +class TemplatesNamespace { + constructor(private http: HttpTransport) {} + + async create( + req: TemplateCreateRequest, + options?: RequestOptions + ): Promise<void> { + await this.http.post("/api/templates", req, options); + } +} + +class InstancesNamespace { + constructor(private http: HttpTransport) {} + + async create( + req: InstanceCreateRequest, + options?: RequestOptions + ): Promise<InstanceCreateResponse> { + const result = await this.http.post<InstanceCreateResponse>("/api/instances", req, options); + if (!result) { + throw new Error("Compute gateway returned no instance body"); + } + return result; + } + + async delete(runnerId: string, options?: RequestOptions): Promise<void> { + return this.http.delete(`/api/instances/${runnerId}`, options); + } + + async snapshot( + runnerId: string, + req: InstanceSnapshotRequest, + options?: RequestOptions + ): Promise<void> { + await this.http.post(`/api/instances/${runnerId}/snapshot`, req, options); + } +} + +class SnapshotsNamespace { + constructor(private http: HttpTransport) {} + + async restore( + snapshotId: string, + req: SnapshotRestoreRequest, + options?: RequestOptions + ): Promise<void> { + await this.http.post(`/api/snapshots/${snapshotId}/restore`, req, options); + } +} diff --git a/internal-packages/compute/src/imageRef.ts b/internal-packages/compute/src/imageRef.ts new file mode 100644 index 00000000000..813f2a6a663 --- /dev/null +++ b/internal-packages/compute/src/imageRef.ts @@ -0,0 +1,11 @@ +/** + * Strip the digest suffix from a container image reference. + * Tags are immutable, so we resolve by tag rather than pinning to a digest. + * + * "ghcr.io/org/image:tag@sha256:abc..." -> "ghcr.io/org/image:tag" + * "ghcr.io/org/image@sha256:abc..." -> "ghcr.io/org/image" + * "ghcr.io/org/image:tag" -> "ghcr.io/org/image:tag" (unchanged) + */ +export function stripImageDigest(imageRef: string): string { + return imageRef.split("@")[0] ?? 
imageRef; +} diff --git a/internal-packages/compute/src/index.ts b/internal-packages/compute/src/index.ts new file mode 100644 index 00000000000..1cec45e6484 --- /dev/null +++ b/internal-packages/compute/src/index.ts @@ -0,0 +1,21 @@ +export { ComputeClient, ComputeClientError } from "./client.js"; +export type { ComputeClientOptions } from "./client.js"; +export { stripImageDigest } from "./imageRef.js"; +export { + TemplateCreateRequestSchema, + TemplateCallbackPayloadSchema, + InstanceCreateRequestSchema, + InstanceCreateResponseSchema, + InstanceSnapshotRequestSchema, + SnapshotRestoreRequestSchema, + SnapshotCallbackPayloadSchema, +} from "./types.js"; +export type { + TemplateCreateRequest, + TemplateCallbackPayload, + InstanceCreateRequest, + InstanceCreateResponse, + InstanceSnapshotRequest, + SnapshotRestoreRequest, + SnapshotCallbackPayload, +} from "./types.js"; diff --git a/internal-packages/compute/src/types.ts b/internal-packages/compute/src/types.ts new file mode 100644 index 00000000000..296e38b59c1 --- /dev/null +++ b/internal-packages/compute/src/types.ts @@ -0,0 +1,73 @@ +import { z } from "zod"; + +// ── Templates ──────────────────────────────────────────────────────────────── + +export const TemplateCreateRequestSchema = z.object({ + image: z.string(), + cpu: z.number(), + memory_gb: z.number(), + background: z.boolean().optional(), + callback: z + .object({ + url: z.string(), + metadata: z.record(z.string()).optional(), + }) + .optional(), +}); +export type TemplateCreateRequest = z.infer; + +export const TemplateCallbackPayloadSchema = z.object({ + template_id: z.string().optional(), + image: z.string(), + status: z.enum(["completed", "failed"]), + error: z.string().optional(), + metadata: z.record(z.string()).optional(), + duration_ms: z.number().optional(), +}); +export type TemplateCallbackPayload = z.infer; + +// ── Instances ──────────────────────────────────────────────────────────────── + +export const InstanceCreateRequestSchema = 
z.object({ + name: z.string(), + image: z.string(), + env: z.record(z.string()), + cpu: z.number(), + memory_gb: z.number(), + metadata: z.record(z.unknown()).optional(), +}); +export type InstanceCreateRequest = z.infer<typeof InstanceCreateRequestSchema>; + +export const InstanceCreateResponseSchema = z.object({ + id: z.string(), + _timing: z.unknown().optional(), +}); +export type InstanceCreateResponse = z.infer<typeof InstanceCreateResponseSchema>; + +export const InstanceSnapshotRequestSchema = z.object({ + callback: z.object({ + url: z.string(), + metadata: z.record(z.string()), + }), +}); +export type InstanceSnapshotRequest = z.infer<typeof InstanceSnapshotRequestSchema>; + +// ── Snapshots ──────────────────────────────────────────────────────────────── + +export const SnapshotRestoreRequestSchema = z.object({ + name: z.string(), + metadata: z.record(z.string()), + cpu: z.number(), + memory_gb: z.number(), +}); +export type SnapshotRestoreRequest = z.infer<typeof SnapshotRestoreRequestSchema>; + +export const SnapshotCallbackPayloadSchema = z.object({ + snapshot_id: z.string(), + instance_id: z.string(), + status: z.enum(["completed", "failed"]), + error: z.string().optional(), + metadata: z.record(z.string()).optional(), + duration_ms: z.number().optional(), +}); +export type SnapshotCallbackPayload = z.infer<typeof SnapshotCallbackPayloadSchema>; diff --git a/internal-packages/compute/tsconfig.json b/internal-packages/compute/tsconfig.json new file mode 100644 index 00000000000..ec9998c5e00 --- /dev/null +++ b/internal-packages/compute/tsconfig.json @@ -0,0 +1,18 @@ +{ + "compilerOptions": { + "target": "ES2019", + "lib": ["ES2019", "DOM", "DOM.Iterable", "DOM.AsyncIterable"], + "module": "Node16", + "moduleResolution": "Node16", + "moduleDetection": "force", + "verbatimModuleSyntax": false, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "isolatedModules": true, + "preserveWatchOutput": true, + "skipLibCheck": true, + "noEmit": true, + "strict": true + }, + "exclude": ["node_modules"] +} diff --git 
a/internal-packages/database/prisma/migrations/20260326150000_add_workload_type_to_worker_instance_group/migration.sql b/internal-packages/database/prisma/migrations/20260326150000_add_workload_type_to_worker_instance_group/migration.sql new file mode 100644 index 00000000000..4865ae070e3 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260326150000_add_workload_type_to_worker_instance_group/migration.sql @@ -0,0 +1,5 @@ +-- CreateEnum +CREATE TYPE "WorkloadType" AS ENUM ('CONTAINER', 'MICROVM'); + +-- AlterTable +ALTER TABLE "WorkerInstanceGroup" ADD COLUMN "workloadType" "WorkloadType" NOT NULL DEFAULT 'CONTAINER'; diff --git a/internal-packages/database/prisma/migrations/20260328000000_add_compute_checkpoint_type/migration.sql b/internal-packages/database/prisma/migrations/20260328000000_add_compute_checkpoint_type/migration.sql new file mode 100644 index 00000000000..45faeca79b1 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260328000000_add_compute_checkpoint_type/migration.sql @@ -0,0 +1,2 @@ +-- AlterEnum +ALTER TYPE "TaskRunCheckpointType" ADD VALUE 'COMPUTE'; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index b60dcd7c9b0..9b2388f87b1 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1100,6 +1100,7 @@ model TaskRunCheckpoint { enum TaskRunCheckpointType { DOCKER KUBERNETES + COMPUTE } /// A Waitpoint blocks a run from continuing until it's completed @@ -1289,6 +1290,11 @@ enum WorkerInstanceGroupType { UNMANAGED } +enum WorkloadType { + CONTAINER + MICROVM +} + model WorkerInstanceGroup { id String @id @default(cuid()) type WorkerInstanceGroupType @@ -1323,6 +1329,8 @@ model WorkerInstanceGroup { location String? staticIPs String? 
+ workloadType WorkloadType @default(CONTAINER) + createdAt DateTime @default(now()) updatedAt DateTime @updatedAt } diff --git a/packages/cli-v3/src/deploy/buildImage.ts b/packages/cli-v3/src/deploy/buildImage.ts index 2225d7db056..31a2b658545 100644 --- a/packages/cli-v3/src/deploy/buildImage.ts +++ b/packages/cli-v3/src/deploy/buildImage.ts @@ -205,6 +205,7 @@ async function remoteBuildImage(options: DepotBuildImageOptions): Promise { await testConsumers[0].onDequeue(messages); } - expect(mockOnDequeue).toHaveBeenCalledWith(messages); + expect(mockOnDequeue).toHaveBeenCalledWith(messages, undefined); advanceTimeAndProcessMetrics(1100); const metrics = pool.getMetrics(); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts index 2dd3d1b898b..d72cef75c7c 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts @@ -351,12 +351,12 @@ export class RunQueueConsumerPool { const consumer = this.consumerFactory({ ...this.consumerOptions, - onDequeue: async (messages) => { + onDequeue: async (messages, timing) => { // Always update queue length, default to 0 for empty dequeues or missing value this.updateQueueLength(messages[0]?.workerQueueLength ?? 
0); // Forward to the original handler - await this.consumerOptions.onDequeue(messages); + await this.consumerOptions.onDequeue(messages, timing); }, }); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/events.ts b/packages/core/src/v3/runEngineWorker/supervisor/events.ts index a51c504a3e6..df4a93686a9 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/events.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/events.ts @@ -6,6 +6,8 @@ export type WorkerEvents = { { time: Date; message: DequeuedMessage; + dequeueResponseMs?: number; + pollingIntervalMs?: number; }, ]; requestRunAttemptStart: [ diff --git a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts index 4379eb54f37..76faee40809 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts @@ -15,7 +15,7 @@ export type RunQueueConsumerOptions = { preDequeue?: PreDequeueFn; preSkip?: PreSkipFn; maxRunCount?: number; - onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>; + onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise<void>; }; export class RunQueueConsumer implements QueueConsumer { @@ -23,13 +23,14 @@ export class RunQueueConsumer implements QueueConsumer { private readonly preDequeue?: PreDequeueFn; private readonly preSkip?: PreSkipFn; private readonly maxRunCount?: number; - private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>; + private readonly onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise<void>; private readonly logger = new SimpleStructuredLogger("queue-consumer"); private intervalMs: number; private idleIntervalMs: number; private isEnabled: boolean; + private lastScheduledIntervalMs: number; constructor(opts: 
RunQueueConsumerOptions) { this.isEnabled = false; @@ -38,6 +39,7 @@ export class RunQueueConsumer implements QueueConsumer { this.preDequeue = opts.preDequeue; this.preSkip = opts.preSkip; this.maxRunCount = opts.maxRunCount; + this.lastScheduledIntervalMs = opts.idleIntervalMs; this.onDequeue = opts.onDequeue; this.client = opts.client; } @@ -111,16 +113,18 @@ export class RunQueueConsumer implements QueueConsumer { let nextIntervalMs = this.idleIntervalMs; try { + const dequeueStart = performance.now(); const response = await this.client.dequeue({ maxResources: preDequeueResult?.maxResources, maxRunCount: this.maxRunCount, }); + const dequeueResponseMs = Math.round(performance.now() - dequeueStart); if (!response.success) { this.logger.error("Failed to dequeue", { error: response.error }); } else { try { - await this.onDequeue(response.data); + await this.onDequeue(response.data, { dequeueResponseMs, pollingIntervalMs: this.lastScheduledIntervalMs }); if (response.data.length > 0) { nextIntervalMs = this.intervalMs; @@ -141,6 +145,7 @@ this.logger.verbose("scheduled dequeue with idle interval", { delayMs }); } + this.lastScheduledIntervalMs = delayMs; setTimeout(this.dequeue.bind(this), delayMs); } } diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index e5a783b8d41..b2d344fb3dc 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -80,13 +80,15 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> { }); } - private async onDequeue(messages: WorkerApiDequeueResponseBody): Promise<void> { + private async onDequeue(messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }): Promise<void> { this.logger.verbose("Dequeued messages with contents", { count: messages.length, messages }); for (const message of 
messages) { this.emit("runQueueMessage", { time: new Date(), message, + dequeueResponseMs: timing?.dequeueResponseMs, + pollingIntervalMs: timing?.pollingIntervalMs, }); } } diff --git a/packages/core/src/v3/schemas/runEngine.ts b/packages/core/src/v3/schemas/runEngine.ts index 33b281d2146..9378b290270 100644 --- a/packages/core/src/v3/schemas/runEngine.ts +++ b/packages/core/src/v3/schemas/runEngine.ts @@ -177,7 +177,8 @@ export type CompleteRunAttemptResult = z.infer; export const CheckpointTypeEnum = { DOCKER: "DOCKER", KUBERNETES: "KUBERNETES", -} satisfies Enum; + COMPUTE: "COMPUTE", +} satisfies Enum; export type CheckpointTypeEnum = (typeof CheckpointTypeEnum)[keyof typeof CheckpointTypeEnum]; export const CheckpointType = z.enum(Object.values(CheckpointTypeEnum) as [CheckpointTypeEnum]); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index df194c6295a..be0fe1a6f6b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -190,6 +190,9 @@ importers: '@aws-sdk/client-ecr': specifier: ^3.839.0 version: 3.839.0 + '@internal/compute': + specifier: workspace:* + version: link:../../internal-packages/compute '@kubernetes/client-node': specifier: ^1.0.0 version: 1.0.0(patch_hash=ba1a06f46256cdb8d6faf7167246692c0de2e7cd846a9dc0f13be0137e1c3745)(bufferutil@4.0.9)(encoding@0.1.13) @@ -199,6 +202,9 @@ importers: dockerode: specifier: ^4.0.6 version: 4.0.6 + p-limit: + specifier: ^6.2.0 + version: 6.2.0 prom-client: specifier: ^15.1.0 version: 15.1.0 @@ -305,6 +311,9 @@ importers: '@internal/cache': specifier: workspace:* version: link:../../internal-packages/cache + '@internal/compute': + specifier: workspace:* + version: link:../../internal-packages/compute '@internal/llm-model-catalog': specifier: workspace:* version: link:../../internal-packages/llm-model-catalog @@ -1078,6 +1087,12 @@ importers: specifier: 6.0.1 version: 6.0.1 + internal-packages/compute: + dependencies: + zod: + specifier: 3.25.76 + version: 3.25.76 + internal-packages/database: dependencies: 
'@prisma/client': @@ -11208,7 +11223,7 @@ packages: '@vercel/postgres@0.10.0': resolution: {integrity: sha512-fSD23DxGND40IzSkXjcFcxr53t3Tiym59Is0jSYIFpG4/0f0KO9SGtcp1sXiebvPaGe7N/tU05cH4yt2S6/IPg==} engines: {node: '>=18.14'} - deprecated: '@vercel/postgres is deprecated. You can either choose an alternate storage solution from the Vercel Marketplace if you want to set up a new database. Or you can follow this guide to migrate your existing Vercel Postgres db: https://neon.com/docs/guides/vercel-postgres-transition-guide' + deprecated: '@vercel/postgres is deprecated. If you are setting up a new database, you can choose an alternate storage solution from the Vercel Marketplace. If you had an existing Vercel Postgres database, it should have been migrated to Neon as a native Vercel integration. You can find more details and the guide to migrate to Neon''s SDKs here: https://neon.com/docs/guides/vercel-postgres-transition-guide' '@vercel/sdk@1.19.1': resolution: {integrity: sha512-K4rmtUT6t1vX06tiY44ot8A7W1FKN7g/tMkE7yZghCgNQ8b30SzljBd4ni8RNp2pJzM/HrZmphRDeIArO7oZuw==} @@ -11875,6 +11890,7 @@ packages: basic-ftp@5.0.3: resolution: {integrity: sha512-QHX8HLlncOLpy54mh+k/sWIFd0ThmRqwe9ZjELybGZK+tZ8rUb9VO0saKJUROTbE+KhzDUT7xziGpGrW8Kmd+g==} engines: {node: '>=10.0.0'} + deprecated: Security vulnerability fixed in 5.2.0, please upgrade bcrypt-pbkdf@1.0.2: resolution: {integrity: sha512-qeFIXtP4MSoi6NLqO12WfqARWWuCKi2Rn/9hJLEmtB5yTNr9DqFWkJRCf2qShWzPeAMRnOgCrq0sg/KLv5ES9w==} @@ -14314,25 +14330,29 @@ packages: glob@10.3.10: resolution: {integrity: sha512-fa46+tv1Ak0UPK1TOy/pZrIybNNt4HCv7SDzwyfiOZkvZLEbjsZkJBPtDHVshZjbecAoAGSC20MjLDG/qr679g==} engines: {node: '>=16 || 14 >=14.17'} + deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. 
Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me hasBin: true glob@10.3.4: resolution: {integrity: sha512-6LFElP3A+i/Q8XQKEvZjkEWEOTgAIALR9AO2rwT8bgPhDd1anmqDJDZ6lLddI4ehxxxR1S5RIqKe1uapMQfYaQ==} engines: {node: '>=16 || 14 >=14.17'} + deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me hasBin: true glob@10.4.5: resolution: {integrity: sha512-7Bv8RF0k6xjo7d4A/PxYLbUCfb6c+Vpd2/mB2yRDlew7Jb5hEXiCD9ibfO7wpk8i4sevK6DFny9h7EYbM3/sHg==} + deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me hasBin: true glob@11.0.0: resolution: {integrity: sha512-9UiX/Bl6J2yaBbxKoEBRm4Cipxgok8kQYcOPEhScPwebu2I0HoQOuYdIO6S3hLuWoZgpDpwQZMzTFxgpkyT76g==} engines: {node: 20 || >=22} + deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me hasBin: true glob@7.2.3: resolution: {integrity: sha512-nFR0zLpU2YCaRxwoCJvL6UvCH2JFyFVIvwTLsIf21AuHlMskA1hhTdk+LlYJtOlYt9v6dvszD2BGRqBL+iQK9Q==} - deprecated: Glob versions prior to v9 are no longer supported + deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. 
Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me glob@9.3.5: resolution: {integrity: sha512-e1LleDykUz2Iu+MTYdkSsuWX8lvAjAcs0Xef0lNIu0S2wOAzuTxCJtcd9S3cijlwYF18EsU3rzb8jPVobxDh9Q==} @@ -17350,6 +17370,7 @@ packages: prebuild-install@7.1.3: resolution: {integrity: sha512-8Mf2cbV7x1cXPUILADGI3wuhfqWvtiLA1iclTDbFRZkgRQS0NqsPZphna9V+HyTEadheuPmjaJMsbzKQFOzLug==} engines: {node: '>=10'} + deprecated: No longer maintained. Please contact the author of the relevant native addon; alternatives are available. hasBin: true preferred-pm@3.0.3: @@ -19047,21 +19068,22 @@ packages: tar@6.1.13: resolution: {integrity: sha512-jdIBIN6LTIe2jqzay/2vtYLlBHa3JF42ot3h1dW8Q0PaAG4v8rm0cvpVePtau5C6OKXGGcgO9q2AMNSWxiLqKw==} engines: {node: '>=10'} - deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exhorbitant rates) by contacting i@izs.me + deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me tar@6.2.1: resolution: {integrity: sha512-DZ4yORTwrbTj/7MZYq2w+/ZFdI6OZ/f9SFHR+71gIVUZhOQPHzVCLpvRnPgyaMpfWxxk/4ONva3GQSyNIKRv6A==} engines: {node: '>=10'} - deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exhorbitant rates) by contacting i@izs.me + deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. 
Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me tar@7.4.3: resolution: {integrity: sha512-5S7Va8hKfV7W5U6g3aYxXmlPoZVAwUMy9AOKyF2fVuZa2UD3qZjg578OrLRt8PcNN1PleVaL/5/yYATNL0ICUw==} engines: {node: '>=18'} - deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exhorbitant rates) by contacting i@izs.me + deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me tar@7.5.6: resolution: {integrity: sha512-xqUeu2JAIJpXyvskvU3uvQW8PAmHrtXp2KDuMJwQqW8Sqq0CaZBAQ+dKS3RBXVhU4wC5NjAdKrmh84241gO9cA==} engines: {node: '>=18'} + deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me tdigest@0.1.2: resolution: {integrity: sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==} @@ -23237,7 +23259,7 @@ snapshots: '@epic-web/test-server@0.1.0(bufferutil@4.0.9)': dependencies: '@hono/node-server': 1.12.2(hono@4.5.11) - '@hono/node-ws': 1.0.4(@hono/node-server@1.12.2(hono@4.5.11))(bufferutil@4.0.9) + '@hono/node-ws': 1.0.4(@hono/node-server@1.12.2(hono@4.11.8))(bufferutil@4.0.9) '@open-draft/deferred-promise': 2.2.0 '@types/ws': 8.5.12 hono: 4.5.11 @@ -23992,7 +24014,7 @@ snapshots: dependencies: hono: 4.11.8 - '@hono/node-ws@1.0.4(@hono/node-server@1.12.2(hono@4.5.11))(bufferutil@4.0.9)': + '@hono/node-ws@1.0.4(@hono/node-server@1.12.2(hono@4.11.8))(bufferutil@4.0.9)': dependencies: '@hono/node-server': 1.12.2(hono@4.5.11) ws: 8.18.3(bufferutil@4.0.9)
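The `stripImageDigest` helper added in `internal-packages/compute/src/imageRef.ts` is small enough to sanity-check standalone. The sketch below reproduces its one-line body from the diff (reproduced rather than imported, since `@internal/compute` is a private workspace package), and exercises the three cases documented in the source comment:

```typescript
// Reproduced from internal-packages/compute/src/imageRef.ts (this PR):
// everything after the first "@" is treated as a digest suffix and dropped,
// so images are resolved by tag instead of being pinned to a digest.
function stripImageDigest(imageRef: string): string {
  return imageRef.split("@")[0] ?? imageRef;
}

console.log(stripImageDigest("ghcr.io/org/image:tag@sha256:abc")); // "ghcr.io/org/image:tag"
console.log(stripImageDigest("ghcr.io/org/image@sha256:abc")); // "ghcr.io/org/image"
console.log(stripImageDigest("ghcr.io/org/image:tag")); // "ghcr.io/org/image:tag"
```

The `?? imageRef` fallback only matters under `noUncheckedIndexedAccess`, where `split("@")[0]` is typed `string | undefined`; at runtime `split` always yields at least one element.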