From 2bd23c2abf45bdcd2181c59580111806a808690e Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Wed, 27 May 2026 09:15:26 +0000 Subject: [PATCH] Fix orchestration manifest clobber during external disk updates Serialize watcher reloads on the run mutex and reject in-flight writes when manifest.json on disk has a newer serverGeneration than the snapshot being persisted. Prevents stale in-memory patches from overwriting externally advanced manifests (e.g. git pull on the lane worktree). Co-authored-by: Arul Sharma --- .../orchestrationService.test.ts | 44 +++++++++ .../orchestration/orchestrationService.ts | 94 +++++++++++++++++-- 2 files changed, 129 insertions(+), 9 deletions(-) diff --git a/apps/desktop/src/main/services/orchestration/orchestrationService.test.ts b/apps/desktop/src/main/services/orchestration/orchestrationService.test.ts index 438db49ac..bf5b59e79 100644 --- a/apps/desktop/src/main/services/orchestration/orchestrationService.test.ts +++ b/apps/desktop/src/main/services/orchestration/orchestrationService.test.ts @@ -1091,4 +1091,48 @@ describe("orchestration watcher resilience", () => { off(); await svc.dispose(); }); + + it("returns etag_conflict instead of overwriting a newer on-disk manifest", async () => { + const svc = createOrchestrationService({ resolveLaneWorktree: () => lane }); + const { manifest } = await svc.runCreate({ + laneId: "L-1", + leadSessionId: "S-lead", + bundleRoot: lane, + title: "Initial", + }); + + const manifestPath = path.join(manifest.bundlePath, "manifest.json"); + const external = JSON.parse(await fsp.readFile(manifestPath, "utf-8")) as { + title: string; + serverGeneration: number; + etag: string; + }; + external.title = "external-title"; + external.serverGeneration += 5; + external.etag = `g${external.serverGeneration}-external`; + await fsp.writeFile(manifestPath, JSON.stringify(external, null, 2)); + + const patchRes = await svc.manifestPatch( + { + runId: manifest.runId, + ifMatchEtag: manifest.etag, + actorRole: "lead", + actorSessionId: "S-lead", + patches: [{ op: "replace", path: "/title", value: "patched-title" }], + }, + manifest.bundlePath, + ); + + expect(patchRes.ok).toBe(false); + if (patchRes.ok) return; + expect(patchRes.error).toBe("etag_conflict"); + + const onDisk = JSON.parse(await fsp.readFile(manifestPath, "utf-8")) as { + title: string; + serverGeneration: number; + }; + expect(onDisk.title).toBe("external-title"); + expect(onDisk.serverGeneration).toBeGreaterThan(manifest.serverGeneration); + await svc.dispose(); + }); }); diff --git a/apps/desktop/src/main/services/orchestration/orchestrationService.ts b/apps/desktop/src/main/services/orchestration/orchestrationService.ts index fcc885085..285d86f51 100644 --- a/apps/desktop/src/main/services/orchestration/orchestrationService.ts +++ b/apps/desktop/src/main/services/orchestration/orchestrationService.ts @@ -60,6 +60,16 @@ class AsyncMutex { } } +/** Thrown when manifest.json on disk advanced while a writer held the run mutex. */ +class OrchestrationPersistConflictError extends Error { + readonly onDisk: OrchestrationManifest; + constructor(onDisk: OrchestrationManifest) { + super("manifest on disk is newer than the in-flight write"); + this.name = "OrchestrationPersistConflictError"; + this.onDisk = onDisk; + } +} + const MANIFEST_FILE = "manifest.json"; const PLAN_FILE = "plan.md"; const GEN_FILE = ".gen"; @@ -166,7 +176,11 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { await atomicWrite(genPath, `${gen}\n`); } - async function atomicWrite(target: string, contents: string): Promise { + async function atomicWrite( + target: string, + contents: string, + options?: { beforeCommit?: () => Promise }, + ): Promise { const dir = path.dirname(target); await fsp.mkdir(dir, { recursive: true }); const tmp = `${target}.${process.pid}.${Date.now()}.${Math.random() @@ -179,6 +193,9 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { } finally { await handle.close(); } + if (options?.beforeCommit) { + await options.beforeCommit(); + } await fsp.rename(tmp, target); } @@ -517,10 +534,12 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { } runtime.watcherDebounceTimer = setTimeout(() => { runtime.watcherDebounceTimer = null; - if (Date.now() < runtime.recentSelfWriteUntil) { - return; // suppress self-emitted events - } - void handleExternalChange(runtime, kind); + void runtime.mutex.run(async () => { + if (Date.now() < runtime.recentSelfWriteUntil) { + return; // suppress self-emitted events + } + await handleExternalChange(runtime, kind); + }); }, WATCHER_DEBOUNCE_MS); }; watcher.on("change", (full) => { @@ -604,8 +623,32 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { manifest: OrchestrationManifest, ): Promise { const manifestPath = path.join(runtime.bundlePath, MANIFEST_FILE); + const rejectIfDiskAdvanced = async (): Promise => { + try { + const raw = await fsp.readFile(manifestPath, "utf-8"); + const onDisk = normalizeManifestShape(JSON.parse(raw) as OrchestrationManifest); + if ( + onDisk.runId === runtime.runId && + onDisk.serverGeneration > manifest.serverGeneration + ) { + runtime.manifest = onDisk; + throw new OrchestrationPersistConflictError(onDisk); + } + } catch (err) { + if (err instanceof OrchestrationPersistConflictError) { + throw err; + } + const e = err as NodeJS.ErrnoException; + if (e?.code !== "ENOENT") { + throw err; + } + } + }; + await rejectIfDiskAdvanced(); markSelfWrite(runtime); - await atomicWrite(manifestPath, JSON.stringify(manifest, null, 2)); + await atomicWrite(manifestPath, JSON.stringify(manifest, null, 2), { + beforeCommit: rejectIfDiskAdvanced, + }); await writeServerGeneration(runtime.bundlePath, manifest.serverGeneration); runtime.manifest = manifest; await appendRunIndexEntry( @@ -825,7 +868,20 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { ]; next.history = ring; - await persistManifest(runtime, next); + try { + await persistManifest(runtime, next); + } catch (err) { + if (err instanceof OrchestrationPersistConflictError) { + const latest = runtime.manifest ?? err.onDisk; + return { + ok: false, + error: "etag_conflict", + manifest: latest, + etag: latest.etag, + }; + } + throw err; + } emit({ runId: req.runId, kind: "manifest", @@ -1272,7 +1328,18 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { if (agent) agent.lastHeartbeatAt = nowIso(); // Heartbeats are liveness metadata. They must not invalidate the optimistic // concurrency etag that agents use for the next manifest mutation. - await persistManifest(runtime, next); + try { + await persistManifest(runtime, next); + } catch (err) { + if (err instanceof OrchestrationPersistConflictError) { + return { + ok: false, + reason: "etag_conflict", + etag: err.onDisk.etag, + }; + } + throw err; + } emit({ runId: req.runId, kind: "manifest", @@ -1408,7 +1475,16 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) { ...runtime.manifest.history.slice(-HISTORY_RING_LIMIT + 1), { etag, at: updatedAt, summary, patchKindSummary: summarizePatch([...patches]) }, ]; - await persistManifest(runtime, next); + try { + await persistManifest(runtime, next); + } catch (err) { + if (err instanceof OrchestrationPersistConflictError) { + throw new Error( + `etag_conflict: manifest on disk advanced to generation ${err.onDisk.serverGeneration}`, + ); + } + throw err; + } emit({ runId: runtime.runId, kind: "manifest",