From 64be409df4f10f60dcc901ff94215a62e4696ffd Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Tue, 26 May 2026 08:14:20 +0000 Subject: [PATCH] Fix orchestration stale writes after external manifest swap When git checkout replaces manifest.json with a different runId, the watcher marked the run suspended but kept a stale in-memory manifest. Subsequent manifestPatch/claimTask calls could overwrite the foreign manifest on disk. Clear the runtime cache on runId mismatch, treat mismatches in loadIntoRuntime as suspended, and reject all mutation paths while suspended. Add a regression test for the watcher + patch interaction. Co-authored-by: Arul Sharma --- .../orchestrationService.test.ts | 41 +++++++++++++++++++ .../orchestration/orchestrationService.ts | 41 +++++++++++++++++-- 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/apps/desktop/src/main/services/orchestration/orchestrationService.test.ts b/apps/desktop/src/main/services/orchestration/orchestrationService.test.ts index f520cb093..3b228f31b 100644 --- a/apps/desktop/src/main/services/orchestration/orchestrationService.test.ts +++ b/apps/desktop/src/main/services/orchestration/orchestrationService.test.ts @@ -1070,6 +1070,47 @@ describe("orchestration watcher resilience", () => { await svc.dispose(); }); + it("blocks manifest writes after an external manifest runId swap", async () => { + const svc = createOrchestrationService({ resolveLaneWorktree: () => lane }); + const { manifest, etag } = await svc.runCreate({ + laneId: "L-1", + leadSessionId: "S-lead", + bundleRoot: lane, + title: "Original run", + }); + await svc.subscribe(manifest.runId, manifest.bundlePath); + const manifestPath = path.join(manifest.bundlePath, "manifest.json"); + const foreign = { + ...JSON.parse(await fsp.readFile(manifestPath, "utf-8")), + runId: "R-foreign-checkout", + etag: "etag-foreign", + title: "Foreign branch manifest", + }; + // Wait past the self-write suppression window used by persistManifest. + await new Promise((resolve) => setTimeout(resolve, 1_100)); + await fsp.writeFile(manifestPath, JSON.stringify(foreign, null, 2)); + await new Promise((resolve) => setTimeout(resolve, 120)); + + const patch = await svc.manifestPatch( + { + runId: manifest.runId, + ifMatchEtag: etag, + actorRole: "lead", + actorSessionId: "S-lead", + patches: [{ op: "replace", path: "/title", value: "Stale write attempt" }], + }, + manifest.bundlePath, + ); + expect(patch.ok).toBe(false); + if (patch.ok) return; + expect(patch.message).toContain("suspended"); + + const onDisk = JSON.parse(await fsp.readFile(manifestPath, "utf-8")); + expect(onDisk.runId).toBe("R-foreign-checkout"); + expect(onDisk.title).toBe("Foreign branch manifest"); + await svc.dispose(); + }); + it("planAppend produces an event with the new contents", async () => { const svc = createOrchestrationService({ resolveLaneWorktree: () => lane }); const { manifest } = await svc.runCreate({ diff --git a/apps/desktop/src/main/services/orchestration/orchestrationService.ts b/apps/desktop/src/main/services/orchestration/orchestrationService.ts index 3aeaed964..b12560640 100644 --- a/apps/desktop/src/main/services/orchestration/orchestrationService.ts +++ b/apps/desktop/src/main/services/orchestration/orchestrationService.ts @@ -71,6 +71,8 @@ const WATCHER_IDLE_CLOSE_MS = 30_000; const ORCHESTRATION_INDEX_VERSION = 1; const RUN_LIST_DEFAULT_LIMIT = 100; const RUN_LIST_MAX_LIMIT = 250; +const RUN_SUSPENDED_MESSAGE = + "orchestration run is suspended (bundle changed externally); re-open the run or restore the correct branch"; export type OrchestrationServiceEvents = { event: (payload: OrchestrationEventPayload) => void; @@ -449,9 +451,10 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { ); } if (manifest.runId !== runtime.runId) { - throw new Error( - `manifest.runId ${manifest.runId} does not match expected ${runtime.runId}`, - ); + runtime.suspended = true; + runtime.manifest = null; + runtime.planMd = null; + return; } runtime.manifest = normalizeManifestShape(manifest); } catch (err) { @@ -565,6 +568,8 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { // file), do not blindly etag-bump; mark suspended and ignore. if (next.runId !== runtime.runId) { runtime.suspended = true; + runtime.manifest = null; + runtime.planMd = null; emit({ runId: runtime.runId, kind: "lifecycle", @@ -626,6 +631,12 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { runtime.planMd = plan; } + function assertRunWritable(runtime: RunRuntime): void { + if (runtime.suspended) { + throw new Error(RUN_SUSPENDED_MESSAGE); + } + } + // -------------------------------------------------------------------------- // Public API // -------------------------------------------------------------------------- @@ -731,6 +742,13 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { const runtime = getOrCreateRuntime(req.runId, bundlePath); return runtime.mutex.run(async () => { await loadIntoRuntime(runtime); + if (runtime.suspended) { + return { + ok: false, + error: "validation_failed", + message: RUN_SUSPENDED_MESSAGE, + }; + } const current = runtime.manifest; if (!current) { return { @@ -849,6 +867,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { const runtime = getOrCreateRuntime(req.runId, bundlePath); return runtime.mutex.run(async () => { await loadIntoRuntime(runtime); + assertRunWritable(runtime); if (!runtime.manifest) throw new Error(`run ${req.runId} not found`); const prev = runtime.planMd ?? ""; const heading = req.section.startsWith("#") @@ -876,6 +895,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { const runtime = getOrCreateRuntime(req.runId, bundlePath); return runtime.mutex.run(async () => { await loadIntoRuntime(runtime); + assertRunWritable(runtime); if (!runtime.manifest) throw new Error(`run ${req.runId} not found`); if (runtime.manifest.etag !== req.ifMatchEtag) { return { error: "etag_conflict", etag: runtime.manifest.etag }; @@ -900,6 +920,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { const runtime = getOrCreateRuntime(req.runId, bundlePath); return runtime.mutex.run(async () => { await loadIntoRuntime(runtime); + assertRunWritable(runtime); if (!runtime.manifest) throw new Error(`run ${req.runId} not found`); const id = `A-${runtime.manifest.assets.length + 1}-${shortRand()}`; const asset: OrchestrationAsset = { @@ -935,6 +956,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { const runtime = getOrCreateRuntime(req.runId, bundlePath); return runtime.mutex.run(async () => { await loadIntoRuntime(runtime); + assertRunWritable(runtime); const manifest = runtime.manifest!; const registeredAgent = manifest.agents.find( (agent) => agent.sessionId === req.sessionId, @@ -1014,6 +1036,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { const runtime = getOrCreateRuntime(req.runId, bundlePath); return runtime.mutex.run(async () => { await loadIntoRuntime(runtime); + assertRunWritable(runtime); const manifest = runtime.manifest; if (!manifest) throw new Error(`run ${req.runId} not found`); const task = manifest.tasks.find((entry) => entry.id === req.taskId); @@ -1114,6 +1137,13 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { const runtime = getOrCreateRuntime(req.runId, bundlePath); return runtime.mutex.run(async () => { await loadIntoRuntime(runtime); + if (runtime.suspended) { + return { + ok: false, + error: "validation_failed", + message: RUN_SUSPENDED_MESSAGE, + }; + } const manifest = runtime.manifest; if (!manifest) { return { @@ -1267,6 +1297,9 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { const runtime = getOrCreateRuntime(req.runId, bundlePath); return runtime.mutex.run(async () => { await loadIntoRuntime(runtime); + if (runtime.suspended) { + return { ok: false, reason: RUN_SUSPENDED_MESSAGE }; + } const manifest = runtime.manifest; if (!manifest) return { ok: false, reason: `run ${req.runId} not found` }; if (!manifest.agents.some((agent) => agent.sessionId === req.sessionId)) { @@ -1401,6 +1434,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { patches: readonly ManifestPatchOp[], summary: string, ): Promise<{ manifest: OrchestrationManifest; etag: string }> { + assertRunWritable(runtime); if (!runtime.manifest) throw new Error("manifest not loaded"); const next = normalizeManifestShape(applyPatches(runtime.manifest, patches)); const updatedAt = nowIso(); @@ -1439,6 +1473,7 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { const runtime = getOrCreateRuntime(runId, bundlePath); return runtime.mutex.run(async () => { await loadIntoRuntime(runtime); + assertRunWritable(runtime); if (!runtime.manifest) { return { ok: false, error: "run_not_found", message: `run ${runId} not found` }; }