Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,26 @@ POSTHOG_PROJECT_KEY=
# DEPOT_TOKEN=<Depot org token>
# DEV_OTEL_EXPORTER_OTLP_ENDPOINT="http://0.0.0.0:4318"
# These are needed for the object store (for handling large payloads/outputs)
# Default provider (backward compatible - no protocol prefix)
# OBJECT_STORE_BASE_URL="https://{bucket}.{accountId}.r2.cloudflarestorage.com"
# OBJECT_STORE_ACCESS_KEY_ID=
# OBJECT_STORE_SECRET_ACCESS_KEY=
# OBJECT_STORE_REGION=auto
# OBJECT_STORE_SERVICE=s3
# OBJECT_STORE_DEFAULT_PROTOCOL=s3 # Optional: protocol to use for new uploads (e.g., "s3", "r2")
#
# Named providers (protocol-prefixed data) - optional for multi-provider support
# OBJECT_STORE_S3_BASE_URL=https://s3.amazonaws.com
# OBJECT_STORE_S3_ACCESS_KEY_ID=
# OBJECT_STORE_S3_SECRET_ACCESS_KEY=
# OBJECT_STORE_S3_REGION=us-east-1
# OBJECT_STORE_S3_SERVICE=s3
#
# OBJECT_STORE_R2_BASE_URL=https://{bucket}.{accountId}.r2.cloudflarestorage.com
# OBJECT_STORE_R2_ACCESS_KEY_ID=
# OBJECT_STORE_R2_SECRET_ACCESS_KEY=
# OBJECT_STORE_R2_REGION=auto
# OBJECT_STORE_R2_SERVICE=s3
# CHECKPOINT_THRESHOLD_IN_MS=10000

# These control the server-side internal telemetry
Expand Down
6 changes: 6 additions & 0 deletions .server-changes/multi-provider-object-storage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Multi-provider object storage with protocol-based routing for zero-downtime migration
6 changes: 6 additions & 0 deletions .server-changes/object-store-iam-auth.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add IAM role-based auth support for object stores (no access keys required).
7 changes: 7 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,18 @@ const EnvironmentSchema = z
.default(60 * 1000 * 15), // 15 minutes

OBJECT_STORE_BASE_URL: z.string().optional(),
OBJECT_STORE_BUCKET: z.string().optional(),
OBJECT_STORE_ACCESS_KEY_ID: z.string().optional(),
OBJECT_STORE_SECRET_ACCESS_KEY: z.string().optional(),
OBJECT_STORE_REGION: z.string().optional(),
OBJECT_STORE_SERVICE: z.string().default("s3"),

// Protocol to use for new uploads (e.g., "s3", "r2"). Data without protocol uses default provider above.
// If specified, you must configure the corresponding provider using OBJECT_STORE_{PROTOCOL}_* env vars.
// Example: OBJECT_STORE_DEFAULT_PROTOCOL=s3 requires OBJECT_STORE_S3_BASE_URL, OBJECT_STORE_S3_ACCESS_KEY_ID, etc.
// Enables zero-downtime migration between providers (old data keeps working, new data uses new provider).
OBJECT_STORE_DEFAULT_PROTOCOL: z.string().regex(/^[a-z0-9]+$/).optional(),

ARTIFACTS_OBJECT_STORE_BUCKET: z.string().optional(),
ARTIFACTS_OBJECT_STORE_BASE_URL: z.string().optional(),
ARTIFACTS_OBJECT_STORE_ACCESS_KEY_ID: z.string().optional(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { $replica, prisma } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { generatePresignedUrl } from "~/v3/r2.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
import { tracer } from "~/v3/tracer.server";
import { startSpanWithEnv } from "~/v3/tracing.server";

Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.packets.$.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { generatePresignedUrl } from "~/v3/r2.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";

const ParamsSchema = z.object({
"*": z.string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { basename } from "node:path";
import { z } from "zod";
import { prisma } from "~/db.server";
import { requireUserId } from "~/services/session.server";
import { generatePresignedRequest } from "~/v3/r2.server";
import { generatePresignedRequest } from "~/v3/objectStore.server";

const ParamSchema = z.object({
environmentId: z.string(),
Expand Down
28 changes: 17 additions & 11 deletions apps/webapp/app/runEngine/concerns/batchPayloads.server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import { type IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import { env } from "~/env.server";
import { startActiveSpan } from "~/v3/tracer.server";
import { uploadPacketToObjectStore, r2 } from "~/v3/r2.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { hasObjectStoreClient, uploadPacketToObjectStore } from "~/v3/objectStore.server";
import { startActiveSpan } from "~/v3/tracer.server";

export type BatchPayloadProcessResult = {
/** The processed payload - either the original or an R2 path */
Expand Down Expand Up @@ -31,7 +31,7 @@ export class BatchPayloadProcessor {
* If not available, large payloads will be stored inline (which may fail for very large payloads).
*/
isObjectStoreAvailable(): boolean {
return r2 !== undefined && env.OBJECT_STORE_BASE_URL !== undefined;
return hasObjectStoreClient();
}

/**
Expand Down Expand Up @@ -103,11 +103,17 @@ export class BatchPayloadProcessor {
};
}

// Upload to R2
// Upload to object store
const filename = `batch_${batchId}/item_${itemIndex}/payload.json`;

const [uploadError] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment)
const [uploadError, uploadedFilename] = await tryCatch(
uploadPacketToObjectStore(
filename,
packet.data,
packet.dataType,
environment,
env.OBJECT_STORE_DEFAULT_PROTOCOL
)
);

if (uploadError) {
Expand All @@ -125,18 +131,18 @@ export class BatchPayloadProcessor {
);
}

logger.debug("Batch item payload offloaded to R2", {
logger.debug("Batch item payload offloaded to object store", {
batchId,
itemIndex,
filename,
filename: uploadedFilename,
size,
});

span.setAttribute("wasOffloaded", true);
span.setAttribute("offloadPath", filename);
span.setAttribute("offloadPath", uploadedFilename);

return {
payload: filename,
payload: uploadedFilename!,
payloadType: "application/store",
wasOffloaded: true,
size,
Expand Down
8 changes: 4 additions & 4 deletions apps/webapp/app/runEngine/concerns/payloads.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/
import { PayloadProcessor, TriggerTaskRequest } from "../types";
import { env } from "~/env.server";
import { startActiveSpan } from "~/v3/tracer.server";
import { uploadPacketToObjectStore } from "~/v3/r2.server";
import { uploadPacketToObjectStore } from "~/v3/objectStore.server";
import { ServiceValidationError } from "~/v3/services/common.server";

export class DefaultPayloadProcessor implements PayloadProcessor {
Expand Down Expand Up @@ -31,16 +31,16 @@ export class DefaultPayloadProcessor implements PayloadProcessor {

const filename = `${request.friendlyId}/payload.json`;

const [uploadError] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment)
const [uploadError, uploadedFilename] = await tryCatch(
uploadPacketToObjectStore(filename, packet.data, packet.dataType, request.environment, env.OBJECT_STORE_DEFAULT_PROTOCOL)
);

if (uploadError) {
throw new ServiceValidationError("Failed to upload large payload to object store", 500); // This is retryable
}

return {
data: filename,
data: uploadedFilename!,
dataType: "application/store",
};
});
Expand Down
6 changes: 3 additions & 3 deletions apps/webapp/app/runEngine/services/batchTrigger.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { env } from "~/env.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { batchTriggerWorker } from "~/v3/batchTriggerWorker.server";
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server";
import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/objectStore.server";
import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
import { TriggerTaskService } from "../../v3/services/triggerTask.server";
import { startActiveSpan } from "../../v3/tracer.server";
Expand Down Expand Up @@ -716,10 +716,10 @@ export class RunEngineBatchTriggerService extends WithRunEngine {

const filename = `${pathPrefix}/payload.json`;

await uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment);
const uploadedFilename = await uploadPacketToObjectStore(filename, packet.data, packet.dataType, environment);

return {
data: filename,
data: uploadedFilename,
dataType: "application/store",
};
});
Expand Down
Loading
Loading