Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1091,4 +1091,48 @@ describe("orchestration watcher resilience", () => {
off();
await svc.dispose();
});

// Regression test: when manifest.json on disk carries a higher serverGeneration
// than the manifest the caller patched against, manifestPatch must report
// "etag_conflict" and leave the on-disk file untouched.
it("returns etag_conflict instead of overwriting a newer on-disk manifest", async () => {
  const svc = createOrchestrationService({ resolveLaneWorktree: () => lane });
  // Dispose in `finally`: a failed assertion (or a rejected service call) must not
  // leave the service alive for the tests that run after this one.
  try {
    const { manifest } = await svc.runCreate({
      laneId: "L-1",
      leadSessionId: "S-lead",
      bundleRoot: lane,
      title: "Initial",
    });

    // Simulate an external writer: bump the generation and rewrite the manifest
    // directly on disk, bypassing the service.
    const manifestPath = path.join(manifest.bundlePath, "manifest.json");
    const external = JSON.parse(await fsp.readFile(manifestPath, "utf-8")) as {
      title: string;
      serverGeneration: number;
      etag: string;
    };
    external.title = "external-title";
    external.serverGeneration += 5;
    external.etag = `g${external.serverGeneration}-external`;
    await fsp.writeFile(manifestPath, JSON.stringify(external, null, 2));

    // Patch using the etag returned by runCreate, i.e. the pre-external-write state.
    const patchRes = await svc.manifestPatch(
      {
        runId: manifest.runId,
        ifMatchEtag: manifest.etag,
        actorRole: "lead",
        actorSessionId: "S-lead",
        patches: [{ op: "replace", path: "/title", value: "patched-title" }],
      },
      manifest.bundlePath,
    );

    expect(patchRes.ok).toBe(false);
    // Unreachable after the expectation above when ok is true; kept so the
    // compiler narrows patchRes to the failure shape.
    if (patchRes.ok) return;
    expect(patchRes.error).toBe("etag_conflict");

    // The external write must still be what is on disk.
    const onDisk = JSON.parse(await fsp.readFile(manifestPath, "utf-8")) as {
      title: string;
      serverGeneration: number;
    };
    expect(onDisk.title).toBe("external-title");
    expect(onDisk.serverGeneration).toBeGreaterThan(manifest.serverGeneration);
  } finally {
    await svc.dispose();
  }
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ class AsyncMutex {
}
}

/**
 * Raised when manifest.json on disk advanced while a writer held the run mutex.
 * The newer on-disk manifest travels with the error as `onDisk`, so whoever
 * catches it can read that manifest's etag and generation.
 */
class OrchestrationPersistConflictError extends Error {
  /**
   * @param onDisk The manifest read back from disk that is newer than the in-flight write.
   */
  constructor(readonly onDisk: OrchestrationManifest) {
    super("manifest on disk is newer than the in-flight write");
    this.name = "OrchestrationPersistConflictError";
  }
}

// Name of the manifest file; joined onto a run's bundle path when the manifest
// is read back or persisted.
const MANIFEST_FILE = "manifest.json";
// NOTE(review): no use is visible in this excerpt — presumably the run's plan document; confirm.
const PLAN_FILE = "plan.md";
// NOTE(review): presumably the file writeServerGeneration stores the generation counter in — confirm.
const GEN_FILE = ".gen";
Expand Down Expand Up @@ -166,7 +176,11 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
await atomicWrite(genPath, `${gen}\n`);
}

async function atomicWrite(target: string, contents: string): Promise<void> {
async function atomicWrite(
target: string,
contents: string,
options?: { beforeCommit?: () => Promise<void> },
): Promise<void> {
const dir = path.dirname(target);
await fsp.mkdir(dir, { recursive: true });
const tmp = `${target}.${process.pid}.${Date.now()}.${Math.random()
Expand All @@ -179,6 +193,9 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
} finally {
await handle.close();
}
if (options?.beforeCommit) {
await options.beforeCommit();
}
await fsp.rename(tmp, target);
}

Expand Down Expand Up @@ -517,10 +534,12 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
}
runtime.watcherDebounceTimer = setTimeout(() => {
runtime.watcherDebounceTimer = null;
if (Date.now() < runtime.recentSelfWriteUntil) {
return; // suppress self-emitted events
}
void handleExternalChange(runtime, kind);
void runtime.mutex.run(async () => {
if (Date.now() < runtime.recentSelfWriteUntil) {
return; // suppress self-emitted events
}
await handleExternalChange(runtime, kind);
});
}, WATCHER_DEBOUNCE_MS);
};
watcher.on("change", (full) => {
Expand Down Expand Up @@ -604,8 +623,32 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
manifest: OrchestrationManifest,
): Promise<void> {
const manifestPath = path.join(runtime.bundlePath, MANIFEST_FILE);
// Guard run around the manifest write: re-reads manifest.json and rejects when the
// copy on disk belongs to this run and carries a higher serverGeneration than the
// manifest about to be written. A missing file (ENOENT) is not a conflict; any
// other read/parse failure propagates to the caller.
const rejectIfDiskAdvanced = async (): Promise<void> => {
  let newer: OrchestrationManifest | undefined;
  try {
    const text = await fsp.readFile(manifestPath, "utf-8");
    const candidate = normalizeManifestShape(JSON.parse(text) as OrchestrationManifest);
    const sameRun = candidate.runId === runtime.runId;
    if (sameRun && candidate.serverGeneration > manifest.serverGeneration) {
      // Adopt the newer on-disk state before reporting the conflict.
      runtime.manifest = candidate;
      newer = candidate;
    }
  } catch (err) {
    const code = (err as NodeJS.ErrnoException)?.code;
    if (code !== "ENOENT") {
      throw err;
    }
  }
  if (newer !== undefined) {
    throw new OrchestrationPersistConflictError(newer);
  }
};
await rejectIfDiskAdvanced();
markSelfWrite(runtime);
await atomicWrite(manifestPath, JSON.stringify(manifest, null, 2));
await atomicWrite(manifestPath, JSON.stringify(manifest, null, 2), {
beforeCommit: rejectIfDiskAdvanced,
});
await writeServerGeneration(runtime.bundlePath, manifest.serverGeneration);
runtime.manifest = manifest;
await appendRunIndexEntry(
Expand Down Expand Up @@ -825,7 +868,20 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
];
next.history = ring;

await persistManifest(runtime, next);
try {
await persistManifest(runtime, next);
} catch (err) {
if (err instanceof OrchestrationPersistConflictError) {
const latest = runtime.manifest ?? err.onDisk;
return {
ok: false,
error: "etag_conflict",
manifest: latest,
etag: latest.etag,
};
}
throw err;
}
emit({
runId: req.runId,
kind: "manifest",
Expand Down Expand Up @@ -1272,7 +1328,18 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
if (agent) agent.lastHeartbeatAt = nowIso();
// Heartbeats are liveness metadata. They must not invalidate the optimistic
// concurrency etag that agents use for the next manifest mutation.
await persistManifest(runtime, next);
try {
await persistManifest(runtime, next);
} catch (err) {
if (err instanceof OrchestrationPersistConflictError) {
return {
ok: false,
reason: "etag_conflict",
etag: err.onDisk.etag,
};
}
throw err;
}
emit({
runId: req.runId,
kind: "manifest",
Expand Down Expand Up @@ -1408,7 +1475,16 @@ export function createOrchestrationService(deps: OrchestrationServiceDeps) {
...runtime.manifest.history.slice(-HISTORY_RING_LIMIT + 1),
{ etag, at: updatedAt, summary, patchKindSummary: summarizePatch([...patches]) },
];
await persistManifest(runtime, next);
try {
await persistManifest(runtime, next);
} catch (err) {
if (err instanceof OrchestrationPersistConflictError) {
throw new Error(
`etag_conflict: manifest on disk advanced to generation ${err.onDisk.serverGeneration}`,
);
}
throw err;
}
emit({
runId: runtime.runId,
kind: "manifest",
Expand Down
Loading