diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index a3349dfe603..2608a58d090 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -14,6 +14,7 @@ import { requestChatTitle, SSE_RESPONSE_HEADERS, } from '@/lib/copilot/chat-streaming' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models' import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer' @@ -182,25 +183,36 @@ export async function POST(req: NextRequest) { const wf = await getWorkflowById(workflowId) resolvedWorkspaceId = wf?.workspaceId ?? undefined } catch { - logger.warn(`[${tracker.requestId}] Failed to resolve workspaceId from workflow`) + logger.warn( + appendCopilotLogContext('Failed to resolve workspaceId from workflow', { + requestId: tracker.requestId, + messageId: userMessageId, + }) + ) } const userMessageIdToUse = userMessageId || crypto.randomUUID() try { - logger.info(`[${tracker.requestId}] Received chat POST`, { - workflowId, - hasContexts: Array.isArray(normalizedContexts), - contextsCount: Array.isArray(normalizedContexts) ? normalizedContexts.length : 0, - contextsPreview: Array.isArray(normalizedContexts) - ? normalizedContexts.map((c: any) => ({ - kind: c?.kind, - chatId: c?.chatId, - workflowId: c?.workflowId, - executionId: (c as any)?.executionId, - label: c?.label, - })) - : undefined, - }) + logger.info( + appendCopilotLogContext('Received chat POST', { + requestId: tracker.requestId, + messageId: userMessageIdToUse, + }), + { + workflowId, + hasContexts: Array.isArray(normalizedContexts), + contextsCount: Array.isArray(normalizedContexts) ? normalizedContexts.length : 0, + contextsPreview: Array.isArray(normalizedContexts) + ? normalizedContexts.map((c: any) => ({ + kind: c?.kind, + chatId: c?.chatId, + workflowId: c?.workflowId, + executionId: (c as any)?.executionId, + label: c?.label, + })) + : undefined, + } + ) } catch {} let currentChat: any = null @@ -238,22 +250,40 @@ export async function POST(req: NextRequest) { actualChatId ) agentContexts = processed - logger.info(`[${tracker.requestId}] Contexts processed for request`, { - processedCount: agentContexts.length, - kinds: agentContexts.map((c) => c.type), - lengthPreview: agentContexts.map((c) => c.content?.length ?? 0), - }) + logger.info( + appendCopilotLogContext('Contexts processed for request', { + requestId: tracker.requestId, + messageId: userMessageIdToUse, + }), + { + processedCount: agentContexts.length, + kinds: agentContexts.map((c) => c.type), + lengthPreview: agentContexts.map((c) => c.content?.length ?? 0), + } + ) if ( Array.isArray(normalizedContexts) && normalizedContexts.length > 0 && agentContexts.length === 0 ) { logger.warn( - `[${tracker.requestId}] Contexts provided but none processed. Check executionId for logs contexts.` + appendCopilotLogContext( + 'Contexts provided but none processed. Check executionId for logs contexts.', + { + requestId: tracker.requestId, + messageId: userMessageIdToUse, + } + ) ) } } catch (e) { - logger.error(`[${tracker.requestId}] Failed to process contexts`, e) + logger.error( + appendCopilotLogContext('Failed to process contexts', { + requestId: tracker.requestId, + messageId: userMessageIdToUse, + }), + e + ) } } @@ -283,7 +313,10 @@ export async function POST(req: NextRequest) { agentContexts.push(result.value) } else if (result.status === 'rejected') { logger.error( - `[${tracker.requestId}] Failed to resolve resource attachment`, + appendCopilotLogContext('Failed to resolve resource attachment', { + requestId: tracker.requestId, + messageId: userMessageIdToUse, + }), result.reason ) } @@ -324,20 +357,26 @@ export async function POST(req: NextRequest) { ) try { - logger.info(`[${tracker.requestId}] About to call Sim Agent`, { - hasContext: agentContexts.length > 0, - contextCount: agentContexts.length, - hasFileAttachments: Array.isArray(requestPayload.fileAttachments), - messageLength: message.length, - mode: effectiveMode, - hasTools: Array.isArray(requestPayload.tools), - toolCount: Array.isArray(requestPayload.tools) ? requestPayload.tools.length : 0, - hasBaseTools: Array.isArray(requestPayload.baseTools), - baseToolCount: Array.isArray(requestPayload.baseTools) - ? requestPayload.baseTools.length - : 0, - hasCredentials: !!requestPayload.credentials, - }) + logger.info( + appendCopilotLogContext('About to call Sim Agent', { + requestId: tracker.requestId, + messageId: userMessageIdToUse, + }), + { + hasContext: agentContexts.length > 0, + contextCount: agentContexts.length, + hasFileAttachments: Array.isArray(requestPayload.fileAttachments), + messageLength: message.length, + mode: effectiveMode, + hasTools: Array.isArray(requestPayload.tools), + toolCount: Array.isArray(requestPayload.tools) ? requestPayload.tools.length : 0, + hasBaseTools: Array.isArray(requestPayload.baseTools), + baseToolCount: Array.isArray(requestPayload.baseTools) + ? requestPayload.baseTools.length + : 0, + hasCredentials: !!requestPayload.credentials, + } + ) } catch {} if (stream && actualChatId) { @@ -481,10 +520,16 @@ export async function POST(req: NextRequest) { .where(eq(copilotChats.id, actualChatId)) } } catch (error) { - logger.error(`[${tracker.requestId}] Failed to persist chat messages`, { - chatId: actualChatId, - error: error instanceof Error ? error.message : 'Unknown error', - }) + logger.error( + appendCopilotLogContext('Failed to persist chat messages', { + requestId: tracker.requestId, + messageId: userMessageIdToUse, + }), + { + chatId: actualChatId, + error: error instanceof Error ? error.message : 'Unknown error', + } + ) } }, }, @@ -510,13 +555,19 @@ export async function POST(req: NextRequest) { provider: typeof requestPayload?.provider === 'string' ? requestPayload.provider : undefined, } - logger.info(`[${tracker.requestId}] Non-streaming response from orchestrator:`, { - hasContent: !!responseData.content, - contentLength: responseData.content?.length || 0, - model: responseData.model, - provider: responseData.provider, - toolCallsCount: responseData.toolCalls?.length || 0, - }) + logger.info( + appendCopilotLogContext('Non-streaming response from orchestrator', { + requestId: tracker.requestId, + messageId: userMessageIdToUse, + }), + { + hasContent: !!responseData.content, + contentLength: responseData.content?.length || 0, + model: responseData.model, + provider: responseData.provider, + toolCallsCount: responseData.toolCalls?.length || 0, + } + ) // Save messages if we have a chat if (currentChat && responseData.content) { @@ -549,8 +600,13 @@ export async function POST(req: NextRequest) { // Start title generation in parallel if this is first message (non-streaming) if (actualChatId && !currentChat.title && conversationHistory.length === 0) { - logger.info(`[${tracker.requestId}] Starting title generation for non-streaming response`) - requestChatTitle({ message, model: selectedModel, provider }) + logger.info( + appendCopilotLogContext('Starting title generation for non-streaming response', { + requestId: tracker.requestId, + messageId: userMessageIdToUse, + }) + ) + requestChatTitle({ message, model: selectedModel, provider, messageId: userMessageIdToUse }) .then(async (title) => { if (title) { await db @@ -560,11 +616,22 @@ export async function POST(req: NextRequest) { updatedAt: new Date(), }) .where(eq(copilotChats.id, actualChatId!)) - logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`) + logger.info( + appendCopilotLogContext(`Generated and saved title: ${title}`, { + requestId: tracker.requestId, + messageId: userMessageIdToUse, + }) + ) } }) .catch((error) => { - logger.error(`[${tracker.requestId}] Title generation failed:`, error) + logger.error( + appendCopilotLogContext('Title generation failed', { + requestId: tracker.requestId, + messageId: userMessageIdToUse, + }), + error + ) }) } @@ -578,11 +645,17 @@ export async function POST(req: NextRequest) { .where(eq(copilotChats.id, actualChatId!)) } - logger.info(`[${tracker.requestId}] Returning non-streaming response`, { - duration: tracker.getDuration(), - chatId: actualChatId, - responseLength: responseData.content?.length || 0, - }) + logger.info( + appendCopilotLogContext('Returning non-streaming response', { + requestId: tracker.requestId, + messageId: userMessageIdToUse, + }), + { + duration: tracker.getDuration(), + chatId: actualChatId, + responseLength: responseData.content?.length || 0, + } + ) return NextResponse.json({ success: true, @@ -606,21 +679,33 @@ export async function POST(req: NextRequest) { const duration = tracker.getDuration() if (error instanceof z.ZodError) { - logger.error(`[${tracker.requestId}] Validation error:`, { - duration, - errors: error.errors, - }) + logger.error( + appendCopilotLogContext('Validation error', { + requestId: tracker.requestId, + messageId: pendingChatStreamID ?? undefined, + }), + { + duration, + errors: error.errors, + } + ) return NextResponse.json( { error: 'Invalid request data', details: error.errors }, { status: 400 } ) } - logger.error(`[${tracker.requestId}] Error handling copilot chat:`, { - duration, - error: error instanceof Error ? error.message : 'Unknown error', - stack: error instanceof Error ? error.stack : undefined, - }) + logger.error( + appendCopilotLogContext('Error handling copilot chat', { + requestId: tracker.requestId, + messageId: pendingChatStreamID ?? undefined, + }), + { + duration, + error: error instanceof Error ? error.message : 'Unknown error', + stack: error instanceof Error ? error.stack : undefined, + } + ) return NextResponse.json( { error: error instanceof Error ? error.message : 'Internal server error' }, @@ -665,11 +750,16 @@ export async function GET(req: NextRequest) { status: meta?.status || 'unknown', } } catch (err) { - logger.warn('Failed to read stream snapshot for chat', { - chatId, - conversationId: chat.conversationId, - error: err instanceof Error ? err.message : String(err), - }) + logger.warn( + appendCopilotLogContext('Failed to read stream snapshot for chat', { + messageId: chat.conversationId || undefined, + }), + { + chatId, + conversationId: chat.conversationId, + error: err instanceof Error ? err.message : String(err), + } + ) } } @@ -688,7 +778,11 @@ export async function GET(req: NextRequest) { ...(streamSnapshot ? { streamSnapshot } : {}), } - logger.info(`Retrieved chat ${chatId}`) + logger.info( + appendCopilotLogContext(`Retrieved chat ${chatId}`, { + messageId: chat.conversationId || undefined, + }) + ) return NextResponse.json({ success: true, chat: transformedChat }) } @@ -750,7 +844,7 @@ export async function GET(req: NextRequest) { chats: transformedChats, }) } catch (error) { - logger.error('Error fetching copilot chats:', error) + logger.error('Error fetching copilot chats', error) return createInternalServerErrorResponse('Failed to fetch chats') } } diff --git a/apps/sim/app/api/copilot/chat/stream/route.ts b/apps/sim/app/api/copilot/chat/stream/route.ts index 3234b98d085..851fb642034 100644 --- a/apps/sim/app/api/copilot/chat/stream/route.ts +++ b/apps/sim/app/api/copilot/chat/stream/route.ts @@ -1,5 +1,6 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import { getStreamMeta, readStreamEvents, @@ -35,12 +36,24 @@ export async function GET(request: NextRequest) { const toParam = url.searchParams.get('to') const toEventId = toParam ? Number(toParam) : undefined + logger.info( + appendCopilotLogContext('[Resume] Received resume request', { + messageId: streamId || undefined, + }), + { + streamId: streamId || undefined, + fromEventId, + toEventId, + batchMode, + } + ) + if (!streamId) { return NextResponse.json({ error: 'streamId is required' }, { status: 400 }) } const meta = (await getStreamMeta(streamId)) as StreamMeta | null - logger.info('[Resume] Stream lookup', { + logger.info(appendCopilotLogContext('[Resume] Stream lookup', { messageId: streamId }), { streamId, fromEventId, toEventId, @@ -59,7 +72,7 @@ export async function GET(request: NextRequest) { if (batchMode) { const events = await readStreamEvents(streamId, fromEventId) const filteredEvents = toEventId ? events.filter((e) => e.eventId <= toEventId) : events - logger.info('[Resume] Batch response', { + logger.info(appendCopilotLogContext('[Resume] Batch response', { messageId: streamId }), { streamId, fromEventId, toEventId, @@ -111,11 +124,14 @@ export async function GET(request: NextRequest) { const flushEvents = async () => { const events = await readStreamEvents(streamId, lastEventId) if (events.length > 0) { - logger.info('[Resume] Flushing events', { - streamId, - fromEventId: lastEventId, - eventCount: events.length, - }) + logger.info( + appendCopilotLogContext('[Resume] Flushing events', { messageId: streamId }), + { + streamId, + fromEventId: lastEventId, + eventCount: events.length, + } + ) } for (const entry of events) { lastEventId = entry.eventId @@ -162,7 +178,7 @@ export async function GET(request: NextRequest) { } } catch (error) { if (!controllerClosed && !request.signal.aborted) { - logger.warn('Stream replay failed', { + logger.warn(appendCopilotLogContext('Stream replay failed', { messageId: streamId }), { streamId, error: error instanceof Error ? error.message : String(error), }) diff --git a/apps/sim/app/api/mothership/chat/route.ts b/apps/sim/app/api/mothership/chat/route.ts index ba262842cf1..c5fdddba6fb 100644 --- a/apps/sim/app/api/mothership/chat/route.ts +++ b/apps/sim/app/api/mothership/chat/route.ts @@ -12,6 +12,7 @@ import { createSSEStream, SSE_RESPONSE_HEADERS, } from '@/lib/copilot/chat-streaming' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types' import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents' import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request-helpers' @@ -87,6 +88,7 @@ const MothershipMessageSchema = z.object({ */ export async function POST(req: NextRequest) { const tracker = createRequestTracker() + let userMessageIdForLogs: string | undefined try { const session = await getSession() @@ -109,6 +111,28 @@ export async function POST(req: NextRequest) { } = MothershipMessageSchema.parse(body) const userMessageId = providedMessageId || crypto.randomUUID() + userMessageIdForLogs = userMessageId + + logger.info( + appendCopilotLogContext('Received mothership chat start request', { + requestId: tracker.requestId, + messageId: userMessageId, + }), + { + workspaceId, + chatId, + createNewChat, + hasContexts: Array.isArray(contexts) && contexts.length > 0, + contextsCount: Array.isArray(contexts) ? contexts.length : 0, + hasResourceAttachments: + Array.isArray(resourceAttachments) && resourceAttachments.length > 0, + resourceAttachmentCount: Array.isArray(resourceAttachments) + ? resourceAttachments.length + : 0, + hasFileAttachments: Array.isArray(fileAttachments) && fileAttachments.length > 0, + fileAttachmentCount: Array.isArray(fileAttachments) ? fileAttachments.length : 0, + } + ) try { await assertActiveWorkspaceAccess(workspaceId, authenticatedUserId) @@ -150,7 +174,13 @@ export async function POST(req: NextRequest) { actualChatId ) } catch (e) { - logger.error(`[${tracker.requestId}] Failed to process contexts`, e) + logger.error( + appendCopilotLogContext('Failed to process contexts', { + requestId: tracker.requestId, + messageId: userMessageId, + }), + e + ) } } @@ -176,7 +206,10 @@ export async function POST(req: NextRequest) { agentContexts.push(result.value) } else if (result.status === 'rejected') { logger.error( - `[${tracker.requestId}] Failed to resolve resource attachment`, + appendCopilotLogContext('Failed to resolve resource attachment', { + requestId: tracker.requestId, + messageId: userMessageId, + }), result.reason ) } @@ -366,10 +399,16 @@ export async function POST(req: NextRequest) { }) } } catch (error) { - logger.error(`[${tracker.requestId}] Failed to persist chat messages`, { - chatId: actualChatId, - error: error instanceof Error ? error.message : 'Unknown error', - }) + logger.error( + appendCopilotLogContext('Failed to persist chat messages', { + requestId: tracker.requestId, + messageId: userMessageId, + }), + { + chatId: actualChatId, + error: error instanceof Error ? error.message : 'Unknown error', + } + ) } }, }, @@ -384,9 +423,15 @@ export async function POST(req: NextRequest) { ) } - logger.error(`[${tracker.requestId}] Error handling mothership chat:`, { - error: error instanceof Error ? error.message : 'Unknown error', - }) + logger.error( + appendCopilotLogContext('Error handling mothership chat', { + requestId: tracker.requestId, + messageId: userMessageIdForLogs, + }), + { + error: error instanceof Error ? error.message : 'Unknown error', + } + ) return NextResponse.json( { error: error instanceof Error ? error.message : 'Internal server error' }, diff --git a/apps/sim/app/api/mothership/execute/route.ts b/apps/sim/app/api/mothership/execute/route.ts index 6ec8bc33e24..1632b028d12 100644 --- a/apps/sim/app/api/mothership/execute/route.ts +++ b/apps/sim/app/api/mothership/execute/route.ts @@ -3,6 +3,7 @@ import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { checkInternalAuth } from '@/lib/auth/hybrid' import { buildIntegrationToolSchemas } from '@/lib/copilot/chat-payload' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' import { generateWorkspaceContext } from '@/lib/copilot/workspace-context' import { @@ -35,6 +36,8 @@ const ExecuteRequestSchema = z.object({ * Consumes the Go SSE stream internally and returns a single JSON response. */ export async function POST(req: NextRequest) { + let messageId: string | undefined + try { const auth = await checkInternalAuth(req, { requireWorkflowId: false }) if (!auth.success) { @@ -48,9 +51,10 @@ export async function POST(req: NextRequest) { await assertActiveWorkspaceAccess(workspaceId, userId) const effectiveChatId = chatId || crypto.randomUUID() + messageId = crypto.randomUUID() const [workspaceContext, integrationTools, userPermission] = await Promise.all([ generateWorkspaceContext(workspaceId, userId), - buildIntegrationToolSchemas(userId), + buildIntegrationToolSchemas(userId, messageId), getUserEntityPermissions(userId, 'workspace', workspaceId).catch(() => null), ]) @@ -60,7 +64,7 @@ export async function POST(req: NextRequest) { userId, chatId: effectiveChatId, mode: 'agent', - messageId: crypto.randomUUID(), + messageId, isHosted: true, workspaceContext, ...(integrationTools.length > 0 ? { integrationTools } : {}), @@ -77,7 +81,7 @@ export async function POST(req: NextRequest) { }) if (!result.success) { - logger.error('Mothership execute failed', { + logger.error(appendCopilotLogContext('Mothership execute failed', { messageId }), { error: result.error, errors: result.errors, }) @@ -116,7 +120,7 @@ export async function POST(req: NextRequest) { ) } - logger.error('Mothership execute error', { + logger.error(appendCopilotLogContext('Mothership execute error', { messageId }), { error: error instanceof Error ? error.message : 'Unknown error', }) diff --git a/apps/sim/app/api/v1/copilot/chat/route.ts b/apps/sim/app/api/v1/copilot/chat/route.ts index 2c60a0a9a22..9a2912633b9 100644 --- a/apps/sim/app/api/v1/copilot/chat/route.ts +++ b/apps/sim/app/api/v1/copilot/chat/route.ts @@ -1,6 +1,7 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models' import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' import { getWorkflowById, resolveWorkflowIdForUser } from '@/lib/workflows/utils' @@ -32,6 +33,7 @@ const RequestSchema = z.object({ * - The copilot can still operate on any workflow using list_user_workflows */ export async function POST(req: NextRequest) { + let messageId: string | undefined const auth = await authenticateV1Request(req) if (!auth.authenticated || !auth.userId) { return NextResponse.json( @@ -80,13 +82,25 @@ export async function POST(req: NextRequest) { // Always generate a chatId - required for artifacts system to work with subagents const chatId = parsed.chatId || crypto.randomUUID() + messageId = crypto.randomUUID() + logger.info( + appendCopilotLogContext('Received headless copilot chat start request', { messageId }), + { + workflowId: resolved.workflowId, + workflowName: parsed.workflowName, + chatId, + mode: transportMode, + autoExecuteTools: parsed.autoExecuteTools, + timeout: parsed.timeout, + } + ) const requestPayload = { message: parsed.message, workflowId: resolved.workflowId, userId: auth.userId, model: selectedModel, mode: transportMode, - messageId: crypto.randomUUID(), + messageId, chatId, } @@ -115,7 +129,7 @@ export async function POST(req: NextRequest) { ) } - logger.error('Headless copilot request failed', { + logger.error(appendCopilotLogContext('Headless copilot request failed', { messageId }), { error: error instanceof Error ? error.message : String(error), }) return NextResponse.json({ success: false, error: 'Internal server error' }, { status: 500 }) diff --git a/apps/sim/lib/copilot/chat-payload.ts b/apps/sim/lib/copilot/chat-payload.ts index 3c0bd22e5f3..783bb1e1eee 100644 --- a/apps/sim/lib/copilot/chat-payload.ts +++ b/apps/sim/lib/copilot/chat-payload.ts @@ -1,5 +1,6 @@ import { createLogger } from '@sim/logger' import { getUserSubscriptionState } from '@/lib/billing/core/subscription' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import { getCopilotToolDescription } from '@/lib/copilot/tool-descriptions' import { isHosted } from '@/lib/core/config/feature-flags' import { createMcpToolId } from '@/lib/mcp/utils' @@ -45,7 +46,10 @@ export interface ToolSchema { * Shared by the interactive chat payload builder and the non-interactive * block execution route so both paths send the same tool definitions to Go. */ -export async function buildIntegrationToolSchemas(userId: string): Promise { +export async function buildIntegrationToolSchemas( + userId: string, + messageId?: string +): Promise { const integrationTools: ToolSchema[] = [] try { const { createUserToolSchema } = await import('@/tools/params') @@ -56,10 +60,15 @@ export async function buildIntegrationToolSchemas(userId: string): Promise 0) { - logger.info('Added MCP tools to copilot payload', { count: mcpTools.length }) + logger.info( + appendCopilotLogContext('Added MCP tools to copilot payload', { + messageId: userMessageId, + }), + { count: mcpTools.length } + ) } } } catch (error) { - logger.warn('Failed to discover MCP tools for copilot', { - error: error instanceof Error ? error.message : String(error), - }) + logger.warn( + appendCopilotLogContext('Failed to discover MCP tools for copilot', { + messageId: userMessageId, + }), + { + error: error instanceof Error ? error.message : String(error), + } + ) } } } diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index dc0ad0a698c..0d090a892d3 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { createRunSegment, updateRunStatus } from '@/lib/copilot/async-runs/repository' import { SIM_AGENT_API_URL } from '@/lib/copilot/constants' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import type { OrchestrateStreamOptions } from '@/lib/copilot/orchestrator' import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' import { @@ -205,8 +206,9 @@ export async function requestChatTitle(params: { message: string model: string provider?: string + messageId?: string }): Promise { - const { message, model, provider } = params + const { message, model, provider, messageId } = params if (!message || !model) return null const headers: Record = { 'Content-Type': 'application/json' } @@ -223,17 +225,20 @@ export async function requestChatTitle(params: { const payload = await response.json().catch(() => ({})) if (!response.ok) { - logger.warn('Failed to generate chat title via copilot backend', { - status: response.status, - error: payload, - }) + logger.warn( + appendCopilotLogContext('Failed to generate chat title via copilot backend', { messageId }), + { + status: response.status, + error: payload, + } + ) return null } const title = typeof payload?.title === 'string' ? payload.title.trim() : '' return title || null } catch (error) { - logger.error('Error generating chat title:', error) + logger.error(appendCopilotLogContext('Error generating chat title', { messageId }), error) return null } } @@ -274,6 +279,8 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS orchestrateOptions, pendingChatStreamAlreadyRegistered = false, } = params + const messageId = + typeof requestPayload.messageId === 'string' ? requestPayload.messageId : streamId let eventWriter: ReturnType | null = null let clientDisconnected = false @@ -303,9 +310,15 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS provider: (requestPayload.provider as string | undefined) || null, requestContext: { requestId }, }).catch((error) => { - logger.warn(`[${requestId}] Failed to create copilot run segment`, { - error: error instanceof Error ? error.message : String(error), - }) + logger.warn( + appendCopilotLogContext('Failed to create copilot run segment', { + requestId, + messageId, + }), + { + error: error instanceof Error ? error.message : String(error), + } + ) }) } eventWriter = createStreamEventWriter(streamId) @@ -324,10 +337,16 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS await redis.del(getStreamAbortKey(streamId)) } } catch (error) { - logger.warn(`[${requestId}] Failed to poll distributed stream abort`, { - streamId, - error: error instanceof Error ? error.message : String(error), - }) + logger.warn( + appendCopilotLogContext('Failed to poll distributed stream abort', { + requestId, + messageId, + }), + { + streamId, + error: error instanceof Error ? error.message : String(error), + } + ) } })() }, STREAM_ABORT_POLL_MS) @@ -344,11 +363,14 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS await eventWriter.flush() } } catch (error) { - logger.error(`[${requestId}] Failed to persist stream event`, { - eventType: event.type, - eventId, - error: error instanceof Error ? error.message : String(error), - }) + logger.error( + appendCopilotLogContext('Failed to persist stream event', { requestId, messageId }), + { + eventType: event.type, + eventId, + error: error instanceof Error ? error.message : String(error), + } + ) // Keep the live SSE stream going even if durable buffering hiccups. } @@ -367,7 +389,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS try { await pushEvent(event) } catch (error) { - logger.error(`[${requestId}] Failed to push event`, { + logger.error(appendCopilotLogContext('Failed to push event', { requestId, messageId }), { eventType: event.type, error: error instanceof Error ? error.message : String(error), }) @@ -379,7 +401,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS } if (chatId && !currentChat?.title && isNewChat) { - requestChatTitle({ message, model: titleModel, provider: titleProvider }) + requestChatTitle({ message, model: titleModel, provider: titleProvider, messageId }) .then(async (title) => { if (title) { await db.update(copilotChats).set({ title }).where(eq(copilotChats.id, chatId!)) @@ -390,7 +412,10 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS } }) .catch((error) => { - logger.error(`[${requestId}] Title generation failed:`, error) + logger.error( + appendCopilotLogContext('Title generation failed', { requestId, messageId }), + error + ) }) } @@ -415,7 +440,9 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS }) if (abortController.signal.aborted) { - logger.info(`[${requestId}] Stream aborted by explicit stop`) + logger.info( + appendCopilotLogContext('Stream aborted by explicit stop', { requestId, messageId }) + ) await eventWriter.close().catch(() => {}) await setStreamMeta(streamId, { status: 'cancelled', userId, executionId, runId }) await updateRunStatus(runId, 'cancelled', { completedAt: new Date() }).catch(() => {}) @@ -429,14 +456,23 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS 'An unexpected error occurred while processing the response.' if (clientDisconnected) { - logger.info(`[${requestId}] Stream failed after client disconnect`, { - error: errorMessage, - }) + logger.info( + appendCopilotLogContext('Stream failed after client disconnect', { + requestId, + messageId, + }), + { + error: errorMessage, + } + ) } - logger.error(`[${requestId}] Orchestration returned failure`, { - error: errorMessage, - }) + logger.error( + appendCopilotLogContext('Orchestration returned failure', { requestId, messageId }), + { + error: errorMessage, + } + ) await pushEventBestEffort({ type: 'error', error: errorMessage, @@ -464,18 +500,29 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS await updateRunStatus(runId, 'complete', { completedAt: new Date() }).catch(() => {}) } catch (error) { if (abortController.signal.aborted) { - logger.info(`[${requestId}] Stream aborted by explicit stop`) + logger.info( + appendCopilotLogContext('Stream aborted by explicit stop', { requestId, messageId }) + ) await eventWriter.close().catch(() => {}) await setStreamMeta(streamId, { status: 'cancelled', userId, executionId, runId }) await updateRunStatus(runId, 'cancelled', { completedAt: new Date() }).catch(() => {}) return } if (clientDisconnected) { - logger.info(`[${requestId}] Stream errored after client disconnect`, { - error: error instanceof Error ? error.message : 'Stream error', - }) + logger.info( + appendCopilotLogContext('Stream errored after client disconnect', { + requestId, + messageId, + }), + { + error: error instanceof Error ? error.message : 'Stream error', + } + ) } - logger.error(`[${requestId}] Orchestration error:`, error) + logger.error( + appendCopilotLogContext('Orchestration error', { requestId, messageId }), + error + ) const errorMessage = error instanceof Error ? error.message : 'Stream error' await pushEventBestEffort({ type: 'error', diff --git a/apps/sim/lib/copilot/logging.ts b/apps/sim/lib/copilot/logging.ts new file mode 100644 index 00000000000..b1f0aa90435 --- /dev/null +++ b/apps/sim/lib/copilot/logging.ts @@ -0,0 +1,25 @@ +export interface CopilotLogContext { + requestId?: string + messageId?: string +} + +/** + * Appends copilot request identifiers to a log message. + */ +export function appendCopilotLogContext(message: string, context: CopilotLogContext = {}): string { + const suffixParts: string[] = [] + + if (context.requestId) { + suffixParts.push(`requestId:${context.requestId}`) + } + + if (context.messageId) { + suffixParts.push(`messageId:${context.messageId}`) + } + + if (suffixParts.length === 0) { + return message + } + + return `${message} [${suffixParts.join(' ')}]` +} diff --git a/apps/sim/lib/copilot/orchestrator/index.ts b/apps/sim/lib/copilot/orchestrator/index.ts index 3320b0df33d..8861b23e47c 100644 --- a/apps/sim/lib/copilot/orchestrator/index.ts +++ b/apps/sim/lib/copilot/orchestrator/index.ts @@ -14,6 +14,7 @@ import { updateRunStatus, } from '@/lib/copilot/async-runs/repository' import { SIM_AGENT_API_URL, SIM_AGENT_VERSION } from '@/lib/copilot/constants' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import { isToolAvailableOnSimSide, prepareExecutionContext, @@ -117,15 +118,32 @@ export async function orchestrateCopilotStream( execContext.abortSignal = options.abortSignal const payloadMsgId = requestPayload?.messageId + const messageId = typeof payloadMsgId === 'string' ? payloadMsgId : crypto.randomUUID() + execContext.messageId = messageId const context = createStreamingContext({ chatId, executionId, runId, - messageId: typeof payloadMsgId === 'string' ? payloadMsgId : crypto.randomUUID(), + messageId, }) + const withLogContext = (message: string) => + appendCopilotLogContext(message, { + requestId: context.requestId, + messageId, + }) let claimedToolCallIds: string[] = [] let claimedByWorkerId: string | null = null + logger.info(withLogContext('Starting copilot orchestration'), { + goRoute, + workflowId, + workspaceId, + chatId, + executionId, + runId, + hasUserTimezone: Boolean(userTimezone), + }) + try { let route = goRoute let payload = requestPayload @@ -135,6 +153,12 @@ export async function orchestrateCopilotStream( for (;;) { context.streamComplete = false + logger.info(withLogContext('Starting orchestration loop iteration'), { + route, + hasPendingAsyncContinuation: Boolean(context.awaitingAsyncContinuation), + claimedToolCallCount: claimedToolCallIds.length, + }) + const loopOptions = { ...options, onEvent: async (event: SSEEvent) => { @@ -142,6 +166,14 @@ export async function orchestrateCopilotStream( const d = (event.data ?? {}) as Record const response = (d.response ?? {}) as Record if (response.async_pause) { + logger.info(withLogContext('Detected async pause from copilot backend'), { + route, + checkpointId: + typeof (response.async_pause as Record)?.checkpointId === + 'string' + ? (response.async_pause as Record).checkpointId + : undefined, + }) if (runId) { await updateRunStatus(runId, 'paused_waiting_for_tool').catch(() => {}) } @@ -167,8 +199,18 @@ export async function orchestrateCopilotStream( loopOptions ) + logger.info(withLogContext('Completed orchestration loop iteration'), { + route, + streamComplete: context.streamComplete, + wasAborted: context.wasAborted, + hasAsyncContinuation: Boolean(context.awaitingAsyncContinuation), + errorCount: context.errors.length, + }) + if (claimedToolCallIds.length > 0) { - logger.info('Marking async tool calls as delivered', { toolCallIds: claimedToolCallIds }) + logger.info(withLogContext('Marking async tool calls as delivered'), { + toolCallIds: claimedToolCallIds, + }) await Promise.all( claimedToolCallIds.map((toolCallId) => markAsyncToolDelivered(toolCallId).catch(() => null) @@ -179,6 +221,11 @@ export async function orchestrateCopilotStream( } if (options.abortSignal?.aborted || context.wasAborted) { + logger.info(withLogContext('Stopping orchestration because request was aborted'), { + pendingToolCallCount: Array.from(context.toolCalls.values()).filter( + (toolCall) => toolCall.status === 'pending' || toolCall.status === 'executing' + ).length, + }) for (const [toolCallId, toolCall] of context.toolCalls) { if (toolCall.status === 'pending' || toolCall.status === 'executing') { toolCall.status = 'cancelled' @@ -191,10 +238,18 @@ export async function orchestrateCopilotStream( } const continuation = context.awaitingAsyncContinuation - if (!continuation) break + if (!continuation) { + logger.info(withLogContext('No async continuation pending; finishing orchestration')) + break + } let resumeReady = false let resumeRetries = 0 + logger.info(withLogContext('Processing async continuation'), { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + pendingToolCallIds: continuation.pendingToolCallIds, + }) for (;;) { claimedToolCallIds = [] claimedByWorkerId = null @@ -210,21 +265,31 @@ export async function orchestrateCopilotStream( if (localPendingPromise) { localPendingPromises.push(localPendingPromise) - logger.info('Waiting for local async tool completion before retrying resume claim', { - toolCallId, - runId: continuation.runId, - }) + logger.info( + withLogContext( + 'Waiting for local async tool completion before retrying resume claim' + ), + { + toolCallId, + runId: continuation.runId, + } + ) continue } if (durableRow && isTerminalAsyncStatus(durableRow.status)) { if (durableRow.claimedBy && durableRow.claimedBy !== resumeWorkerId) { missingToolCallIds.push(toolCallId) - logger.warn('Async tool continuation is waiting on a claim held by another worker', { - toolCallId, - runId: continuation.runId, - claimedBy: durableRow.claimedBy, - }) + logger.warn( + withLogContext( + 'Async tool continuation is waiting on a claim held by another worker' + ), + { + toolCallId, + runId: continuation.runId, + claimedBy: durableRow.claimedBy, + } + ) continue } readyTools.push({ @@ -243,12 +308,15 @@ export async function orchestrateCopilotStream( isTerminalToolCallStatus(toolState.status) && !isToolAvailableOnSimSide(toolState.name) ) { - logger.info('Including Go-handled tool in resume payload (no Sim-side row)', { - toolCallId, - toolName: toolState.name, - status: toolState.status, - runId: continuation.runId, - }) + logger.info( + withLogContext('Including Go-handled tool in resume payload (no Sim-side row)'), + { + toolCallId, + toolName: toolState.name, + status: toolState.status, + runId: continuation.runId, + } + ) readyTools.push({ toolCallId, toolState, @@ -258,7 +326,7 @@ export async function orchestrateCopilotStream( continue } - logger.warn('Skipping already-claimed or missing async tool resume', { + logger.warn(withLogContext('Skipping already-claimed or missing async tool resume'), { toolCallId, runId: continuation.runId, durableStatus: durableRow?.status, @@ -268,6 +336,13 @@ export async function orchestrateCopilotStream( } if (localPendingPromises.length > 0) { + logger.info( + withLogContext('Waiting for local pending async tools before resuming continuation'), + { + checkpointId: continuation.checkpointId, + pendingPromiseCount: localPendingPromises.length, + } + ) await Promise.allSettled(localPendingPromises) continue } @@ -275,15 +350,28 @@ export async function orchestrateCopilotStream( if (missingToolCallIds.length > 0) { if (resumeRetries < 3) { resumeRetries++ - logger.info('Retrying async resume after some tool calls were not yet ready', { - checkpointId: continuation.checkpointId, - runId: continuation.runId, - retry: resumeRetries, - missingToolCallIds, - }) + logger.info( + withLogContext('Retrying async resume after some tool calls were not yet ready'), + { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + retry: resumeRetries, + missingToolCallIds, + } + ) await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries)) continue } + logger.error( + withLogContext( + 'Async continuation failed because pending tool calls never became ready' + ), + { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + missingToolCallIds, + } + ) throw new Error( `Failed to resume async tool continuation: pending tool calls were not ready (${missingToolCallIds.join(', ')})` ) @@ -292,14 +380,25 @@ export async function orchestrateCopilotStream( if (readyTools.length === 0) { if (resumeRetries < 3 && continuation.pendingToolCallIds.length > 0) { resumeRetries++ - logger.info('Retrying async resume because no tool calls were ready yet', { - checkpointId: continuation.checkpointId, - runId: continuation.runId, - retry: resumeRetries, - }) + logger.info( + withLogContext('Retrying async resume because no tool calls were ready yet'), + { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + retry: resumeRetries, + } + ) await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries)) continue } + logger.error( + withLogContext('Async continuation failed because no tool calls were ready'), + { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + requestedToolCallIds: continuation.pendingToolCallIds, + } + ) throw new Error('Failed to resume async tool continuation: no tool calls were ready') } @@ -320,12 +419,15 @@ export async function orchestrateCopilotStream( if (claimFailures.length > 0) { if (newlyClaimedToolCallIds.length > 0) { - logger.info('Releasing async tool claims after claim contention during resume', { - checkpointId: continuation.checkpointId, - runId: continuation.runId, - newlyClaimedToolCallIds, - claimFailures, - }) + logger.info( + withLogContext('Releasing async tool claims after claim contention during resume'), + { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + newlyClaimedToolCallIds, + claimFailures, + } + ) await Promise.all( newlyClaimedToolCallIds.map((toolCallId) => releaseCompletedAsyncToolClaim(toolCallId, resumeWorkerId).catch(() => null) @@ -334,7 +436,7 @@ export async function orchestrateCopilotStream( } if (resumeRetries < 3) { resumeRetries++ - logger.info('Retrying async resume after claim contention', { + logger.info(withLogContext('Retrying async resume after claim contention'), { checkpointId: continuation.checkpointId, runId: continuation.runId, retry: resumeRetries, @@ -343,6 +445,14 @@ export async function orchestrateCopilotStream( await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries)) continue } + logger.error( + withLogContext('Async continuation failed because tool claims could not be acquired'), + { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + claimFailures, + } + ) throw new Error( `Failed to resume async tool continuation: unable to claim tool calls (${claimFailures.join(', ')})` ) @@ -356,7 +466,7 @@ export async function orchestrateCopilotStream( ] claimedByWorkerId = claimedToolCallIds.length > 0 ? resumeWorkerId : null - logger.info('Resuming async tool continuation', { + logger.info(withLogContext('Resuming async tool continuation'), { checkpointId: continuation.checkpointId, runId: continuation.runId, toolCallIds: readyTools.map((tool) => tool.toolCallId), @@ -395,10 +505,15 @@ export async function orchestrateCopilotStream( !isTerminalAsyncStatus(durableStatus) && !isDeliveredAsyncStatus(durableStatus) ) { - logger.warn('Async tool row was claimed for resume without terminal durable state', { - toolCallId: tool.toolCallId, - status: durableStatus, - }) + logger.warn( + withLogContext( + 'Async tool row was claimed for resume without terminal durable state' + ), + { + toolCallId: tool.toolCallId, + status: durableStatus, + } + ) } return { @@ -416,11 +531,20 @@ export async function orchestrateCopilotStream( checkpointId: continuation.checkpointId, results, } + logger.info(withLogContext('Prepared async continuation payload for resume endpoint'), { + route, + checkpointId: continuation.checkpointId, + resultCount: results.length, + }) resumeReady = true break } if (!resumeReady) { + logger.warn(withLogContext('Async continuation loop exited without resume payload'), { + checkpointId: continuation.checkpointId, + runId: continuation.runId, + }) break } } @@ -436,12 +560,19 @@ export async function orchestrateCopilotStream( usage: context.usage, cost: context.cost, } + logger.info(withLogContext('Completing copilot orchestration'), { + success: result.success, + chatId: result.chatId, + hasRequestId: Boolean(result.requestId), + errorCount: result.errors?.length || 0, + toolCallCount: result.toolCalls.length, + }) await options.onComplete?.(result) return result } catch (error) { const err = error instanceof Error ? error : new Error('Copilot orchestration failed') if (claimedToolCallIds.length > 0 && claimedByWorkerId) { - logger.warn('Releasing async tool claims after delivery failure', { + logger.warn(withLogContext('Releasing async tool claims after delivery failure'), { toolCallIds: claimedToolCallIds, workerId: claimedByWorkerId, }) @@ -451,7 +582,9 @@ export async function orchestrateCopilotStream( ) ) } - logger.error('Copilot orchestration failed', { error: err.message }) + logger.error(withLogContext('Copilot orchestration failed'), { + error: err.message, + }) await options.onError?.(err) return { success: false, diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts index 0b4001244d6..98c4952ba33 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts @@ -148,7 +148,8 @@ describe('sse-handlers tool lifecycle', () => { 'read', 499, 'Request aborted during tool execution', - { cancelled: true } + { cancelled: true }, + 'msg-1' ) const updated = context.toolCalls.get('tool-cancel') diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts index 17b820555c2..a0385e5a42b 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts @@ -1,6 +1,7 @@ import { createLogger } from '@sim/logger' import { upsertAsyncToolCall } from '@/lib/copilot/async-runs/repository' import { STREAM_TIMEOUT_MS } from '@/lib/copilot/constants' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import { asRecord, getEventData, @@ -53,13 +54,25 @@ function abortPendingToolIfStreamDead( toolCall.status = 'cancelled' toolCall.endTime = Date.now() markToolResultSeen(toolCallId) - markToolComplete(toolCall.id, toolCall.name, 499, 'Request aborted before tool execution', { - cancelled: true, - }).catch((err) => { - logger.error('markToolComplete fire-and-forget failed (stream aborted)', { - toolCallId: toolCall.id, - error: err instanceof Error ? err.message : String(err), - }) + markToolComplete( + toolCall.id, + toolCall.name, + 499, + 'Request aborted before tool execution', + { + cancelled: true, + }, + context.messageId + ).catch((err) => { + logger.error( + appendCopilotLogContext('markToolComplete fire-and-forget failed (stream aborted)', { + messageId: context.messageId, + }), + { + toolCallId: toolCall.id, + error: err instanceof Error ? err.message : String(err), + } + ) }) return true } @@ -90,7 +103,8 @@ function getEventUI(event: SSEEvent): { function handleClientCompletion( toolCall: ToolCallState, toolCallId: string, - completion: { status: string; message?: string; data?: Record } | null + completion: { status: string; message?: string; data?: Record } | null, + context: StreamingContext ): void { if (completion?.status === 'background') { toolCall.status = 'skipped' @@ -100,12 +114,18 @@ function handleClientCompletion( toolCall.name, 202, completion.message || 'Tool execution moved to background', - { background: true } + { background: true }, + context.messageId ).catch((err) => { - logger.error('markToolComplete fire-and-forget failed (client background)', { - toolCallId: toolCall.id, - error: err instanceof Error ? err.message : String(err), - }) + logger.error( + appendCopilotLogContext('markToolComplete fire-and-forget failed (client background)', { + messageId: context.messageId, + }), + { + toolCallId: toolCall.id, + error: err instanceof Error ? err.message : String(err), + } + ) }) markToolResultSeen(toolCallId) return @@ -117,12 +137,19 @@ function handleClientCompletion( toolCall.id, toolCall.name, 400, - completion.message || 'Tool execution rejected' + completion.message || 'Tool execution rejected', + undefined, + context.messageId ).catch((err) => { - logger.error('markToolComplete fire-and-forget failed (client rejected)', { - toolCallId: toolCall.id, - error: err instanceof Error ? err.message : String(err), - }) + logger.error( + appendCopilotLogContext('markToolComplete fire-and-forget failed (client rejected)', { + messageId: context.messageId, + }), + { + toolCallId: toolCall.id, + error: err instanceof Error ? err.message : String(err), + } + ) }) markToolResultSeen(toolCallId) return @@ -135,12 +162,18 @@ function handleClientCompletion( toolCall.name, 499, completion.message || 'Workflow execution was stopped manually by the user.', - completion.data + completion.data, + context.messageId ).catch((err) => { - logger.error('markToolComplete fire-and-forget failed (client cancelled)', { - toolCallId: toolCall.id, - error: err instanceof Error ? err.message : String(err), - }) + logger.error( + appendCopilotLogContext('markToolComplete fire-and-forget failed (client cancelled)', { + messageId: context.messageId, + }), + { + toolCallId: toolCall.id, + error: err instanceof Error ? err.message : String(err), + } + ) }) markToolResultSeen(toolCallId) return @@ -149,15 +182,25 @@ function handleClientCompletion( toolCall.status = success ? 'success' : 'error' toolCall.endTime = Date.now() const msg = completion?.message || (success ? 'Tool completed' : 'Tool failed or timed out') - markToolComplete(toolCall.id, toolCall.name, success ? 200 : 500, msg, completion?.data).catch( - (err) => { - logger.error('markToolComplete fire-and-forget failed (client completion)', { + markToolComplete( + toolCall.id, + toolCall.name, + success ? 200 : 500, + msg, + completion?.data, + context.messageId + ).catch((err) => { + logger.error( + appendCopilotLogContext('markToolComplete fire-and-forget failed (client completion)', { + messageId: context.messageId, + }), + { toolCallId: toolCall.id, toolName: toolCall.name, error: err instanceof Error ? err.message : String(err), - }) - } - ) + } + ) + }) markToolResultSeen(toolCallId) } @@ -170,7 +213,8 @@ async function emitSyntheticToolResult( toolCallId: string, toolName: string, completion: { status: string; message?: string; data?: Record } | null, - options: OrchestratorOptions + options: OrchestratorOptions, + context: StreamingContext ): Promise { const success = completion?.status === 'success' const isCancelled = completion?.status === 'cancelled' @@ -189,11 +233,16 @@ async function emitSyntheticToolResult( error: !success ? completion?.message : undefined, } as SSEEvent) } catch (error) { - logger.warn('Failed to emit synthetic tool_result', { - toolCallId, - toolName, - error: error instanceof Error ? error.message : String(error), - }) + logger.warn( + appendCopilotLogContext('Failed to emit synthetic tool_result', { + messageId: context.messageId, + }), + { + toolCallId, + toolName, + error: error instanceof Error ? error.message : String(error), + } + ) } } @@ -260,6 +309,17 @@ export const sseHandlers: Record = { const rid = typeof event.data === 'string' ? event.data : undefined if (rid) { context.requestId = rid + logger.info( + appendCopilotLogContext('Mapped copilot message to Go trace ID', { + messageId: context.messageId, + }), + { + goTraceId: rid, + chatId: context.chatId, + executionId: context.executionId, + runId: context.runId, + } + ) } }, title_updated: () => {}, @@ -406,19 +466,29 @@ export const sseHandlers: Record = { args, }) } catch (err) { - logger.warn('Failed to persist async tool row before execution', { - toolCallId, - toolName, - error: err instanceof Error ? err.message : String(err), - }) + logger.warn( + appendCopilotLogContext('Failed to persist async tool row before execution', { + messageId: context.messageId, + }), + { + toolCallId, + toolName, + error: err instanceof Error ? err.message : String(err), + } + ) } return executeToolAndReport(toolCallId, context, execContext, options) })().catch((err) => { - logger.error('Parallel tool execution failed', { - toolCallId, - toolName, - error: err instanceof Error ? err.message : String(err), - }) + logger.error( + appendCopilotLogContext('Parallel tool execution failed', { + messageId: context.messageId, + }), + { + toolCallId, + toolName, + error: err instanceof Error ? err.message : String(err), + } + ) return { status: 'error', message: err instanceof Error ? err.message : String(err), @@ -457,19 +527,24 @@ export const sseHandlers: Record = { args, status: 'running', }).catch((err) => { - logger.warn('Failed to persist async tool row for client-executable tool', { - toolCallId, - toolName, - error: err instanceof Error ? err.message : String(err), - }) + logger.warn( + appendCopilotLogContext('Failed to persist async tool row for client-executable tool', { + messageId: context.messageId, + }), + { + toolCallId, + toolName, + error: err instanceof Error ? err.message : String(err), + } + ) }) const completion = await waitForToolCompletion( toolCallId, options.timeout || STREAM_TIMEOUT_MS, options.abortSignal ) - handleClientCompletion(toolCall, toolCallId, completion) - await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options) + handleClientCompletion(toolCall, toolCallId, completion, context) + await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options, context) } return } @@ -651,19 +726,29 @@ export const subAgentHandlers: Record = { args, }) } catch (err) { - logger.warn('Failed to persist async subagent tool row before execution', { - toolCallId, - toolName, - error: err instanceof Error ? err.message : String(err), - }) + logger.warn( + appendCopilotLogContext('Failed to persist async subagent tool row before execution', { + messageId: context.messageId, + }), + { + toolCallId, + toolName, + error: err instanceof Error ? err.message : String(err), + } + ) } return executeToolAndReport(toolCallId, context, execContext, options) })().catch((err) => { - logger.error('Parallel subagent tool execution failed', { - toolCallId, - toolName, - error: err instanceof Error ? err.message : String(err), - }) + logger.error( + appendCopilotLogContext('Parallel subagent tool execution failed', { + messageId: context.messageId, + }), + { + toolCallId, + toolName, + error: err instanceof Error ? err.message : String(err), + } + ) return { status: 'error', message: err instanceof Error ? err.message : String(err), @@ -697,19 +782,25 @@ export const subAgentHandlers: Record = { args, status: 'running', }).catch((err) => { - logger.warn('Failed to persist async tool row for client-executable subagent tool', { - toolCallId, - toolName, - error: err instanceof Error ? err.message : String(err), - }) + logger.warn( + appendCopilotLogContext( + 'Failed to persist async tool row for client-executable subagent tool', + { messageId: context.messageId } + ), + { + toolCallId, + toolName, + error: err instanceof Error ? err.message : String(err), + } + ) }) const completion = await waitForToolCompletion( toolCallId, options.timeout || STREAM_TIMEOUT_MS, options.abortSignal ) - handleClientCompletion(toolCall, toolCallId, completion) - await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options) + handleClientCompletion(toolCall, toolCallId, completion, context) + await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options, context) } return } @@ -769,10 +860,15 @@ export const subAgentHandlers: Record = { export function handleSubagentRouting(event: SSEEvent, context: StreamingContext): boolean { if (!event.subagent) return false if (!context.subAgentParentToolCallId) { - logger.warn('Subagent event missing parent tool call', { - type: event.type, - subagent: event.subagent, - }) + logger.warn( + appendCopilotLogContext('Subagent event missing parent tool call', { + messageId: context.messageId, + }), + { + type: event.type, + subagent: event.subagent, + } + ) return false } return true diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts index 2b48b5a91c9..e8442e1c76c 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts @@ -3,6 +3,7 @@ import { userTableRows } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { eq } from 'drizzle-orm' import { completeAsyncToolCall, markAsyncToolRunning } from '@/lib/copilot/async-runs/repository' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import { waitForToolConfirmation } from '@/lib/copilot/orchestrator/persistence' import { asRecord, markToolResultSeen } from '@/lib/copilot/orchestrator/sse/utils' import { executeToolServerSide, markToolComplete } from '@/lib/copilot/orchestrator/tool-executor' @@ -186,12 +187,15 @@ async function maybeWriteOutputToFile( contentType ) - logger.info('Tool output written to file', { - toolName, - fileName, - size: buffer.length, - fileId: uploaded.id, - }) + logger.info( + appendCopilotLogContext('Tool output written to file', { messageId: context.messageId }), + { + toolName, + fileName, + size: buffer.length, + fileId: uploaded.id, + } + ) return { success: true, @@ -205,11 +209,16 @@ async function maybeWriteOutputToFile( } } catch (err) { const message = err instanceof Error ? err.message : String(err) - logger.warn('Failed to write tool output to file', { - toolName, - outputPath, - error: message, - }) + logger.warn( + appendCopilotLogContext('Failed to write tool output to file', { + messageId: context.messageId, + }), + { + toolName, + outputPath, + error: message, + } + ) return { success: false, error: `Failed to write output file: ${message}`, @@ -289,10 +298,11 @@ function terminalCompletionFromToolCall(toolCall: { function reportCancelledTool( toolCall: { id: string; name: string }, message: string, + messageId?: string, data: Record = { cancelled: true } ): void { - markToolComplete(toolCall.id, toolCall.name, 499, message, data).catch((err) => { - logger.error('markToolComplete failed (cancelled)', { + markToolComplete(toolCall.id, toolCall.name, 499, message, data, messageId).catch((err) => { + logger.error(appendCopilotLogContext('markToolComplete failed (cancelled)', { messageId }), { toolCallId: toolCall.id, toolName: toolCall.name, error: err instanceof Error ? err.message : String(err), @@ -387,11 +397,14 @@ async function maybeWriteOutputToTable( } }) - logger.info('Tool output written to table', { - toolName, - tableId: outputTable, - rowCount: rows.length, - }) + logger.info( + appendCopilotLogContext('Tool output written to table', { messageId: context.messageId }), + { + toolName, + tableId: outputTable, + rowCount: rows.length, + } + ) return { success: true, @@ -402,11 +415,16 @@ async function maybeWriteOutputToTable( }, } } catch (err) { - logger.warn('Failed to write tool output to table', { - toolName, - outputTable, - error: err instanceof Error ? err.message : String(err), - }) + logger.warn( + appendCopilotLogContext('Failed to write tool output to table', { + messageId: context.messageId, + }), + { + toolName, + outputTable, + error: err instanceof Error ? err.message : String(err), + } + ) return { success: false, error: `Failed to write to table: ${err instanceof Error ? err.message : String(err)}`, @@ -506,13 +524,16 @@ async function maybeWriteReadCsvToTable( } }) - logger.info('Read output written to table', { - toolName, - tableId: outputTable, - tableName: table.name, - rowCount: rows.length, - filePath, - }) + logger.info( + appendCopilotLogContext('Read output written to table', { messageId: context.messageId }), + { + toolName, + tableId: outputTable, + tableName: table.name, + rowCount: rows.length, + filePath, + } + ) return { success: true, @@ -524,11 +545,16 @@ async function maybeWriteReadCsvToTable( }, } } catch (err) { - logger.warn('Failed to write read output to table', { - toolName, - outputTable, - error: err instanceof Error ? err.message : String(err), - }) + logger.warn( + appendCopilotLogContext('Failed to write read output to table', { + messageId: context.messageId, + }), + { + toolName, + outputTable, + error: err instanceof Error ? err.message : String(err), + } + ) return { success: false, error: `Failed to import into table: ${err instanceof Error ? err.message : String(err)}`, @@ -562,14 +588,14 @@ export async function executeToolAndReport( result: { cancelled: true }, error: 'Request aborted before tool execution', }).catch(() => {}) - reportCancelledTool(toolCall, 'Request aborted before tool execution') + reportCancelledTool(toolCall, 'Request aborted before tool execution', context.messageId) return cancelledCompletion('Request aborted before tool execution') } toolCall.status = 'executing' await markAsyncToolRunning(toolCall.id, 'sim-stream').catch(() => {}) - logger.info('Tool execution started', { + logger.info(appendCopilotLogContext('Tool execution started', { messageId: context.messageId }), { toolCallId: toolCall.id, toolName: toolCall.name, params: toolCall.params, @@ -590,7 +616,7 @@ export async function executeToolAndReport( result: { cancelled: true }, error: 'Request aborted during tool execution', }).catch(() => {}) - reportCancelledTool(toolCall, 'Request aborted during tool execution') + reportCancelledTool(toolCall, 'Request aborted during tool execution', context.messageId) return cancelledCompletion('Request aborted during tool execution') } result = await maybeWriteOutputToFile(toolCall.name, toolCall.params, result, execContext) @@ -604,7 +630,11 @@ export async function executeToolAndReport( result: { cancelled: true }, error: 'Request aborted during tool post-processing', }).catch(() => {}) - reportCancelledTool(toolCall, 'Request aborted during tool post-processing') + reportCancelledTool( + toolCall, + 'Request aborted during tool post-processing', + context.messageId + ) return cancelledCompletion('Request aborted during tool post-processing') } result = await maybeWriteOutputToTable(toolCall.name, toolCall.params, result, execContext) @@ -618,7 +648,11 @@ export async function executeToolAndReport( result: { cancelled: true }, error: 'Request aborted during tool post-processing', }).catch(() => {}) - reportCancelledTool(toolCall, 'Request aborted during tool post-processing') + reportCancelledTool( + toolCall, + 'Request aborted during tool post-processing', + context.messageId + ) return cancelledCompletion('Request aborted during tool post-processing') } result = await maybeWriteReadCsvToTable(toolCall.name, toolCall.params, result, execContext) @@ -632,7 +666,11 @@ export async function executeToolAndReport( result: { cancelled: true }, error: 'Request aborted during tool post-processing', }).catch(() => {}) - reportCancelledTool(toolCall, 'Request aborted during tool post-processing') + reportCancelledTool( + toolCall, + 'Request aborted during tool post-processing', + context.messageId + ) return cancelledCompletion('Request aborted during tool post-processing') } toolCall.status = result.success ? 'success' : 'error' @@ -648,18 +686,24 @@ export async function executeToolAndReport( : raw && typeof raw === 'object' ? JSON.stringify(raw).slice(0, 200) : undefined - logger.info('Tool execution succeeded', { - toolCallId: toolCall.id, - toolName: toolCall.name, - outputPreview: preview, - }) + logger.info( + appendCopilotLogContext('Tool execution succeeded', { messageId: context.messageId }), + { + toolCallId: toolCall.id, + toolName: toolCall.name, + outputPreview: preview, + } + ) } else { - logger.warn('Tool execution failed', { - toolCallId: toolCall.id, - toolName: toolCall.name, - error: result.error, - params: toolCall.params, - }) + logger.warn( + appendCopilotLogContext('Tool execution failed', { messageId: context.messageId }), + { + toolCallId: toolCall.id, + toolName: toolCall.name, + error: result.error, + params: toolCall.params, + } + ) } // If create_workflow was successful, update the execution context with the new workflowId. @@ -687,7 +731,11 @@ export async function executeToolAndReport( if (abortRequested(context, execContext, options)) { toolCall.status = 'cancelled' - reportCancelledTool(toolCall, 'Request aborted before tool result delivery') + reportCancelledTool( + toolCall, + 'Request aborted before tool result delivery', + context.messageId + ) return cancelledCompletion('Request aborted before tool result delivery') } @@ -702,13 +750,19 @@ export async function executeToolAndReport( toolCall.name, result.success ? 200 : 500, result.error || (result.success ? 'Tool completed' : 'Tool failed'), - result.output + result.output, + context.messageId ).catch((err) => { - logger.error('markToolComplete fire-and-forget failed', { - toolCallId: toolCall.id, - toolName: toolCall.name, - error: err instanceof Error ? err.message : String(err), - }) + logger.error( + appendCopilotLogContext('markToolComplete fire-and-forget failed', { + messageId: context.messageId, + }), + { + toolCallId: toolCall.id, + toolName: toolCall.name, + error: err instanceof Error ? err.message : String(err), + } + ) }) const resultEvent: SSEEvent = { @@ -743,10 +797,15 @@ export async function executeToolAndReport( if (deleted.length > 0) { isDeleteOp = true removeChatResources(execContext.chatId, deleted).catch((err) => { - logger.warn('Failed to remove chat resources after deletion', { - chatId: execContext.chatId, - error: err instanceof Error ? err.message : String(err), - }) + logger.warn( + appendCopilotLogContext('Failed to remove chat resources after deletion', { + messageId: context.messageId, + }), + { + chatId: execContext.chatId, + error: err instanceof Error ? err.message : String(err), + } + ) }) for (const resource of deleted) { @@ -769,10 +828,15 @@ export async function executeToolAndReport( if (resources.length > 0) { persistChatResources(execContext.chatId, resources).catch((err) => { - logger.warn('Failed to persist chat resources', { - chatId: execContext.chatId, - error: err instanceof Error ? err.message : String(err), - }) + logger.warn( + appendCopilotLogContext('Failed to persist chat resources', { + messageId: context.messageId, + }), + { + chatId: execContext.chatId, + error: err instanceof Error ? err.message : String(err), + } + ) }) for (const resource of resources) { @@ -801,19 +865,22 @@ export async function executeToolAndReport( result: { cancelled: true }, error: 'Request aborted during tool execution', }).catch(() => {}) - reportCancelledTool(toolCall, 'Request aborted during tool execution') + reportCancelledTool(toolCall, 'Request aborted during tool execution', context.messageId) return cancelledCompletion('Request aborted during tool execution') } toolCall.status = 'error' toolCall.error = error instanceof Error ? error.message : String(error) toolCall.endTime = Date.now() - logger.error('Tool execution threw', { - toolCallId: toolCall.id, - toolName: toolCall.name, - error: toolCall.error, - params: toolCall.params, - }) + logger.error( + appendCopilotLogContext('Tool execution threw', { messageId: context.messageId }), + { + toolCallId: toolCall.id, + toolName: toolCall.name, + error: toolCall.error, + params: toolCall.params, + } + ) markToolResultSeen(toolCall.id) await completeAsyncToolCall({ @@ -825,14 +892,26 @@ export async function executeToolAndReport( // Fire-and-forget (same reasoning as above). // Pass error as structured data so the Go side can surface it to the LLM. - markToolComplete(toolCall.id, toolCall.name, 500, toolCall.error, { - error: toolCall.error, - }).catch((err) => { - logger.error('markToolComplete fire-and-forget failed', { - toolCallId: toolCall.id, - toolName: toolCall.name, - error: err instanceof Error ? err.message : String(err), - }) + markToolComplete( + toolCall.id, + toolCall.name, + 500, + toolCall.error, + { + error: toolCall.error, + }, + context.messageId + ).catch((err) => { + logger.error( + appendCopilotLogContext('markToolComplete fire-and-forget failed', { + messageId: context.messageId, + }), + { + toolCallId: toolCall.id, + toolName: toolCall.name, + error: err instanceof Error ? err.message : String(err), + } + ) }) const errorEvent: SSEEvent = { diff --git a/apps/sim/lib/copilot/orchestrator/stream/core.ts b/apps/sim/lib/copilot/orchestrator/stream/core.ts index 9367a3b181a..79f5facb47d 100644 --- a/apps/sim/lib/copilot/orchestrator/stream/core.ts +++ b/apps/sim/lib/copilot/orchestrator/stream/core.ts @@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger' import { getHighestPrioritySubscription } from '@/lib/billing/core/plan' import { isPaid } from '@/lib/billing/plan-helpers' import { ORCHESTRATION_TIMEOUT_MS } from '@/lib/copilot/constants' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import { handleSubagentRouting, sseHandlers, @@ -164,10 +165,13 @@ export async function runStreamLoop( try { await options.onEvent?.(normalizedEvent) } catch (error) { - logger.warn('Failed to forward SSE event', { - type: normalizedEvent.type, - error: error instanceof Error ? error.message : String(error), - }) + logger.warn( + appendCopilotLogContext('Failed to forward SSE event', { messageId: context.messageId }), + { + type: normalizedEvent.type, + error: error instanceof Error ? error.message : String(error), + } + ) } // Let the caller intercept before standard dispatch. @@ -201,7 +205,11 @@ export async function runStreamLoop( if (context.subAgentParentStack.length > 0) { context.subAgentParentStack.pop() } else { - logger.warn('subagent_end without matching subagent_start') + logger.warn( + appendCopilotLogContext('subagent_end without matching subagent_start', { + messageId: context.messageId, + }) + ) } context.subAgentParentToolCallId = context.subAgentParentStack.length > 0 diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts index 68880b77e00..85778ed36cf 100644 --- a/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts @@ -3,6 +3,7 @@ import { credential, mcpServers, pendingCredentialDraft, user } from '@sim/db/sc import { createLogger } from '@sim/logger' import { and, eq, isNull, lt } from 'drizzle-orm' import { SIM_AGENT_API_URL } from '@/lib/copilot/constants' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import type { ExecutionContext, ToolCallResult, @@ -321,12 +322,17 @@ async function executeManageCustomTool( error: `Unsupported operation for manage_custom_tool: ${operation}`, } } catch (error) { - logger.error('manage_custom_tool execution failed', { - operation, - workspaceId, - userId: context.userId, - error: error instanceof Error ? error.message : String(error), - }) + logger.error( + appendCopilotLogContext('manage_custom_tool execution failed', { + messageId: context.messageId, + }), + { + operation, + workspaceId, + userId: context.userId, + error: error instanceof Error ? error.message : String(error), + } + ) return { success: false, error: error instanceof Error ? error.message : 'Failed to manage custom tool', @@ -553,11 +559,16 @@ async function executeManageMcpTool( return { success: false, error: `Unsupported operation for manage_mcp_tool: ${operation}` } } catch (error) { - logger.error('manage_mcp_tool execution failed', { - operation, - workspaceId, - error: error instanceof Error ? error.message : String(error), - }) + logger.error( + appendCopilotLogContext('manage_mcp_tool execution failed', { + messageId: context.messageId, + }), + { + operation, + workspaceId, + error: error instanceof Error ? error.message : String(error), + } + ) return { success: false, error: error instanceof Error ? error.message : 'Failed to manage MCP server', @@ -716,11 +727,16 @@ async function executeManageSkill( return { success: false, error: `Unsupported operation for manage_skill: ${operation}` } } catch (error) { - logger.error('manage_skill execution failed', { - operation, - workspaceId, - error: error instanceof Error ? error.message : String(error), - }) + logger.error( + appendCopilotLogContext('manage_skill execution failed', { + messageId: context.messageId, + }), + { + operation, + workspaceId, + error: error instanceof Error ? error.message : String(error), + } + ) return { success: false, error: error instanceof Error ? error.message : 'Failed to manage skill', @@ -992,10 +1008,15 @@ const SIM_WORKFLOW_TOOL_HANDLERS: Record< }, } } catch (err) { - logger.warn('Failed to generate OAuth link, falling back to generic URL', { - providerName, - error: err instanceof Error ? err.message : String(err), - }) + logger.warn( + appendCopilotLogContext('Failed to generate OAuth link, falling back to generic URL', { + messageId: c.messageId, + }), + { + providerName, + error: err instanceof Error ? err.message : String(err), + } + ) const workspaceUrl = c.workspaceId ? `${baseUrl}/workspace/${c.workspaceId}` : `${baseUrl}/workspace` @@ -1179,7 +1200,12 @@ export async function executeToolServerSide( const toolConfig = getTool(resolvedToolName) if (!toolConfig) { - logger.warn('Tool not found in registry', { toolName, resolvedToolName }) + logger.warn( + appendCopilotLogContext('Tool not found in registry', { + messageId: context.messageId, + }), + { toolName, resolvedToolName } + ) return { success: false, error: `Tool not found: ${toolName}`, @@ -1241,6 +1267,7 @@ async function executeServerToolDirect( workspaceId: context.workspaceId, userPermission: context.userPermission, chatId: context.chatId, + messageId: context.messageId, abortSignal: context.abortSignal, }) @@ -1266,10 +1293,15 @@ async function executeServerToolDirect( return { success: true, output: result } } catch (error) { - logger.error('Server tool execution failed', { - toolName, - error: error instanceof Error ? error.message : String(error), - }) + logger.error( + appendCopilotLogContext('Server tool execution failed', { + messageId: context.messageId, + }), + { + toolName, + error: error instanceof Error ? error.message : String(error), + } + ) return { success: false, error: error instanceof Error ? error.message : 'Server tool execution failed', @@ -1320,7 +1352,8 @@ export async function markToolComplete( toolName: string, status: number, message?: unknown, - data?: unknown + data?: unknown, + messageId?: string ): Promise { try { const controller = new AbortController() @@ -1344,7 +1377,11 @@ export async function markToolComplete( }) if (!response.ok) { - logger.warn('Mark-complete call failed', { toolCallId, toolName, status: response.status }) + logger.warn(appendCopilotLogContext('Mark-complete call failed', { messageId }), { + toolCallId, + toolName, + status: response.status, + }) return false } @@ -1354,7 +1391,7 @@ export async function markToolComplete( } } catch (error) { const isTimeout = error instanceof DOMException && error.name === 'AbortError' - logger.error('Mark-complete call failed', { + logger.error(appendCopilotLogContext('Mark-complete call failed', { messageId }), { toolCallId, toolName, timedOut: isTimeout, diff --git a/apps/sim/lib/copilot/orchestrator/types.ts b/apps/sim/lib/copilot/orchestrator/types.ts index 97c3bd61367..791f6632373 100644 --- a/apps/sim/lib/copilot/orchestrator/types.ts +++ b/apps/sim/lib/copilot/orchestrator/types.ts @@ -195,6 +195,7 @@ export interface ExecutionContext { workflowId: string workspaceId?: string chatId?: string + messageId?: string executionId?: string runId?: string abortSignal?: AbortSignal diff --git a/apps/sim/lib/copilot/tools/server/base-tool.ts b/apps/sim/lib/copilot/tools/server/base-tool.ts index db7a2d6cc25..1eca5a063e6 100644 --- a/apps/sim/lib/copilot/tools/server/base-tool.ts +++ b/apps/sim/lib/copilot/tools/server/base-tool.ts @@ -5,6 +5,7 @@ export interface ServerToolContext { workspaceId?: string userPermission?: string chatId?: string + messageId?: string abortSignal?: AbortSignal } diff --git a/apps/sim/lib/copilot/tools/server/files/download-to-workspace-file.ts b/apps/sim/lib/copilot/tools/server/files/download-to-workspace-file.ts index 8aaf0edbc9a..fbae8b4dcc0 100644 --- a/apps/sim/lib/copilot/tools/server/files/download-to-workspace-file.ts +++ b/apps/sim/lib/copilot/tools/server/files/download-to-workspace-file.ts @@ -1,5 +1,6 @@ import { createLogger } from '@sim/logger' import { z } from 'zod' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import { assertServerToolNotAborted, type BaseServerTool, @@ -124,6 +125,9 @@ export const downloadToWorkspaceFileServerTool: BaseServerTool< params: DownloadToWorkspaceFileArgs, context?: ServerToolContext ): Promise { + const withMessageId = (message: string) => + appendCopilotLogContext(message, { messageId: context?.messageId }) + if (!context?.userId) { throw new Error('Authentication required') } @@ -174,7 +178,7 @@ export const downloadToWorkspaceFileServerTool: BaseServerTool< mimeType ) - logger.info('Downloaded remote file to workspace', { + logger.info(withMessageId('Downloaded remote file to workspace'), { sourceUrl: params.url, fileId: uploaded.id, fileName: uploaded.name, @@ -191,7 +195,10 @@ export const downloadToWorkspaceFileServerTool: BaseServerTool< } } catch (error) { const msg = error instanceof Error ? error.message : 'Unknown error' - logger.error('Failed to download file to workspace', { url: params.url, error: msg }) + logger.error(withMessageId('Failed to download file to workspace'), { + url: params.url, + error: msg, + }) return { success: false, message: `Failed to download file: ${msg}` } } }, diff --git a/apps/sim/lib/copilot/tools/server/files/workspace-file.ts b/apps/sim/lib/copilot/tools/server/files/workspace-file.ts index e85fec40758..538ea4f52fa 100644 --- a/apps/sim/lib/copilot/tools/server/files/workspace-file.ts +++ b/apps/sim/lib/copilot/tools/server/files/workspace-file.ts @@ -1,4 +1,5 @@ import { createLogger } from '@sim/logger' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import { assertServerToolNotAborted, type BaseServerTool, @@ -50,8 +51,11 @@ export const workspaceFileServerTool: BaseServerTool { + const withMessageId = (message: string) => + appendCopilotLogContext(message, { messageId: context?.messageId }) + if (!context?.userId) { - logger.error('Unauthorized attempt to access workspace files') + logger.error(withMessageId('Unauthorized attempt to access workspace files')) throw new Error('Authentication required') } @@ -90,7 +94,7 @@ export const workspaceFileServerTool: BaseServerTool { + const withMessageId = (message: string) => + appendCopilotLogContext(message, { messageId: context?.messageId }) + if (!context?.userId) { throw new Error('Authentication required') } @@ -93,17 +97,17 @@ export const generateImageServerTool: BaseServerTool = { name: 'get_job_logs', async execute(rawArgs: GetJobLogsArgs, context?: ServerToolContext): Promise { + const withMessageId = (message: string) => + appendCopilotLogContext(message, { messageId: context?.messageId }) + const { jobId, executionId, @@ -110,7 +114,12 @@ export const getJobLogsServerTool: BaseServerTool const clampedLimit = Math.min(Math.max(1, limit), 5) - logger.info('Fetching job logs', { jobId, executionId, limit: clampedLimit, includeDetails }) + logger.info(withMessageId('Fetching job logs'), { + jobId, + executionId, + limit: clampedLimit, + includeDetails, + }) const conditions = [eq(jobExecutionLogs.scheduleId, jobId)] if (executionId) { @@ -164,7 +173,7 @@ export const getJobLogsServerTool: BaseServerTool return entry }) - logger.info('Job logs prepared', { + logger.info(withMessageId('Job logs prepared'), { jobId, count: entries.length, resultSizeKB: Math.round(JSON.stringify(entries).length / 1024), diff --git a/apps/sim/lib/copilot/tools/server/knowledge/knowledge-base.ts b/apps/sim/lib/copilot/tools/server/knowledge/knowledge-base.ts index a459a271957..2eae12f37ef 100644 --- a/apps/sim/lib/copilot/tools/server/knowledge/knowledge-base.ts +++ b/apps/sim/lib/copilot/tools/server/knowledge/knowledge-base.ts @@ -3,6 +3,7 @@ import { knowledgeConnector } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, isNull } from 'drizzle-orm' import { generateInternalToken } from '@/lib/auth/internal' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import { assertServerToolNotAborted, type BaseServerTool, @@ -47,8 +48,15 @@ export const knowledgeBaseServerTool: BaseServerTool { + const withMessageId = (message: string) => + appendCopilotLogContext(message, { messageId: context?.messageId }) + if (!context?.userId) { - logger.error('Unauthorized attempt to access knowledge base - no authenticated user context') + logger.error( + withMessageId( + 'Unauthorized attempt to access knowledge base - no authenticated user context' + ) + ) throw new Error('Authentication required') } @@ -97,7 +105,7 @@ export const knowledgeBaseServerTool: BaseServerTool { - logger.error('Background document processing failed', { + logger.error(withMessageId('Background document processing failed'), { documentId: doc.id, error: err instanceof Error ? err.message : String(err), }) }) - logger.info('Workspace file added to knowledge base via copilot', { + logger.info(withMessageId('Workspace file added to knowledge base via copilot'), { knowledgeBaseId: args.knowledgeBaseId, documentId: doc.id, fileName: fileRecord.name, @@ -344,7 +352,7 @@ export const knowledgeBaseServerTool: BaseServerTool = { name: 'user_table', async execute(params: UserTableArgs, context?: ServerToolContext): Promise { + const withMessageId = (message: string) => + appendCopilotLogContext(message, { messageId: context?.messageId }) + if (!context?.userId) { - logger.error('Unauthorized attempt to access user table - no authenticated user context') + logger.error( + withMessageId('Unauthorized attempt to access user table - no authenticated user context') + ) throw new Error('Authentication required') } @@ -723,7 +729,7 @@ export const userTableServerTool: BaseServerTool const coerced = coerceRows(rows, columns, columnMap) const inserted = await batchInsertAll(table.id, coerced, table, workspaceId, context) - logger.info('Table created from file', { + logger.info(withMessageId('Table created from file'), { tableId: table.id, fileName: file.name, columns: columns.length, @@ -799,7 +805,7 @@ export const userTableServerTool: BaseServerTool const coerced = coerceRows(rows, matchedColumns, columnMap) const inserted = await batchInsertAll(table.id, coerced, table, workspaceId, context) - logger.info('Rows imported from file', { + logger.info(withMessageId('Rows imported from file'), { tableId: table.id, fileName: file.name, matchedColumns: mappedHeaders.length, @@ -997,7 +1003,11 @@ export const userTableServerTool: BaseServerTool ? error.cause.message : String(error.cause) : undefined - logger.error('Table operation failed', { operation, error: errorMessage, cause }) + logger.error(withMessageId('Table operation failed'), { + operation, + error: errorMessage, + cause, + }) const displayMessage = cause ? `${errorMessage} (${cause})` : errorMessage return { success: false, message: `Operation failed: ${displayMessage}` } } diff --git a/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts b/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts index b1eaf61a16d..abceb9d9d50 100644 --- a/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts +++ b/apps/sim/lib/copilot/tools/server/visualization/generate-visualization.ts @@ -1,4 +1,5 @@ import { createLogger } from '@sim/logger' +import { appendCopilotLogContext } from '@/lib/copilot/logging' import { assertServerToolNotAborted, type BaseServerTool, @@ -62,8 +63,10 @@ function validateGeneratedWorkspaceFileName(fileName: string): string | null { async function collectSandboxFiles( workspaceId: string, inputFiles?: string[], - inputTables?: string[] + inputTables?: string[], + messageId?: string ): Promise { + const withMessageId = (message: string) => appendCopilotLogContext(message, { messageId }) const sandboxFiles: SandboxFile[] = [] let totalSize = 0 @@ -72,12 +75,12 @@ async function collectSandboxFiles( for (const fileRef of inputFiles) { const record = findWorkspaceFileRecord(allFiles, fileRef) if (!record) { - logger.warn('Sandbox input file not found', { fileRef }) + logger.warn(withMessageId('Sandbox input file not found'), { fileRef }) continue } const ext = record.name.split('.').pop()?.toLowerCase() ?? '' if (!TEXT_EXTENSIONS.has(ext)) { - logger.warn('Skipping non-text sandbox input file', { + logger.warn(withMessageId('Skipping non-text sandbox input file'), { fileId: record.id, fileName: record.name, ext, @@ -85,7 +88,7 @@ async function collectSandboxFiles( continue } if (record.size > MAX_FILE_SIZE) { - logger.warn('Sandbox input file exceeds size limit', { + logger.warn(withMessageId('Sandbox input file exceeds size limit'), { fileId: record.id, fileName: record.name, size: record.size, @@ -93,7 +96,9 @@ async function collectSandboxFiles( continue } if (totalSize + record.size > MAX_TOTAL_SIZE) { - logger.warn('Sandbox input total size limit reached, skipping remaining files') + logger.warn( + withMessageId('Sandbox input total size limit reached, skipping remaining files') + ) break } const buffer = await downloadWorkspaceFile(record) @@ -114,7 +119,7 @@ async function collectSandboxFiles( for (const tableId of inputTables) { const table = await getTableById(tableId) if (!table) { - logger.warn('Sandbox input table not found', { tableId }) + logger.warn(withMessageId('Sandbox input table not found'), { tableId }) continue } const { rows } = await queryRows(tableId, workspaceId, { limit: 10000 }, 'sandbox-input') @@ -129,7 +134,9 @@ async function collectSandboxFiles( } const csvContent = csvLines.join('\n') if (totalSize + csvContent.length > MAX_TOTAL_SIZE) { - logger.warn('Sandbox input total size limit reached, skipping remaining tables') + logger.warn( + withMessageId('Sandbox input total size limit reached, skipping remaining tables') + ) break } totalSize += csvContent.length @@ -150,6 +157,9 @@ export const generateVisualizationServerTool: BaseServerTool< params: VisualizationArgs, context?: ServerToolContext ): Promise { + const withMessageId = (message: string) => + appendCopilotLogContext(message, { messageId: context?.messageId }) + if (!context?.userId) { throw new Error('Authentication required') } @@ -167,7 +177,8 @@ export const generateVisualizationServerTool: BaseServerTool< const sandboxFiles = await collectSandboxFiles( workspaceId, params.inputFiles, - params.inputTables + params.inputTables, + context.messageId ) const wrappedCode = [ @@ -232,7 +243,7 @@ export const generateVisualizationServerTool: BaseServerTool< imageBuffer, 'image/png' ) - logger.info('Chart image overwritten', { + logger.info(withMessageId('Chart image overwritten'), { fileId: updated.id, fileName: updated.name, size: imageBuffer.length, @@ -256,7 +267,7 @@ export const generateVisualizationServerTool: BaseServerTool< 'image/png' ) - logger.info('Chart image saved', { + logger.info(withMessageId('Chart image saved'), { fileId: uploaded.id, fileName: uploaded.name, size: imageBuffer.length, @@ -271,7 +282,7 @@ export const generateVisualizationServerTool: BaseServerTool< } } catch (error) { const msg = error instanceof Error ? error.message : 'Unknown error' - logger.error('Visualization generation failed', { error: msg }) + logger.error(withMessageId('Visualization generation failed'), { error: msg }) return { success: false, message: `Failed to generate visualization: ${msg}` } } },