Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions apps/desktop/src/main/services/state/kvDb.sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,26 @@ function makeDbPath(prefix: string): string {
return path.join(root, ".ade", "kv.sqlite");
}

function packedTextPrimaryKey(text: string): { type: "bytes"; base64: string } {
const textBytes = Buffer.from(text, "utf8");
return {
type: "bytes",
base64: Buffer.concat([Buffer.from([0x01, 0x0b, textBytes.length]), textBytes]).toString("base64"),
};
}

function syncPrimaryKeyMatchesText(value: unknown, text: string): boolean {
if (value === text) return true;
return Boolean(
value
&& typeof value === "object"
&& "type" in value
&& (value as { type?: unknown }).type === "bytes"
&& "base64" in value
&& (value as { base64?: unknown }).base64 === packedTextPrimaryKey(text).base64,
);
}

describe.skipIf(!isCrsqliteAvailable())("kvDb sync foundation", () => {
it("persists a stable local site id and marks CRR tables", async () => {
const dbPath = makeDbPath("ade-kvdb-sync-site-");
Expand Down Expand Up @@ -178,6 +198,84 @@ describe.skipIf(!isCrsqliteAvailable())("kvDb sync foundation", () => {
repaired.close();
});

it("does not replicate queue_landing_state overhaul wipe deletes to synced peers", async () => {
const dbPathA = makeDbPath("ade-kvdb-sync-queue-wipe-a-");
const dbA = await openKvDb(dbPathA, createLogger() as any);
const wipeMarker = "queue_landing_state.wiped_for_stacked_overhaul.v1";
const projectId = "project-queue-wipe";
const groupId = "group-queue-wipe";
const queueId = "queue-wipe-1";

dbA.run(
`insert into projects(id, root_path, display_name, default_base_ref, created_at, last_opened_at)
values (?, ?, ?, ?, ?, ?)`,
[projectId, "/repo/queue-wipe", "Queue Wipe", "main", "2026-03-15T00:00:00.000Z", "2026-03-15T00:00:00.000Z"],
);
dbA.run(
`insert into pr_groups(id, project_id, group_type, name, auto_rebase, ci_gating, target_branch, created_at)
values (?, ?, ?, ?, 0, 0, ?, ?)`,
[groupId, projectId, "stack", "Stack", "main", "2026-03-15T00:00:00.000Z"],
);
dbA.run(
`insert into queue_landing_state(
id, group_id, project_id, state, entries_json, config_json, current_position, started_at
) values (?, ?, ?, ?, ?, ?, 0, ?)`,
[queueId, groupId, projectId, "active", "[]", "{}", "2026-03-15T00:00:00.000Z"],
);

const dbB = await openKvDb(makeDbPath("ade-kvdb-sync-queue-wipe-b-"), createLogger() as any);
const baselineChanges = dbA.sync.exportChangesSince(0);
expect(baselineChanges.some((change) => change.table === "queue_landing_state")).toBe(true);
dbB.sync.applyChanges(baselineChanges);
expect(
dbB.get<{ id: string }>("select id from queue_landing_state where id = ?", [queueId])?.id,
).toBe(queueId);

const versionBeforeWipe = dbA.sync.getDbVersion();
dbA.run("delete from kv where key = ?", [wipeMarker]);
dbA.close();

const dbAReopened = await openKvDb(dbPathA, createLogger() as any);
expect(
dbAReopened.get<{ id: string }>("select id from queue_landing_state where id = ?", [queueId]),
).toBeNull();

const wipeChanges = dbAReopened.sync.exportChangesSince(versionBeforeWipe);
expect(wipeChanges.some((change) => change.table === "queue_landing_state")).toBe(false);
expect(wipeChanges.some((change) => change.table === "kv" && syncPrimaryKeyMatchesText(change.pk, wipeMarker))).toBe(false);

dbB.sync.applyChanges(wipeChanges);
expect(
dbB.get<{ id: string }>("select id from queue_landing_state where id = ?", [queueId])?.id,
).toBe(queueId);

const markerValueBeforeApply = dbB.get<{ value: string }>(
"select value from kv where key = ?",
[wipeMarker],
)?.value ?? null;
const versionBeforeMarkerApply = dbB.sync.getDbVersion();
const markerApplyResult = dbB.sync.applyChanges([{
table: "kv",
pk: packedTextPrimaryKey(wipeMarker),
cid: "value",
val: "remote-marker-should-not-apply",
col_version: 999,
db_version: versionBeforeMarkerApply + 1,
site_id: "f".repeat(32),
cl: 1,
seq: 1,
}]);
expect(markerApplyResult.appliedCount).toBe(0);
expect(markerApplyResult.touchedTables).toEqual([]);
expect(dbB.sync.getDbVersion()).toBe(versionBeforeMarkerApply);
expect(
dbB.get<{ value: string }>("select value from kv where key = ?", [wipeMarker])?.value ?? null,
).toBe(markerValueBeforeApply);

dbAReopened.close();
dbB.close();
});

it("ignores CRDT changes for legacy unified_memories tables removed in #329", async () => {
const db2 = await openKvDb(makeDbPath("ade-kvdb-sync-mem-skip-"), createLogger() as any);
const legacyChange = {
Expand Down
142 changes: 105 additions & 37 deletions apps/desktop/src/main/services/state/kvDb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,95 @@ function dropCrrTriggers(db: DatabaseSyncType, tableName: string, logger?: Logge
return triggers.length;
}

/** Strip CRR triggers/metadata so row deletes stay local (no replicated tombstones). */
function deleteAllRowsWithoutCrrReplication(
db: DatabaseSyncType,
tableName: string,
logger?: Logger,
): void {
if (!rawHasTable(db, tableName)) return;

const clockTableName = `${tableName}__crsql_clock`;
const pksTableName = `${tableName}__crsql_pks`;
if (rawHasTable(db, clockTableName) || rawHasTable(db, pksTableName) || listCrrTriggers(db, tableName).length > 0) {
if (rawHasTable(db, "crsql_master") && rawHasColumn(db, "crsql_master", "tbl_name")) {
runStatement(db, "delete from crsql_master where tbl_name = ?", [tableName]);
}
if (rawHasTable(db, "crsql_changes") && rawHasColumn(db, "crsql_changes", "table")) {
runStatement(db, "delete from crsql_changes where [table] = ?", [tableName]);
}
try {
getRow(db, "select crsql_as_table(?) as ok", [tableName]);
} catch {
// Table may not be registered enough for crsql_as_table; shadow cleanup below still applies.
}
dropCrrTriggers(db, tableName, logger);
runStatement(db, `drop table if exists ${quoteIdentifier(clockTableName)}`);
runStatement(db, `drop table if exists ${quoteIdentifier(pksTableName)}`);
}

runStatement(db, `delete from ${quoteIdentifier(tableName)}`);
Comment on lines +524 to +551
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 crsql_master/crsql_changes cleanup gated behind shadow-table presence

deleteAllRowsWithoutCrrReplication enters the CRR-cleanup branch only when a shadow table (__crsql_clock / __crsql_pks) or a CRR trigger is found. If a previous partial initialization left entries only in crsql_master (no shadow tables, no triggers), those entries survive the wipe and queue_landing_state rows deleted afterward could still be associated with stale master metadata. The sibling removeExcludedCrrMetadata function checks hasMasterRows independently — the same independent pre-check would make this function fully symmetric with it. Very unlikely in practice, but worth aligning for correctness.

Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/desktop/src/main/services/state/kvDb.ts
Line: 524-551

Comment:
**`crsql_master`/`crsql_changes` cleanup gated behind shadow-table presence**

`deleteAllRowsWithoutCrrReplication` enters the CRR-cleanup branch only when a shadow table (`__crsql_clock` / `__crsql_pks`) or a CRR trigger is found. If a previous partial initialization left entries only in `crsql_master` (no shadow tables, no triggers), those entries survive the wipe and `queue_landing_state` rows deleted afterward could still be associated with stale master metadata. The sibling `removeExcludedCrrMetadata` function checks `hasMasterRows` independently — the same independent pre-check would make this function fully symmetric with it. Very unlikely in practice, but worth aligning for correctness.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Codex Fix in Claude Code

}

const QUEUE_OVERHAUL_WIPE_MARKER = "queue_landing_state.wiped_for_stacked_overhaul.v1";

function rawCrsqlPrimaryKeyMatchesText(value: unknown, text: string): boolean {
if (value === text) return true;
if (!(value instanceof Uint8Array)) return false;

const packed = packedCrsqlPrimaryKey(text);
return isSyncScalarBytes(packed)
&& Buffer.from(value).equals(Buffer.from(packed.base64, "base64"));
}

function syncScalarPrimaryKeyMatchesText(value: SyncScalar, text: string): boolean {
if (value === text) return true;

const packed = packedCrsqlPrimaryKey(text);
return isSyncScalarBytes(value)
&& isSyncScalarBytes(packed)
&& value.base64 === packed.base64;
}

function isLocalOnlyQueueWipeMarkerRawChange(change: { table_name: string; pk: unknown }): boolean {
return change.table_name === "kv"
&& rawCrsqlPrimaryKeyMatchesText(change.pk, QUEUE_OVERHAUL_WIPE_MARKER);
}

function isLocalOnlyQueueWipeMarkerChange(change: CrsqlChangeRow): boolean {
return change.table === "kv"
&& syncScalarPrimaryKeyMatchesText(change.pk, QUEUE_OVERHAUL_WIPE_MARKER);
}

/**
* One-shot local wipe of legacy queue_landing_state on upgrade to the stacked-PR
* queue overhaul. Must run after migrations and before ensureCrrTables so queue
* deletes do not replicate to peers that have not upgraded yet. The marker is
* stored in kv for compatibility with the original migration, but filtered from
* CRDT import/export because it records local upgrade work, not shared state.
*/
function wipeQueueLandingStateForStackedOverhaulIfNeeded(db: DatabaseSyncType, logger?: Logger): void {
try {
const row = getRow<{ value: string }>(
db,
"select value from kv where key = ?",
[QUEUE_OVERHAUL_WIPE_MARKER],
);
if (row) return;

deleteAllRowsWithoutCrrReplication(db, "queue_landing_state", logger);
runStatement(
db,
"insert into kv (key, value) values (?, ?) on conflict(key) do update set value = excluded.value",
[QUEUE_OVERHAUL_WIPE_MARKER, new Date().toISOString()],
);
} catch {
// Table may not exist on a brand-new DB; initialization will create both
// tables and the next startup will record the marker. Skipping the wipe
// on a fresh DB is correct (nothing to wipe).
}
}

function removeExcludedCrrMetadata(db: DatabaseSyncType, logger?: Logger): void {
for (const tableName of LOCAL_ONLY_CRR_EXCLUDED_TABLES) {
const clockTableName = `${tableName}__crsql_clock`;
Expand Down Expand Up @@ -1821,32 +1910,6 @@ function migrate(db: MigrationDb) {
try { db.run("alter table queue_landing_state add column wait_reason text"); } catch {}
try { db.run("alter table queue_landing_state add column updated_at text"); } catch {}

// One-shot wipe of legacy queue_landing_state on upgrade to the stacked-PR
// queue overhaul. The new queue creates PRs with chain bases (PR_N's base =
// previous lane's branch) instead of all-into-main, so any in-flight queue
// from the old code path would be misinterpreted by the new landing loop.
// Wiping rather than migrating is a deliberate choice — the user accepts
// losing in-flight queues in exchange for not maintaining a translation
// layer for every legacy field shape.
const QUEUE_OVERHAUL_WIPE_MARKER = "queue_landing_state.wiped_for_stacked_overhaul.v1";
try {
const row = db.get<{ value: string }>(
"select value from kv where key = ?",
[QUEUE_OVERHAUL_WIPE_MARKER],
);
if (!row) {
db.run("delete from queue_landing_state");
db.run(
"insert into kv (key, value) values (?, ?) on conflict(key) do update set value = excluded.value",
[QUEUE_OVERHAUL_WIPE_MARKER, new Date().toISOString()],
);
}
} catch {
// Table may not exist on a brand-new DB; initialization will create both
// tables and the next startup will record the marker. Skipping the wipe
// on a fresh DB is correct (nothing to wipe).
}

// Rebase dismiss/defer persistence
db.run(`
create table if not exists rebase_dismissed (
Expand Down Expand Up @@ -2849,6 +2912,8 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise<AdeDb> {
removeExcludedCrrMetadata(db, logger);
}

wipeQueueLandingStateForStackedOverhaulIfNeeded(db, logger);

if (crsqliteLoaded) {
loadCrsqliteIfAvailable();
ensureCrrTables(db, logger);
Expand Down Expand Up @@ -2951,17 +3016,19 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise<AdeDb> {
[version]
);

return rows.map((row) => ({
table: row.table_name,
pk: encodeSyncScalar(row.pk),
cid: row.cid,
val: encodeSyncScalar(row.val),
col_version: Number(row.col_version),
db_version: Number(row.db_version),
site_id: Buffer.from(row.site_id).toString("hex"),
cl: Number(row.cl),
seq: Number(row.seq),
}));
return rows
.filter((row) => !isLocalOnlyQueueWipeMarkerRawChange(row))
.map((row) => ({
table: row.table_name,
pk: encodeSyncScalar(row.pk),
cid: row.cid,
val: encodeSyncScalar(row.val),
col_version: Number(row.col_version),
db_version: Number(row.db_version),
site_id: Buffer.from(row.site_id).toString("hex"),
cl: Number(row.cl),
seq: Number(row.seq),
}));
},
applyChanges: (changes: CrsqlChangeRow[]) => {
if (!crsqliteLoaded) return { appliedCount: 0, dbVersion: 0, touchedTables: [], rebuiltFts: false };
Expand All @@ -2970,6 +3037,7 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise<AdeDb> {
runStatement(db, "begin");
try {
for (const rawChange of changes) {
if (isLocalOnlyQueueWipeMarkerChange(rawChange)) continue;
// Skip changes for tables that no longer exist in the schema
// (e.g. unified_memories removed in #329).
if (!rawHasTable(db, rawChange.table)) continue;
Expand Down
68 changes: 68 additions & 0 deletions apps/desktop/src/renderer/components/files/FilesPage.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,74 @@ describe("FilesPage", () => {
}
});

it("preserves oversized dirty tabs when switching lane scopes", async () => {
const laneA = "lane-large-a";
const laneB = "lane-large-b";
useAppStore.setState({
selectedLaneId: laneA,
lanes: [
{ id: laneA, name: "Large A", branchRef: "refs/heads/large-a" },
{ id: laneB, name: "Large B", branchRef: "refs/heads/large-b" },
] as any,
});
vi.mocked(window.ade.files.listWorkspaces).mockResolvedValue([
{
id: "primary",
kind: "primary",
laneId: null,
name: "ADE",
branchRef: "refs/heads/main",
rootPath: projectRoot,
isReadOnlyByDefault: false,
},
{
id: "lane-large-a-ws",
kind: "worktree",
laneId: laneA,
name: "Large A",
branchRef: "refs/heads/large-a",
rootPath: `${projectRoot}/.ade/worktrees/large-a`,
isReadOnlyByDefault: false,
},
{
id: "lane-large-b-ws",
kind: "worktree",
laneId: laneB,
name: "Large B",
branchRef: "refs/heads/large-b",
rootPath: `${projectRoot}/.ade/worktrees/large-b`,
isReadOnlyByDefault: false,
},
]);

renderFilesPage({
openFilePath: "src/index.ts",
});

await waitForEditorText("value = 1");

const oversizedDirtyContent = `dirty-start\n${"x".repeat(8 * 1024 * 1024 + 1)}\ndirty-end`;
act(() => {
latestMockEditor?.setValue(oversizedDirtyContent);
});
expect(latestMockEditor?.getValue().length).toBe(oversizedDirtyContent.length);

act(() => {
useAppStore.setState({ selectedLaneId: laneB });
});
await waitFor(() => {
expect(screen.getByText(/OPEN A FILE TO START EDITING/i)).toBeTruthy();
});

act(() => {
useAppStore.setState({ selectedLaneId: laneA });
});
await waitFor(() => {
expect(latestMockEditor?.getValue().length).toBe(oversizedDirtyContent.length);
expect(latestMockEditor?.getValue().endsWith("dirty-end")).toBe(true);
});
});

it("treats Windows workspace paths case-insensitively for open tabs and watcher events", async () => {
projectRoot = "C:/Repo";
resetStore();
Expand Down
5 changes: 3 additions & 2 deletions apps/desktop/src/renderer/components/files/FilesPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ const filesRootTreeCacheByWorkspace = new Map<string, FileTreeNode[]>();
const MAX_FILES_PAGE_CACHED_SCOPES = 8;
const MAX_FILES_TREE_CACHED_WORKSPACES = 32;
const MAX_CACHED_CLEAN_TAB_CHARS = 256 * 1024;
const MAX_CACHED_DIRTY_TAB_CHARS = 8 * 1024 * 1024;
const MAX_QUEUED_TREE_PARENT_REFRESHES = 24;
const FILES_WATCH_START_DELAY_MS = import.meta.env.MODE === "test" || (window as any).__adeBrowserMock ? 0 : 2_000;

Expand Down Expand Up @@ -249,8 +248,10 @@ function filesPageSessionHasUnsavedTabs(session: FilesPageSessionState | undefin
function cacheableSessionTabs(openTabs: OpenTab[]): OpenTab[] {
return openTabs
.filter((tab) => {
// Always retain unsaved buffers across session scope changes; size limits
// apply only to clean tabs so large in-flight edits are not dropped silently.
if (tab.content !== tab.savedContent) {
return tab.content.length <= MAX_CACHED_DIRTY_TAB_CHARS;
return true;
}
return isTextTab(tab) && tab.content.length <= MAX_CACHED_CLEAN_TAB_CHARS;
})
Expand Down
2 changes: 0 additions & 2 deletions apps/ios/ADE/Resources/DatabaseBootstrap.sql
Original file line number Diff line number Diff line change
Expand Up @@ -817,8 +817,6 @@ alter table queue_landing_state add column wait_reason text;

alter table queue_landing_state add column updated_at text;

delete from queue_landing_state;

create table if not exists rebase_dismissed (
lane_id text not null,
project_id text not null,
Expand Down
Loading
Loading