Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
4bb1168
feat: add @trigger.dev/ai package with TriggerChatTransport
cursoragent Feb 15, 2026
3caf47b
test: add comprehensive unit tests for TriggerChatTransport
cursoragent Feb 15, 2026
8ba87a7
refactor: polish TriggerChatTransport implementation
cursoragent Feb 15, 2026
393bd79
test: add abort signal, multiple sessions, and body merging tests
cursoragent Feb 15, 2026
3e0711c
chore: add changeset for @trigger.dev/ai package
cursoragent Feb 15, 2026
f8f2f74
refactor: remove internal ChatSessionState from public exports
cursoragent Feb 15, 2026
4db5f07
feat: support dynamic accessToken function for token refresh
cursoragent Feb 15, 2026
b147407
refactor: avoid double-resolving accessToken in sendMessages
cursoragent Feb 15, 2026
b7a921b
feat: add chat transport and AI chat helpers to @trigger.dev/sdk
cursoragent Feb 15, 2026
77f0dc7
test: move chat transport tests to @trigger.dev/sdk
cursoragent Feb 15, 2026
edbd6d0
refactor: delete packages/ai/ — moved to @trigger.dev/sdk subpaths
cursoragent Feb 15, 2026
a888d67
chore: update changeset to target @trigger.dev/sdk
cursoragent Feb 15, 2026
9954b92
fix: address CodeRabbit review feedback
cursoragent Feb 15, 2026
61c766e
docs(ai): add AI Chat with useChat guide
cursoragent Feb 15, 2026
3e1ab7f
feat(reference): add ai-chat Next.js reference project
cursoragent Feb 15, 2026
296c525
fix(reference): use compatible @ai-sdk v3 packages, await convertToMo…
cursoragent Feb 15, 2026
61c9ccd
Use a single run with iterative waitpoint token completions
ericallam Feb 21, 2026
b3ef2d5
Added tool example
ericallam Feb 21, 2026
c87a321
expose a useTriggerChatTransport hook
ericallam Feb 21, 2026
c5032af
use input streams and rename chatTask and chatState to chat.task and …
ericallam Mar 3, 2026
ce73f9b
add stopping support and fix issue with the OpenAI responses API and …
ericallam Mar 4, 2026
206ea0f
Add warmTimeoutInSeconds option
ericallam Mar 4, 2026
7c8652c
Add clientData support
ericallam Mar 4, 2026
5ba385c
provide already converted UIMessages to the run function for better dx
ericallam Mar 4, 2026
7729b25
Added better telemetry support to view turns
ericallam Mar 4, 2026
23da303
Fix double looping when resuming from an input stream waitpoint
ericallam Mar 4, 2026
ad61bd8
Add some pending message support in the example
ericallam Mar 4, 2026
71e0d85
Accumulate messages in the task, allowing us to only have to send use…
ericallam Mar 5, 2026
72847a8
build full example with persisting messages, adding necessary hooks, …
ericallam Mar 5, 2026
963584f
Add ai chat to the sidebar for now
ericallam Mar 5, 2026
5988c24
remove postinstall hook
ericallam Mar 5, 2026
d32a66e
feat: add onTurnStart hook, lastEventId support, and stream resume de…
ericallam Mar 5, 2026
d6453fa
Minor fixes around reconnecting streams
ericallam Mar 6, 2026
8b36338
update pnpm link file
ericallam Mar 6, 2026
0ef2e2a
fixed chat tests
ericallam Mar 6, 2026
bd36b97
use locals for the chat pipe counter instead of a module global
ericallam Mar 6, 2026
52ed447
Add triggerOptions to the transport, auto-tag with the chat ID
ericallam Mar 6, 2026
d7e14e9
Make clientData typesafe and pass to all chat.task hooks
ericallam Mar 6, 2026
540d9bb
feat: add chat.local for per-run typed data with Proxy access and dir…
ericallam Mar 6, 2026
3b21f03
feat(chat): add stop handling, abort cleanup, continuation support, a…
ericallam Mar 7, 2026
4149e4e
Some improvements to the example ai-chat
ericallam Mar 7, 2026
1bdd70d
feat(chat): expose typed chat.stream, add deepResearch subtask exampl…
ericallam Mar 8, 2026
afd4f93
feat(ai): pass chat context and toolCallId to subtasks, add typed ai.…
ericallam Mar 8, 2026
6a0b063
feat(chat): add preload support, dynamic tools, and preload-specific …
ericallam Mar 9, 2026
af36799
docs: add mermaid architecture diagrams for ai-chat system
ericallam Mar 9, 2026
6a8e14c
docs: add sequence diagrams to ai-chat guide
ericallam Mar 9, 2026
7b9168e
feat(chat): auto-hydrate chat.local values in ai.tool subtasks
ericallam Mar 9, 2026
27ea804
feat(chat): add chat.defer(), preload toggle, TTFB measurement, and f…
ericallam Mar 9, 2026
903856e
fix(reference): replace hand-rolled HTML stripping with turndown
ericallam Mar 9, 2026
7448066
feat(streams): add inputStream.waitWithWarmup(), warm timeout config …
ericallam Mar 9, 2026
eba8c16
feat(chat): add composable primitives, raw task example, and task mod…
ericallam Mar 10, 2026
2f325ee
Introduce the chat session API and better docs organization
ericallam Mar 10, 2026
2a04f45
Add support for toUIMessageStream() options
ericallam Mar 10, 2026
827ad89
Add metadata to the streamText call
ericallam Mar 12, 2026
924c9ac
feat(chat): add chat.prompt API with provider registry support
ericallam Mar 23, 2026
2ebebed
refactor: rename warmTimeout to idleTimeout across chat APIs
ericallam Mar 23, 2026
149de37
feat: support message compaction
ericallam Mar 24, 2026
45cfd40
better compaction support in our other chat variants
ericallam Mar 24, 2026
61878ec
feat(chat): add compaction option, pendingMessages steering, and useP…
ericallam Mar 25, 2026
a33f294
Add a writer to easily write chunks in callbacks
ericallam Mar 26, 2026
518dc5d
feat(ai): add triggerAndSubscribe method and use it in ai.tool
ericallam Mar 26, 2026
516c21e
feat(chat): add chat.inject() for background context injection and ch…
ericallam Mar 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions .changeset/ai-sdk-chat-transport.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
"@trigger.dev/sdk": minor
---

Add AI SDK chat transport integration via two new subpath exports:

**`@trigger.dev/sdk/chat`** (frontend, browser-safe):
- `TriggerChatTransport` — custom `ChatTransport` for the AI SDK's `useChat` hook that runs chat completions as durable Trigger.dev tasks
- `createChatTransport()` — factory function

```tsx
import { useChat } from "@ai-sdk/react";
import { TriggerChatTransport } from "@trigger.dev/sdk/chat";

const { messages, sendMessage } = useChat({
transport: new TriggerChatTransport({
task: "my-chat-task",
accessToken,
}),
});
```

**`@trigger.dev/sdk/ai`** (backend, extends existing `ai.tool`/`ai.currentToolOptions`):
- `chatTask()` — pre-typed task wrapper with auto-pipe support
- `pipeChat()` — pipe a `StreamTextResult` or stream to the frontend
- `CHAT_STREAM_KEY` — the default stream key constant
- `ChatTaskPayload` type

```ts
import { chatTask } from "@trigger.dev/sdk/ai";
import { streamText, convertToModelMessages } from "ai";

export const myChatTask = chatTask({
id: "my-chat-task",
run: async ({ messages }) => {
return streamText({
model: openai("gpt-4o"),
messages: convertToModelMessages(messages),
});
},
});
```
22 changes: 22 additions & 0 deletions .claude/rules/package-installation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
---
paths:
- "**/package.json"
---

# Installing Packages

When adding a new dependency to any package.json in the monorepo:

1. **Look up the latest version** on npm before adding:
```bash
pnpm view <package-name> version
```
If unsure which version to use (e.g. major version compatibility), confirm with the user.

2. **Edit the package.json directly** — do NOT use `pnpm add` as it can cause issues in the monorepo. Add the dependency with the correct version range (typically `^x.y.z`).

3. **Run `pnpm i` from the repo root** after editing to install and update the lockfile:
```bash
pnpm i
```
Always run from the repo root, not from the package directory.
2 changes: 2 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ This file provides guidance to Claude Code when working with this repository. Su

This is a pnpm 10.23.0 monorepo using Turborepo. Run commands from root with `pnpm run`.

**Adding dependencies:** Edit `package.json` directly instead of using `pnpm add`, then run `pnpm i` from the repo root. See `.claude/rules/package-installation.md` for the full process.

```bash
pnpm run docker # Start Docker services (PostgreSQL, Redis, Electric)
pnpm run db:migrate # Run database migrations
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export {
getSchemaParseFn,
type AnySchemaParseFn,
type SchemaParseFn,
type inferSchemaOut,
isSchemaZodEsque,
isSchemaValibotEsque,
isSchemaArkTypeEsque,
Expand Down
12 changes: 12 additions & 0 deletions packages/core/src/v3/inputStreams/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ export class InputStreamsAPI implements InputStreamManager {
return this.#getManager().lastSeqNum(streamId);
}

public setLastSeqNum(streamId: string, seqNum: number): void {
this.#getManager().setLastSeqNum(streamId, seqNum);
}

public shiftBuffer(streamId: string): boolean {
return this.#getManager().shiftBuffer(streamId);
}

public disconnectStream(streamId: string): void {
this.#getManager().disconnectStream(streamId);
}

public clearHandlers(): void {
this.#getManager().clearHandlers();
}
Expand Down
29 changes: 29 additions & 0 deletions packages/core/src/v3/inputStreams/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ export class StandardInputStreamManager implements InputStreamManager {
return this.seqNums.get(streamId);
}

setLastSeqNum(streamId: string, seqNum: number): void {
const current = this.seqNums.get(streamId);
// Only advance forward, never backward
if (current === undefined || seqNum > current) {
this.seqNums.set(streamId, seqNum);
}
}

shiftBuffer(streamId: string): boolean {
const buffered = this.buffer.get(streamId);
if (buffered && buffered.length > 0) {
buffered.shift();
if (buffered.length === 0) {
this.buffer.delete(streamId);
}
return true;
}
return false;
}

setRunId(runId: string, streamsVersion?: string): void {
this.currentRunId = runId;
this.streamsVersion = streamsVersion;
Expand Down Expand Up @@ -158,6 +178,15 @@ export class StandardInputStreamManager implements InputStreamManager {
}
}

disconnectStream(streamId: string): void {
const tail = this.tails.get(streamId);
if (tail) {
tail.abortController.abort();
this.tails.delete(streamId);
}
this.buffer.delete(streamId);
}

connectTail(runId: string, _fromSeq?: number): void {
// No-op: tails are now created per-stream lazily
}
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/v3/inputStreams/noopManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ export class NoopInputStreamManager implements InputStreamManager {
return undefined;
}

setLastSeqNum(_streamId: string, _seqNum: number): void {}

shiftBuffer(_streamId: string): boolean { return false; }

disconnectStream(_streamId: string): void {}

clearHandlers(): void {}
reset(): void {}
disconnect(): void {}
Expand Down
22 changes: 22 additions & 0 deletions packages/core/src/v3/inputStreams/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,28 @@ export interface InputStreamManager {
*/
lastSeqNum(streamId: string): number | undefined;

/**
* Advance the last-seen S2 sequence number for the given input stream.
* Used after `.wait()` resumes to prevent the SSE tail from replaying
* the record that was consumed via the waitpoint path.
*/
setLastSeqNum(streamId: string, seqNum: number): void;

/**
* Remove and discard the first buffered item for the given input stream.
* Used after `.wait()` resumes to remove the duplicate that the SSE tail
* buffered while the waitpoint was being completed via a separate path.
* Returns true if an item was removed, false if the buffer was empty.
*/
shiftBuffer(streamId: string): boolean;

/**
* Disconnect the SSE tail and clear the buffer for a specific input stream.
* Used before suspending via `.wait()` so the tail doesn't buffer duplicates
* of data that will be delivered through the waitpoint path.
*/
disconnectStream(streamId: string): void;

/**
* Clear all persistent `.on()` handlers and abort tails that have no remaining once waiters.
* Called automatically when a task run completes.
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/v3/realtimeStreams/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
RealtimeStreamInstance,
RealtimeStreamOperationOptions,
RealtimeStreamsManager,
StreamWriteResult,
} from "./types.js";

export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
Expand All @@ -16,7 +17,7 @@ export class StandardRealtimeStreamsManager implements RealtimeStreamsManager {
) {}
// Track active streams - using a Set allows multiple streams for the same key to coexist
private activeStreams = new Set<{
wait: () => Promise<void>;
wait: () => Promise<StreamWriteResult>;
abortController: AbortController;
}>();

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/v3/realtimeStreams/noopManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export class NoopRealtimeStreamsManager implements RealtimeStreamsManager {
options?: RealtimeStreamOperationOptions
): RealtimeStreamInstance<T> {
return {
wait: () => Promise.resolve(),
wait: () => Promise.resolve({}),
get stream(): AsyncIterableStream<T> {
return createAsyncIterableStreamFromAsyncIterable(source);
},
Expand Down
7 changes: 4 additions & 3 deletions packages/core/src/v3/realtimeStreams/streamInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
import { AnyZodFetchOptions } from "../zodfetch.js";
import { StreamsWriterV1 } from "./streamsWriterV1.js";
import { StreamsWriterV2 } from "./streamsWriterV2.js";
import { StreamsWriter } from "./types.js";
import { StreamsWriter, StreamWriteResult } from "./types.js";

export type StreamInstanceOptions<T> = {
apiClient: ApiClient;
Expand Down Expand Up @@ -63,8 +63,9 @@ export class StreamInstance<T> implements StreamsWriter {
return streamWriter;
}

public async wait(): Promise<void> {
return this.streamPromise.then((writer) => writer.wait());
public async wait(): Promise<StreamWriteResult> {
const writer = await this.streamPromise;
return writer.wait();
}

public get stream(): AsyncIterableStream<T> {
Expand Down
7 changes: 4 additions & 3 deletions packages/core/src/v3/realtimeStreams/streamsWriterV1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { request as httpsRequest } from "node:https";
import { request as httpRequest } from "node:http";
import { URL } from "node:url";
import { randomBytes } from "node:crypto";
import { StreamsWriter } from "./types.js";
import { StreamsWriter, StreamWriteResult } from "./types.js";

export type StreamsWriterV1Options<T> = {
baseUrl: string;
Expand Down Expand Up @@ -258,8 +258,9 @@ export class StreamsWriterV1<T> implements StreamsWriter {
await this.makeRequest(0);
}

public async wait(): Promise<void> {
return this.streamPromise;
public async wait(): Promise<StreamWriteResult> {
await this.streamPromise;
return {};
}

public [Symbol.asyncIterator]() {
Expand Down
10 changes: 6 additions & 4 deletions packages/core/src/v3/realtimeStreams/streamsWriterV2.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { S2, AppendRecord, BatchTransform } from "@s2-dev/streamstore";
import { StreamsWriter } from "./types.js";
import { StreamsWriter, StreamWriteResult } from "./types.js";
import { nanoid } from "nanoid";

export type StreamsWriterV2Options<T = any> = {
Expand Down Expand Up @@ -54,6 +54,7 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
private readonly maxInflightBytes: number;
private aborted = false;
private sessionWritable: WritableStream<any> | null = null;
private lastSeqNum: number | undefined;

constructor(private options: StreamsWriterV2Options<T>) {
this.debug = options.debug ?? false;
Expand Down Expand Up @@ -169,9 +170,9 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
const lastAcked = session.lastAckedPosition();

if (lastAcked?.end) {
const recordsWritten = lastAcked.end.seqNum;
this.lastSeqNum = lastAcked.end.seqNum;
this.log(
`[S2MetadataStream] Written ${recordsWritten} records, ending at seqNum=${lastAcked.end.seqNum}`
`[S2MetadataStream] Written ${this.lastSeqNum} records, ending at seqNum=${this.lastSeqNum}`
);
}
} catch (error) {
Expand All @@ -184,8 +185,9 @@ export class StreamsWriterV2<T = any> implements StreamsWriter {
}
}

public async wait(): Promise<void> {
public async wait(): Promise<StreamWriteResult> {
await this.streamPromise;
return { lastEventId: this.lastSeqNum?.toString() };
}

public [Symbol.asyncIterator]() {
Expand Down
36 changes: 33 additions & 3 deletions packages/core/src/v3/realtimeStreams/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ export interface RealtimeStreamsManager {
): Promise<void>;
}

export type StreamWriteResult = {
lastEventId?: string;
};

export interface RealtimeStreamInstance<T> {
wait(): Promise<void>;
wait(): Promise<StreamWriteResult>;
get stream(): AsyncIterableStream<T>;
}

export interface StreamsWriter {
wait(): Promise<void>;
wait(): Promise<StreamWriteResult>;
}

export type RealtimeDefinedStream<TPart> = {
Expand Down Expand Up @@ -71,6 +75,10 @@ export type PipeStreamOptions = {
* Additional request options for the API call.
*/
requestOptions?: ApiRequestOptions;
/** Override the default span name for this operation. */
spanName?: string;
/** When true, the span will be collapsed in the dashboard. */
collapsed?: boolean;
};

/**
Expand All @@ -89,7 +97,7 @@ export type PipeStreamResult<T> = {
* to the realtime stream. Use this to wait for the stream to complete before
* finishing your task.
*/
waitUntilComplete: () => Promise<void>;
waitUntilComplete: () => Promise<StreamWriteResult>;
};

/**
Expand Down Expand Up @@ -185,6 +193,14 @@ export type RealtimeDefinedInputStream<TData> = {
* Uses a waitpoint token internally. Can only be called inside a task.run().
*/
wait: (options?: InputStreamWaitOptions) => ManualWaitpointPromise<TData>;
/**
* Wait for data with an idle phase before suspending.
*
* Keeps the task active (using compute) for `idleTimeoutInSeconds`,
* then suspends via `.wait()` if no data arrives. If data arrives during
* the idle phase the task responds instantly without suspending.
*/
waitWithIdleTimeout: (options: InputStreamWaitWithIdleTimeoutOptions) => Promise<{ ok: true; output: TData } | { ok: false; error?: any }>;
/**
* Send data to this input stream on a specific run.
* This is used from outside the task (e.g., from your backend or another task).
Expand All @@ -199,6 +215,8 @@ export type InputStreamSubscription = {
export type InputStreamOnceOptions = {
signal?: AbortSignal;
timeoutMs?: number;
/** Override the default span name for this operation. */
spanName?: string;
};

export type SendInputStreamOptions = {
Expand Down Expand Up @@ -234,6 +252,18 @@ export type InputStreamWaitOptions = {
* and filtering waitpoints via `wait.listTokens()`.
*/
tags?: string[];

/** Override the default span name for this operation. */
spanName?: string;
};

export type InputStreamWaitWithIdleTimeoutOptions = {
/** Seconds to keep the task idle (active, using compute) before suspending. */
idleTimeoutInSeconds: number;
/** Maximum time to wait after suspending (duration string, e.g. "1h"). */
timeout?: string;
/** Override the default span name for the outer operation. */
spanName?: string;
};

export type InferInputStreamType<T> = T extends RealtimeDefinedInputStream<infer TData>
Expand Down
Loading
Loading