From b003b823d315612da9ba97aac8db171c75f597f3 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Tue, 26 May 2026 19:19:28 +0000 Subject: [PATCH] Fix CRR PK retrofit corruption, sync schema skew ACK, and failed brain connect wipe - Skip primary-key retrofit for tables with __crsql_clock companions (same guard as FK retrofit) - Reject unknown sync tables instead of silently skipping; use applyChanges appliedCount in ACKs - Defer device registry clear until connectToBrain succeeds so failed connects keep local pairing data Co-authored-by: Arul Sharma --- .../src/services/sync/syncHostService.ts | 4 +- .../src/services/sync/syncPeerService.ts | 6 +- .../src/services/sync/syncService.test.ts | 107 ++++++++++++++++++ apps/ade-cli/src/services/sync/syncService.ts | 2 +- .../services/state/kvDb.migrations.test.ts | 20 ++++ .../src/main/services/state/kvDb.sync.test.ts | 20 ++-- apps/desktop/src/main/services/state/kvDb.ts | 18 ++- 7 files changed, 158 insertions(+), 19 deletions(-) create mode 100644 apps/ade-cli/src/services/sync/syncService.test.ts diff --git a/apps/ade-cli/src/services/sync/syncHostService.ts b/apps/ade-cli/src/services/sync/syncHostService.ts index 0066c28ad..9ffa347eb 100644 --- a/apps/ade-cli/src/services/sync/syncHostService.ts +++ b/apps/ade-cli/src/services/sync/syncHostService.ts @@ -2846,8 +2846,8 @@ export function createSyncHostService(args: SyncHostServiceArgs) { try { let appliedCount = 0; if (changes.length > 0) { - args.db.sync.applyChanges(changes); - appliedCount = changes.length; + const applyResult = args.db.sync.applyChanges(changes); + appliedCount = applyResult.appliedCount; peer.lastAppliedAt = nowIso(); lastBroadcastAt = nowIso(); args.onStateChanged?.(); diff --git a/apps/ade-cli/src/services/sync/syncPeerService.ts b/apps/ade-cli/src/services/sync/syncPeerService.ts index 3ec1a2d77..b0e09bb12 100644 --- a/apps/ade-cli/src/services/sync/syncPeerService.ts +++ b/apps/ade-cli/src/services/sync/syncPeerService.ts @@ -313,13 +313,15 @@ export function createSyncPeerService(args: SyncPeerServiceArgs) { const payload = (envelope.payload ?? {}) as SyncChangesetBatchPayload; const changes = Array.isArray(payload.changes) ? payload.changes : []; try { + let appliedCount = 0; if (changes.length) { - args.db.sync.applyChanges(changes); + const applyResult = args.db.sync.applyChanges(changes); + appliedCount = applyResult.appliedCount; args.onRemoteChangesApplied?.(); } latestRemoteDbVersion = Math.max(latestRemoteDbVersion, Math.floor(payload.toDbVersion ?? latestRemoteDbVersion)); if (connectionDraft) connectionDraft.lastRemoteDbVersion = latestRemoteDbVersion; - sendChangesetAck(payload, true, args.db.sync.getDbVersion(), changes.length); + sendChangesetAck(payload, true, args.db.sync.getDbVersion(), appliedCount); emitStatus(); } catch (error) { sendChangesetAck(payload, false, args.db.sync.getDbVersion(), 0, error); diff --git a/apps/ade-cli/src/services/sync/syncService.test.ts b/apps/ade-cli/src/services/sync/syncService.test.ts new file mode 100644 index 000000000..f487a366a --- /dev/null +++ b/apps/ade-cli/src/services/sync/syncService.test.ts @@ -0,0 +1,107 @@ +import fs from "node:fs"; +import net from "node:net"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { openKvDb, type AdeDb } from "../../../../desktop/src/main/services/state/kvDb"; +import { createSyncService, type SyncService } from "./syncService"; + +function createLogger() { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; +} + +function makeTempRoot(prefix: string): string { + return fs.mkdtempSync(path.join(os.tmpdir(), prefix)); +} + +async function getUnusedPort(): Promise { + const server = net.createServer(); + await new Promise((resolve) => server.listen(0, "127.0.0.1", resolve)); + const address = server.address(); + const port = typeof address === "object" && address ? address.port : 0; + await new Promise((resolve, reject) => { + server.close((error) => (error ? reject(error) : resolve())); + }); + return port; +} + +function createService(db: AdeDb, projectRoot: string): SyncService { + return createSyncService({ + db, + logger: createLogger() as any, + projectRoot, + hostStartupEnabled: false, + localDeviceIdPath: path.join(projectRoot, ".ade", "secrets", "sync-device-id"), + phonePairingStateDir: path.join(projectRoot, ".ade", "secrets", "sync"), + fileService: {} as any, + laneService: { + list: vi.fn(async () => []), + } as any, + prService: {} as any, + sessionService: { + list: vi.fn(() => []), + get: vi.fn(() => null), + readTranscriptTail: vi.fn(async () => ""), + } as any, + ptyService: { + readTranscriptTail: vi.fn(async () => ""), + enrichSessions: vi.fn((rows: unknown[]) => rows), + } as any, + computerUseArtifactBrokerService: { + listArtifacts: vi.fn(() => []), + } as any, + agentChatService: { + listSessions: vi.fn(async () => []), + } as any, + processService: { + listRuntime: vi.fn(() => []), + } as any, + }); +} + +describe("createSyncService", () => { + const cleanupRoots: string[] = []; + + afterEach(() => { + for (const root of cleanupRoots.splice(0)) { + fs.rmSync(root, { recursive: true, force: true }); + } + }); + + it("keeps the local device registry when connectToBrain fails before handshake", async () => { + const projectRoot = makeTempRoot("ade-sync-service-connect-fail-"); + cleanupRoots.push(projectRoot); + const db = await openKvDb(path.join(projectRoot, ".ade", "kv.sqlite"), createLogger() as any); + (db.sync as { isAvailable?: () => boolean }).isAvailable = () => true; + const service = createService(db, projectRoot); + const registry = service.getDeviceRegistryService(); + registry.upsertPeerMetadata({ + deviceId: "peer-old", + deviceName: "Previous host", + platform: "macOS", + deviceType: "desktop", + siteId: "peer-site", + dbVersion: 12, + }); + + expect(registry.getDevice("peer-old")?.name).toBe("Previous host"); + + try { + await expect(service.connectToBrain({ + host: "127.0.0.1", + port: await getUnusedPort(), + token: "bad-token", + })).rejects.toThrow(); + + expect(registry.getDevice("peer-old")?.name).toBe("Previous host"); + } finally { + await service.dispose(); + db.close(); + } + }); +}); diff --git a/apps/ade-cli/src/services/sync/syncService.ts b/apps/ade-cli/src/services/sync/syncService.ts index 083140baf..619f10da1 100644 --- a/apps/ade-cli/src/services/sync/syncService.ts +++ b/apps/ade-cli/src/services/sync/syncService.ts @@ -1018,11 +1018,11 @@ export function createSyncService(args: SyncServiceArgs) { throw new Error("Machine sync is unavailable because the CRDT database extension is not loaded."); } await stopHostIfRunning(); - deviceRegistryService.clearClusterRegistryForViewerJoin(); writeSavedDraft(draft); syncPeerService.setSavedDraft(draft); try { await syncPeerService.connect(draft); + deviceRegistryService.clearClusterRegistryForViewerJoin(); deviceRegistryService.touchLocalDevice({ lastSeenAt: nowIso() }); syncPeerService.flushLocalChanges(); await sleep(150); diff --git a/apps/desktop/src/main/services/state/kvDb.migrations.test.ts b/apps/desktop/src/main/services/state/kvDb.migrations.test.ts index f9284c1ff..99aa5d01a 100644 --- a/apps/desktop/src/main/services/state/kvDb.migrations.test.ts +++ b/apps/desktop/src/main/services/state/kvDb.migrations.test.ts @@ -5,6 +5,7 @@ import { createRequire } from "node:module"; import { describe, expect, it } from "vitest"; import { createLaneWorktreeLockService } from "../lanes/laneWorktreeLockService"; import { openKvDb } from "./kvDb"; +import { isCrsqliteAvailable } from "./crsqliteExtension"; const require = createRequire(import.meta.url); @@ -156,6 +157,25 @@ describe("kvDb migrations - legacy upgrade paths", () => { } }); + it.skipIf(!isCrsqliteAvailable())("skips primary-key retrofit for tables that already have __crsql_clock companions", async () => { + const dbPath = makeDbPath("ade-kvdb-pk-retrofit-skip-crr-"); + const first = await openKvDb(dbPath, createLogger()); + first.run("create unique index if not exists temp_ade_pk_retrofit_probe on lanes(project_id, name)"); + first.close(); + + const reopened = await openKvDb(dbPath, createLogger()); + try { + expect( + reopened.get<{ name: string }>( + "select name from sqlite_master where type = 'table' and name = 'lanes__crsql_clock' limit 1", + )?.name, + ).toBe("lanes__crsql_clock"); + reopened.run("drop index if exists temp_ade_pk_retrofit_probe"); + } finally { + reopened.close(); + } + }); + it("coalesces duplicate lane_linear_issue_links rows during migrate", async () => { const dbPath = makeDbPath("ade-kvdb-linear-links-dedupe-"); fs.mkdirSync(path.dirname(dbPath), { recursive: true }); diff --git a/apps/desktop/src/main/services/state/kvDb.sync.test.ts b/apps/desktop/src/main/services/state/kvDb.sync.test.ts index 134aa1b51..31e26f45f 100644 --- a/apps/desktop/src/main/services/state/kvDb.sync.test.ts +++ b/apps/desktop/src/main/services/state/kvDb.sync.test.ts @@ -278,20 +278,20 @@ describe.skipIf(!isCrsqliteAvailable())("kvDb sync foundation", () => { it("ignores CRDT changes for legacy unified_memories tables removed in #329", async () => { const db2 = await openKvDb(makeDbPath("ade-kvdb-sync-mem-skip-"), createLogger() as any); - const legacyChange = { - table: "unified_memories", - pk: Buffer.from([0x01, 0x06, 0, 0, 0, 0, 0, 1]).toString("base64"), + const legacyChanges = ["unified_memories", "unified_memories_fts_content"].map((table, index) => ({ + table, + pk: Buffer.from([0x01, 0x06, 0, 0, 0, 0, 0, index + 1]).toString("base64"), cid: "id", val: null, col_version: 1, - db_version: 1, + db_version: index + 1, site_id: "a".repeat(32), cl: 1, - seq: 1, - }; + seq: index, + })); const beforeVersion = db2.sync.getDbVersion(); - const result = db2.sync.applyChanges([legacyChange as any]); + const result = db2.sync.applyChanges(legacyChanges as any); expect(result.appliedCount).toBe(0); expect(result.touchedTables).toEqual([]); expect(db2.sync.getDbVersion()).toBe(beforeVersion); @@ -299,7 +299,7 @@ describe.skipIf(!isCrsqliteAvailable())("kvDb sync foundation", () => { db2.close(); }); - it("silently skips CRDT changes for unknown future tables", async () => { + it("rejects CRDT changes for unknown future tables", async () => { const db2 = await openKvDb(makeDbPath("ade-kvdb-sync-future-table-"), createLogger() as any); const futureChange = { table: "missing_future_table", @@ -314,9 +314,7 @@ describe.skipIf(!isCrsqliteAvailable())("kvDb sync foundation", () => { }; const beforeVersion = db2.sync.getDbVersion(); - const result = db2.sync.applyChanges([futureChange as any]); - expect(result.appliedCount).toBe(0); - expect(result.touchedTables).toEqual([]); + expect(() => db2.sync.applyChanges([futureChange as any])).toThrow(/unknown_sync_table:missing_future_table/); expect(db2.sync.getDbVersion()).toBe(beforeVersion); db2.close(); diff --git a/apps/desktop/src/main/services/state/kvDb.ts b/apps/desktop/src/main/services/state/kvDb.ts index b19622d4e..0e7670bf8 100644 --- a/apps/desktop/src/main/services/state/kvDb.ts +++ b/apps/desktop/src/main/services/state/kvDb.ts @@ -11,6 +11,13 @@ import type { ApplyRemoteChangesResult, CrsqlChangeRow, SyncScalar } from "../.. type DatabaseSyncConstructor = new (dbPath: string, options?: { allowExtension?: boolean }) => DatabaseSyncType; +/** CRDT tables removed from the schema; inbound tombstones are ignored. */ +const SYNC_RETIRED_TABLES = new Set(["unified_memories", "unified_memories_fts"]); + +function isRetiredIncomingSyncTable(tableName: string): boolean { + return SYNC_RETIRED_TABLES.has(tableName) || tableName.startsWith("unified_memories_"); +} + // Anchor createRequire to a synthetic CJS file so builtin resolution follows the active runtime. const require = createRequire(path.join(process.cwd(), "ade-runtime.cjs")); const { DatabaseSync } = require("node:sqlite") as { DatabaseSync: DatabaseSyncConstructor }; @@ -208,6 +215,10 @@ function retrofitLegacyPrimaryKeyNotNullSchema(db: DatabaseSyncType): boolean { runStatement(db, "pragma foreign_keys = off"); try { for (const table of tables) { + // CRR tables must be altered via crsql_begin_alter/commit_alter or rebuilt + // with rebuildCrrTableWithBackfill — never DROP/rename wholesale. + if (rawHasTable(db, `${table.name}__crsql_clock`)) continue; + const tableInfo = allRows<{ name: string; type: string; @@ -3038,9 +3049,10 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise { try { for (const rawChange of changes) { if (isLocalOnlyQueueWipeMarkerChange(rawChange)) continue; - // Skip changes for tables that no longer exist in the schema - // (e.g. unified_memories removed in #329). - if (!rawHasTable(db, rawChange.table)) continue; + if (!rawHasTable(db, rawChange.table)) { + if (isRetiredIncomingSyncTable(rawChange.table)) continue; + throw new Error(`unknown_sync_table:${rawChange.table}`); + } const change = normalizeIncomingCrsqlChange(db, rawChange); const result = runStatement( db,