Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 132 additions & 21 deletions nodejs/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import {
StreamMessageReader,
StreamMessageWriter,
} from "vscode-jsonrpc/node.js";
import { createServerRpc } from "./generated/rpc.js";
import { createServerRpc, registerClientApiHandlers } from "./generated/rpc.js";
import type { SessionStoreHandler } from "./generated/rpc.js";
import { getSdkProtocolVersion } from "./sdkProtocolVersion.js";
import { CopilotSession, NO_RESULT_PERMISSION_V2_ERROR } from "./session.js";
import { getTraceContext } from "./telemetry.js";
Expand All @@ -33,6 +34,7 @@ import type {
ForegroundSessionInfo,
GetAuthStatusResponse,
GetStatusResponse,
ListSessionsOptions,
ModelInfo,
ResumeSessionConfig,
SessionConfig,
Expand All @@ -43,6 +45,7 @@ import type {
SessionLifecycleHandler,
SessionListFilter,
SessionMetadata,
SessionStoreConfig,
TelemetryConfig,
Tool,
ToolCallRequestPayload,
Expand Down Expand Up @@ -175,6 +178,14 @@ export class CopilotClient {
private _rpc: ReturnType<typeof createServerRpc> | null = null;
private processExitPromise: Promise<never> | null = null; // Rejects when CLI process exits
private negotiatedProtocolVersion: number | null = null;
/** Per-session store configs, keyed by sessionId. Used to route sessionStore.* RPC requests. */
private sessionStoreConfigs: Map<string, SessionStoreConfig> = new Map();
/**
* Stores used during listSessions() calls. Keyed by an incrementing ID to support concurrent calls.
* The latest registered store is used when the server calls sessionStore.list.
*/
private listSessionsStores: Map<number, SessionStoreConfig> = new Map();
private listSessionsStoreNextId = 0;

/**
* Typed server-scoped RPC methods.
Expand Down Expand Up @@ -397,6 +408,8 @@ export class CopilotClient {
}
}
this.sessions.clear();
this.sessionStoreConfigs.clear();
this.listSessionsStores.clear();

// Close connection
if (this.connection) {
Expand Down Expand Up @@ -481,6 +494,8 @@ export class CopilotClient {

// Clear sessions immediately without trying to destroy them
this.sessions.clear();
this.sessionStoreConfigs.clear();
this.listSessionsStores.clear();

// Force close connection
if (this.connection) {
Expand Down Expand Up @@ -588,6 +603,12 @@ export class CopilotClient {
}
this.sessions.set(sessionId, session);

// Register session store callbacks before the RPC so that
// store requests arriving during session creation are handled.
if (config.sessionStore) {
this.sessionStoreConfigs.set(sessionId, config.sessionStore);
}

try {
const response = await this.connection!.sendRequest("session.create", {
...(await getTraceContext(this.onGetTraceContext)),
Expand Down Expand Up @@ -619,6 +640,9 @@ export class CopilotClient {
skillDirectories: config.skillDirectories,
disabledSkills: config.disabledSkills,
infiniteSessions: config.infiniteSessions,
sessionStore: config.sessionStore
? { descriptor: config.sessionStore.descriptor }
: undefined,
});

const { workspacePath } = response as {
Expand All @@ -628,6 +652,7 @@ export class CopilotClient {
session["_workspacePath"] = workspacePath;
} catch (e) {
this.sessions.delete(sessionId);
this.sessionStoreConfigs.delete(sessionId);
throw e;
}

Expand Down Expand Up @@ -694,6 +719,11 @@ export class CopilotClient {
}
this.sessions.set(sessionId, session);

// Register session store callbacks before the RPC
if (config.sessionStore) {
this.sessionStoreConfigs.set(sessionId, config.sessionStore);
}

try {
const response = await this.connection!.sendRequest("session.resume", {
...(await getTraceContext(this.onGetTraceContext)),
Expand Down Expand Up @@ -726,6 +756,9 @@ export class CopilotClient {
disabledSkills: config.disabledSkills,
infiniteSessions: config.infiniteSessions,
disableResume: config.disableResume,
sessionStore: config.sessionStore
? { descriptor: config.sessionStore.descriptor }
: undefined,
});

const { workspacePath } = response as {
Expand All @@ -735,6 +768,7 @@ export class CopilotClient {
session["_workspacePath"] = workspacePath;
} catch (e) {
this.sessions.delete(sessionId);
this.sessionStoreConfigs.delete(sessionId);
throw e;
}

Expand Down Expand Up @@ -951,6 +985,7 @@ export class CopilotClient {

// Remove from local sessions map if present
this.sessions.delete(sessionId);
this.sessionStoreConfigs.delete(sessionId);
}

/**
Expand All @@ -966,31 +1001,57 @@ export class CopilotClient {
* // List sessions for a specific repository
* const sessions = await client.listSessions({ repository: "owner/repo" });
*/
async listSessions(filter?: SessionListFilter): Promise<SessionMetadata[]> {
async listSessions(filterOrOptions?: SessionListFilter | ListSessionsOptions): Promise<SessionMetadata[]> {
if (!this.connection) {
throw new Error("Client not connected");
}

const response = await this.connection.sendRequest("session.list", { filter });
const { sessions } = response as {
sessions: Array<{
sessionId: string;
startTime: string;
modifiedTime: string;
summary?: string;
isRemote: boolean;
context?: SessionContext;
}>;
};
// Support both plain filter and full options object
const options: ListSessionsOptions | undefined =
filterOrOptions && "sessionStore" in filterOrOptions
? (filterOrOptions as ListSessionsOptions)
: filterOrOptions
? { filter: filterOrOptions as SessionListFilter }
: undefined;

return sessions.map((s) => ({
sessionId: s.sessionId,
startTime: new Date(s.startTime),
modifiedTime: new Date(s.modifiedTime),
summary: s.summary,
isRemote: s.isRemote,
context: s.context,
}));
// Register the store for the duration of this RPC call (supports concurrent calls)
let storeId: number | undefined;
if (options?.sessionStore) {
storeId = this.listSessionsStoreNextId++;
this.listSessionsStores.set(storeId, options.sessionStore);
}

try {
const response = await this.connection.sendRequest("session.list", {
filter: options?.filter,
sessionStore: options?.sessionStore
? { descriptor: options.sessionStore.descriptor }
: undefined,
});
const { sessions } = response as {
sessions: Array<{
sessionId: string;
startTime: string;
modifiedTime: string;
summary?: string;
isRemote: boolean;
context?: SessionContext;
}>;
};

return sessions.map((s) => ({
sessionId: s.sessionId,
startTime: new Date(s.startTime),
modifiedTime: new Date(s.modifiedTime),
summary: s.summary,
isRemote: s.isRemote,
context: s.context,
}));
} finally {
if (storeId !== undefined) {
this.listSessionsStores.delete(storeId);
}
}
}

/**
Expand Down Expand Up @@ -1455,6 +1516,12 @@ export class CopilotClient {
}): Promise<{ output?: unknown }> => await this.handleHooksInvoke(params)
);

// Register session store RPC handlers via generated registration function.
// The handler routes each call to the appropriate SessionStoreConfig based on sessionId.
registerClientApiHandlers(this.connection, {
sessionStore: this.createSessionStoreHandler(),
});

this.connection.onClose(() => {
this.state = "disconnected";
});
Expand All @@ -1464,6 +1531,50 @@ export class CopilotClient {
});
}

/**
* Create a {@link SessionStoreHandler} that routes each RPC call to the
* appropriate {@link SessionStoreConfig} registered for the given session.
*/
private createSessionStoreHandler(): SessionStoreHandler {
const getStore = (sessionId: string): SessionStoreConfig => {
const store = this.sessionStoreConfigs.get(sessionId);
if (!store) {
throw new Error(`No session store registered for session ${sessionId}`);
}
return store;
};

return {
load: async (params) => {
const store = getStore(params.sessionId);
const events = await store.onLoad(params.sessionId);
// Events are opaque on the wire but typed in the SDK consumer API
return { events: events as Record<string, unknown>[] };
},
append: async (params) => {
const store = getStore(params.sessionId);
await store.onAppend(params.events as SessionEvent[], params.sessionId);
},
truncate: async (params) => {
const store = getStore(params.sessionId);
return store.onTruncate(params.upToEventId, params.sessionId);
},
list: async () => {
// Use the most recently registered store for listing.
const store = Array.from(this.listSessionsStores.values()).pop();
if (!store) {
throw new Error("No session store registered for listing");
}
const sessions = await store.onListSessions();
return { sessions };
},
delete: async (params) => {
const store = getStore(params.sessionId);
await store.onDelete(params.sessionId);
},
};
}

private handleSessionEventNotification(notification: unknown): void {
if (
typeof notification !== "object" ||
Expand Down
Loading
Loading