Skip to content
230 changes: 94 additions & 136 deletions apps/sim/lib/copilot/orchestrator/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,16 @@
* @vitest-environment node
*/

import { beforeEach, describe, expect, it, vi } from 'vitest'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import type { OrchestratorOptions } from './types'

const {
prepareExecutionContext,
getEffectiveDecryptedEnv,
runStreamLoop,
claimCompletedAsyncToolCall,
getAsyncToolCall,
getAsyncToolCalls,
markAsyncToolDelivered,
releaseCompletedAsyncToolClaim,
updateRunStatus,
} = vi.hoisted(() => ({
prepareExecutionContext: vi.fn(),
getEffectiveDecryptedEnv: vi.fn(),
runStreamLoop: vi.fn(),
claimCompletedAsyncToolCall: vi.fn(),
getAsyncToolCall: vi.fn(),
getAsyncToolCalls: vi.fn(),
markAsyncToolDelivered: vi.fn(),
releaseCompletedAsyncToolClaim: vi.fn(),
updateRunStatus: vi.fn(),
}))
/**
 * Mock functions shared by the `vi.mock` factories in this file.
 *
 * They are created inside `vi.hoisted` so that they already exist when the
 * `vi.mock` factories run (Vitest hoists `vi.mock` calls above the imports).
 */
const { prepareExecutionContext, getEffectiveDecryptedEnv, runStreamLoop, updateRunStatus } =
vi.hoisted(() => ({
prepareExecutionContext: vi.fn(),
getEffectiveDecryptedEnv: vi.fn(),
runStreamLoop: vi.fn(),
updateRunStatus: vi.fn(),
}))

vi.mock('@/lib/copilot/orchestrator/tool-executor', () => ({
prepareExecutionContext,
Expand All @@ -36,11 +22,6 @@ vi.mock('@/lib/environment/utils', () => ({
}))

// Replace the async-runs repository module with mock functions so the tests
// can stub its return values and assert on the calls made to it.
vi.mock('@/lib/copilot/async-runs/repository', () => ({
claimCompletedAsyncToolCall,
getAsyncToolCall,
getAsyncToolCalls,
markAsyncToolDelivered,
releaseCompletedAsyncToolClaim,
updateRunStatus,
}))

Expand All @@ -56,37 +37,40 @@ vi.mock('@/lib/copilot/orchestrator/stream/core', async () => {
import { orchestrateCopilotStream } from './index'

describe('orchestrateCopilotStream async continuation', () => {
// Stand-in for the global `fetch`; installed with `vi.stubGlobal` in `beforeEach`.
const fetchMock = vi.fn()

// Per-test setup: clear mock state, switch to fake timers, swap the global
// `fetch` for `fetchMock`, and install default resolved values on the mocks.
beforeEach(() => {
vi.clearAllMocks()
vi.useFakeTimers()
vi.stubGlobal('fetch', fetchMock)
// Default execution context resolved by the mocked tool-executor.
prepareExecutionContext.mockResolvedValue({
userId: 'user-1',
workflowId: 'workflow-1',
chatId: 'chat-1',
})
// Resolves to an empty object by default.
getEffectiveDecryptedEnv.mockResolvedValue({})
// Default repository responses: `tool-1` is a completed `read` call whose
// result is `{ ok: true }`; individual tests override these as needed.
claimCompletedAsyncToolCall.mockResolvedValue({ toolCallId: 'tool-1' })
getAsyncToolCall.mockResolvedValue({
toolCallId: 'tool-1',
toolName: 'read',
status: 'completed',
result: { ok: true },
error: null,
})
getAsyncToolCalls.mockResolvedValue([
{
toolCallId: 'tool-1',
toolName: 'read',
status: 'completed',
result: { ok: true },
error: null,
},
])
markAsyncToolDelivered.mockResolvedValue(null)
releaseCompletedAsyncToolClaim.mockResolvedValue(null)
updateRunStatus.mockResolvedValue(null)
})

it('builds resume payloads with success=true for claimed completed rows', async () => {
// Per-test teardown: restore real timers and remove the global stubs that
// `beforeEach` installed (`vi.useFakeTimers`, `vi.stubGlobal('fetch', ...)`).
afterEach(() => {
vi.useRealTimers()
vi.unstubAllGlobals()
})

it('resumes with checkpointId only after Go reports readiness', async () => {
fetchMock.mockResolvedValueOnce({
ok: true,
json: async () => ({
success: true,
checkpointId: 'checkpoint-1',
runId: 'run-1',
resumeState: 'ready',
ready: true,
pendingCallIds: ['tool-1'],
missingCallIds: [],
}),
})

runStreamLoop
.mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => {
context.awaitingAsyncContinuation = {
Expand All @@ -100,14 +84,6 @@ describe('orchestrateCopilotStream async continuation', () => {
const body = JSON.parse(String(opts.body))
expect(body).toEqual({
checkpointId: 'checkpoint-1',
results: [
{
callId: 'tool-1',
name: 'read',
data: { ok: true },
success: true,
},
],
})
})

Expand All @@ -123,21 +99,28 @@ describe('orchestrateCopilotStream async continuation', () => {
)

expect(result.success).toBe(true)
expect(markAsyncToolDelivered).toHaveBeenCalledWith('tool-1')
expect(fetchMock).toHaveBeenCalledTimes(1)
})

it('marks claimed tool calls delivered even when the resumed stream later records errors', async () => {
runStreamLoop
.mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => {
context.awaitingAsyncContinuation = {
checkpointId: 'checkpoint-1',
runId: 'run-1',
pendingToolCallIds: ['tool-1'],
}
})
.mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => {
context.errors.push('resume stream failed after handoff')
})
it('surfaces an explicit error when Go readiness check fails', async () => {
fetchMock.mockResolvedValueOnce({
ok: false,
status: 424,
json: async () => ({
error: 'checkpoint not ready',
code: 'checkpoint_not_ready',
retryable: true,
missingCallIds: ['tool-1'],
}),
})

runStreamLoop.mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => {
context.awaitingAsyncContinuation = {
checkpointId: 'checkpoint-1',
runId: 'run-1',
pendingToolCallIds: ['tool-1'],
}
})

const result = await orchestrateCopilotStream(
{ message: 'hello' },
Expand All @@ -151,7 +134,8 @@ describe('orchestrateCopilotStream async continuation', () => {
)

expect(result.success).toBe(false)
expect(markAsyncToolDelivered).toHaveBeenCalledWith('tool-1')
expect(result.errors).toEqual(['checkpoint not ready'])
expect(runStreamLoop).toHaveBeenCalledTimes(1)
})

it('forwards done events while still marking async pauses on the run', async () => {
Expand Down Expand Up @@ -189,30 +173,23 @@ describe('orchestrateCopilotStream async continuation', () => {
expect(updateRunStatus).toHaveBeenCalledWith('run-1', 'paused_waiting_for_tool')
})

it('waits for a local running tool before retrying the claim', async () => {
it('waits for local pending tool promises before asking Go to resume', async () => {
const localPendingPromise = Promise.resolve({
status: 'success',
data: { ok: true },
})

claimCompletedAsyncToolCall
.mockResolvedValueOnce(null)
.mockResolvedValueOnce({ toolCallId: 'tool-1' })
getAsyncToolCall
.mockResolvedValueOnce({
toolCallId: 'tool-1',
toolName: 'read',
status: 'running',
result: null,
error: null,
})
.mockResolvedValue({
toolCallId: 'tool-1',
toolName: 'read',
status: 'completed',
result: { ok: true },
error: null,
})
fetchMock.mockResolvedValueOnce({
ok: true,
json: async () => ({
success: true,
checkpointId: 'checkpoint-1',
runId: 'run-1',
resumeState: 'ready',
ready: true,
pendingCallIds: ['tool-1'],
missingCallIds: [],
}),
})

runStreamLoop
.mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => {
Expand All @@ -226,11 +203,8 @@ describe('orchestrateCopilotStream async continuation', () => {
.mockImplementationOnce(async (url: string, opts: RequestInit) => {
expect(url).toContain('/api/tools/resume')
const body = JSON.parse(String(opts.body))
expect(body.results[0]).toEqual({
callId: 'tool-1',
name: 'read',
data: { ok: true },
success: true,
expect(body).toEqual({
checkpointId: 'checkpoint-1',
})
})

Expand All @@ -247,10 +221,22 @@ describe('orchestrateCopilotStream async continuation', () => {

expect(result.success).toBe(true)
expect(runStreamLoop).toHaveBeenCalledTimes(2)
expect(markAsyncToolDelivered).toHaveBeenCalledWith('tool-1')
})

it('releases claimed rows if the resume stream throws before delivery is marked', async () => {
it('retries tool resume after an upstream 502 and succeeds', async () => {
fetchMock.mockResolvedValueOnce({
ok: true,
json: async () => ({
success: true,
checkpointId: 'checkpoint-1',
runId: 'run-1',
resumeState: 'ready',
ready: true,
pendingCallIds: ['tool-1'],
missingCallIds: [],
}),
})

runStreamLoop
.mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => {
context.awaitingAsyncContinuation = {
Expand All @@ -260,10 +246,17 @@ describe('orchestrateCopilotStream async continuation', () => {
}
})
.mockImplementationOnce(async () => {
throw new Error('resume failed')
throw new Error('Copilot backend error (502): <html><h1>502 Bad Gateway</h1></html>')
})
.mockImplementationOnce(async (url: string, opts: RequestInit) => {
expect(url).toContain('/api/tools/resume')
const body = JSON.parse(String(opts.body))
expect(body).toEqual({
checkpointId: 'checkpoint-1',
})
})

const result = await orchestrateCopilotStream(
const resultPromise = orchestrateCopilotStream(
{ message: 'hello' },
{
userId: 'user-1',
Expand All @@ -274,45 +267,10 @@ describe('orchestrateCopilotStream async continuation', () => {
}
)

expect(result.success).toBe(false)
expect(releaseCompletedAsyncToolClaim).toHaveBeenCalledWith('tool-1', 'run-1')
expect(markAsyncToolDelivered).not.toHaveBeenCalled()
})

it('does not send a partial resume payload when only some pending tool calls are claimable', async () => {
claimCompletedAsyncToolCall
.mockResolvedValueOnce({ toolCallId: 'tool-1' })
.mockResolvedValueOnce(null)
.mockResolvedValueOnce({ toolCallId: 'tool-1' })
.mockResolvedValueOnce(null)
.mockResolvedValueOnce({ toolCallId: 'tool-1' })
.mockResolvedValueOnce(null)
.mockResolvedValueOnce({ toolCallId: 'tool-1' })
.mockResolvedValueOnce(null)
getAsyncToolCall.mockResolvedValue(null)

runStreamLoop.mockImplementationOnce(async (_url: string, _opts: RequestInit, context: any) => {
context.awaitingAsyncContinuation = {
checkpointId: 'checkpoint-1',
runId: 'run-1',
pendingToolCallIds: ['tool-1', 'tool-2'],
}
})

const result = await orchestrateCopilotStream(
{ message: 'hello' },
{
userId: 'user-1',
workflowId: 'workflow-1',
chatId: 'chat-1',
executionId: 'exec-1',
runId: 'run-1',
}
)
await vi.runAllTimersAsync()
const result = await resultPromise

expect(result.success).toBe(true)
expect(runStreamLoop).toHaveBeenCalledTimes(1)
expect(releaseCompletedAsyncToolClaim).toHaveBeenCalledWith('tool-1', 'run-1')
expect(markAsyncToolDelivered).not.toHaveBeenCalled()
expect(runStreamLoop).toHaveBeenCalledTimes(3)
})
})
Loading