diff --git a/.rwx/integration.yml b/.rwx/integration.yml index aa328259d..f9f8a2530 100644 --- a/.rwx/integration.yml +++ b/.rwx/integration.yml @@ -103,6 +103,11 @@ tasks: python3-venv \ python3-virtualenv \ ruby-full + sudo install -d /usr/share/keyrings + curl -fsSL https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo gpg --dearmor -o /usr/share/keyrings/postgresql.gpg + echo "deb [signed-by=/usr/share/keyrings/postgresql.gpg] https://apt.postgresql.org/pub/repos/apt $(. /etc/os-release && echo "${VERSION_CODENAME}")-pgdg main" | sudo tee /etc/apt/sources.list.d/pgdg.list >/dev/null + sudo apt-get update + sudo apt-get install -y postgresql-client-18 sudo apt-get remove -y cmake || true sudo pip3 install --break-system-packages cmake==3.31.6 cmake --version @@ -430,6 +435,33 @@ tasks: - key: lcov-copy-data path: copy-data.lcov + - key: integration-resharding + use: integration-build-pgdog-cov + docker: true + timeout: 20m + agent: + cpus: 4 + memory: 16gb + run: | + export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/resharding-%p-%m.profraw" + + cleanup() { + (cd integration/resharding && docker-compose down >/dev/null 2>&1 || true) + killall -TERM pgdog 2>/dev/null || true + sleep 1 + killall -KILL pgdog 2>/dev/null || true + } + trap cleanup EXIT + + timeout --signal=TERM --kill-after=90s 16m bash integration/resharding/dev.sh + + cargo llvm-cov report --release --package pgdog --lcov --output-path resharding.lcov + outputs: + filesystem: false + artifacts: + - key: lcov-resharding + path: resharding.lcov + - key: integration-python use: integration-build-pgdog-cov background-processes: *postgres-bg–processes @@ -534,6 +566,7 @@ tasks: -a "$LCOV_TOXI" \ -a "$LCOV_RUST" \ -a "$LCOV_COPY_DATA" \ + -a "$LCOV_RESHARDING" \ -a "$LCOV_PYTHON" \ -a "$LCOV_LOAD_BALANCER" \ -a "$LCOV_COMPLEX" \ @@ -551,6 +584,7 @@ tasks: LCOV_TOXI: ${{ tasks.integration-toxi.artifacts.lcov-toxi }} LCOV_RUST: ${{ tasks.integration-rust.artifacts.lcov-rust }} LCOV_COPY_DATA: ${{ tasks.integration-copy-data.artifacts.lcov-copy-data }} + LCOV_RESHARDING: ${{ tasks.integration-resharding.artifacts.lcov-resharding }} LCOV_PYTHON: ${{ tasks.integration-python.artifacts.lcov-python }} LCOV_LOAD_BALANCER: ${{ tasks.integration-load-balancer.artifacts.lcov-load-balancer }} LCOV_COMPLEX: ${{ tasks.integration-complex.artifacts.lcov-complex }} diff --git a/integration/resharding/dev.sh b/integration/resharding/dev.sh new file mode 100644 index 000000000..2526473d0 --- /dev/null +++ b/integration/resharding/dev.sh @@ -0,0 +1,105 @@ +#!/bin/bash +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +DEFAULT_BIN="${SCRIPT_DIR}/../../target/debug/pgdog" +PGDOG_BIN=${PGDOG_BIN:-$DEFAULT_BIN} + +pushd ${SCRIPT_DIR} +docker-compose down && docker-compose up -d + +# Give it a second to boot up. +# It restarts the DB during initialization. +sleep 2 + +for port in 15432 15433 15434 15435; do + echo "Waiting for database on port ${port}..." + until PGPASSWORD=pgdog pg_isready -h 127.0.0.1 -p "${port}" -U pgdog -d postgres; do + sleep 1 + done +done + +${PGDOG_BIN} & +PGDOG_PID="$!" + +export PGPASSWORD=pgdog +export PGHOST=127.0.0.1 +export PGPORT=6432 +export PGUSER=pgdog + +until psql source -c 'SELECT 1' 2> /dev/null; do + sleep 1 +done + +pgbench -f pgbench.sql -P 1 source -c 5 -t 1000000 & +PGBENCH_PID="$!" + +sleep 10 + +psql admin -c 'COPY_DATA source destination pgdog' + +sleep 10 + +kill -TERM ${PGBENCH_PID} + +replace_copy_with_replicate() { + local table="$1" + local column="$2" + + psql source -c "UPDATE ${table} SET ${column} = regexp_replace(${column}, '-copy$', '-replicate') WHERE ${column} LIKE '%-copy';" +} + +replace_copy_with_replicate tenants name +replace_copy_with_replicate accounts full_name +replace_copy_with_replicate projects name +replace_copy_with_replicate tasks title +replace_copy_with_replicate task_comments body +replace_copy_with_replicate settings name + +wait_for_no_copy_rows() { + local table="$1" + local column="$2" + + while true; do + count=$(psql -d destination -tAc "SELECT COUNT(*) FROM ${table} WHERE ${column} LIKE '%-copy'") + if [ "${count}" -eq 0 ]; then + echo "${table}.${column}: replication caught up" + break + fi + + echo "${table}.${column}: waiting for ${count} rows ending in -copy" + sleep 1 + done +} + +wait_for_no_copy_rows tenants name +wait_for_no_copy_rows accounts full_name +wait_for_no_copy_rows projects name +wait_for_no_copy_rows tasks title +wait_for_no_copy_rows task_comments body +wait_for_no_copy_rows settings name + +check_row_count_matches() { + local table="$1" + local source_count + local destination_count + + source_count=$(psql -d source -tAc "SELECT COUNT(*) FROM ${table}") + destination_count=$(psql -d destination -tAc "SELECT COUNT(*) FROM ${table}") + + if [ "${source_count}" -ne "${destination_count}" ]; then + echo "MISMATCH ${table}: source=${source_count} destination=${destination_count}" + exit 1 + fi + + echo "OK ${table}: ${source_count} rows" +} + +check_row_count_matches tenants +check_row_count_matches accounts +check_row_count_matches projects +check_row_count_matches tasks +check_row_count_matches task_comments +check_row_count_matches settings + +kill -TERM ${PGDOG_PID} +docker-compose down diff --git a/integration/resharding/docker-compose.yaml b/integration/resharding/docker-compose.yaml new file mode 100644 index 000000000..b89eef8c0 --- /dev/null +++ b/integration/resharding/docker-compose.yaml @@ -0,0 +1,55 @@ +services: + source_0: + image: postgres:18 + command: postgres -c wal_level=logical + environment: + POSTGRES_USER: pgdog + POSTGRES_PASSWORD: pgdog + POSTGRES_DB: postgres + volumes: + - ./schema.sql:/docker-entrypoint-initdb.d/schema.sql + ports: + - 15432:5432 + networks: + - postgres + + source_1: + image: postgres:18 + command: postgres -c wal_level=logical + environment: + POSTGRES_USER: pgdog + POSTGRES_PASSWORD: pgdog + POSTGRES_DB: postgres + volumes: + - ./schema.sql:/docker-entrypoint-initdb.d/schema.sql + ports: + - 15433:5432 + networks: + - postgres + + destination_0: + image: postgres:18 + command: postgres -c wal_level=logical + environment: + POSTGRES_USER: pgdog + POSTGRES_PASSWORD: pgdog + POSTGRES_DB: postgres + ports: + - 15434:5432 + networks: + - postgres + + destination_1: + image: postgres:18 + command: postgres -c wal_level=logical + environment: + POSTGRES_USER: pgdog + POSTGRES_PASSWORD: pgdog + POSTGRES_DB: postgres + ports: + - 15435:5432 + networks: + - postgres + +networks: + postgres: diff --git a/integration/resharding/pgbench.sql b/integration/resharding/pgbench.sql new file mode 100644 index 000000000..c1b066756 --- /dev/null +++ b/integration/resharding/pgbench.sql @@ -0,0 +1,81 @@ +\set id_seed random(1, 1000000000) + +INSERT INTO tenants (id, name, billing_email) +VALUES ( + :id_seed, + 'tenant-' || :id_seed || '-copy', + 'tenant-' || :id_seed || '-copy@example.test' +) +ON CONFLICT (id) DO NOTHING; + +INSERT INTO accounts (id, tenant_id, email, full_name) +VALUES ( + :id_seed, + :id_seed, + 'account-' || :id_seed || '-copy@example.test', + 'account-' || :id_seed || '-copy' +) +ON CONFLICT (id) DO NOTHING; + +INSERT INTO projects (id, tenant_id, owner_account_id, name, status) +VALUES ( + :id_seed, + :id_seed, + :id_seed, + 'project-' || :id_seed || '-copy', + 'active' +) +ON CONFLICT (id) DO NOTHING; + +INSERT INTO tasks (id, tenant_id, project_id, assignee_account_id, title, state) +VALUES ( + :id_seed, + :id_seed, + :id_seed, + :id_seed, + 'task-' || :id_seed || '-copy', + 'open' +) +ON CONFLICT (id) DO NOTHING; + +INSERT INTO task_comments (id, tenant_id, task_id, author_account_id, body) +VALUES ( + :id_seed, + :id_seed, + :id_seed, + :id_seed, + 'comment-' || :id_seed || '-copy' +) +ON CONFLICT (id) DO NOTHING; + +INSERT INTO settings (id, name, value) +VALUES ( + :id_seed, + 'setting-' || :id_seed || '-copy', + 'value-' || :id_seed || '-copy' +) +ON CONFLICT (id) DO NOTHING; + +UPDATE tenants +SET name = 'tenant-' || :id_seed || '-replicate' +WHERE id = :id_seed; + +UPDATE accounts +SET full_name = 'account-' || :id_seed || '-replicate' +WHERE id = :id_seed; + +UPDATE projects +SET name = 'project-' || :id_seed || '-replicate' +WHERE id = :id_seed; + +UPDATE tasks +SET title = 'task-' || :id_seed || '-replicate' +WHERE id = :id_seed; + +UPDATE task_comments +SET body = 'comment-' || :id_seed || '-replicate' +WHERE id = :id_seed; + +UPDATE settings +SET name = 'setting-' || :id_seed || '-replicate' +WHERE id = :id_seed; diff --git a/integration/resharding/pgdog.toml b/integration/resharding/pgdog.toml new file mode 100644 index 000000000..2d627feb6 --- /dev/null +++ b/integration/resharding/pgdog.toml @@ -0,0 +1,56 @@ +[[databases]] +name = "source" +host = "127.0.0.1" +port = 15432 +shard = 0 +database_name = "postgres" + +[[databases]] +name = "source" +host = "127.0.0.1" +port = 15433 +shard = 1 +database_name = "postgres" + +[[databases]] +name = "destination" +host = "127.0.0.1" +port = 15434 +shard = 0 +database_name = "postgres" + +[[databases]] +name = "destination" +host = "127.0.0.1" +port = 15435 +shard = 1 +database_name = "postgres" + +[[sharded_tables]] +database = "source" +name = "tenants" +column = "id" +data_type = "bigint" + +[[sharded_tables]] +database = "source" +column = "tenant_id" +data_type = "bigint" + +[[sharded_tables]] +database = "destination" +name = "tenants" +column = "id" +data_type = "bigint" + +[[sharded_tables]] +database = "destination" +column = "tenant_id" +data_type = "bigint" + +[admin] +password = "pgdog" +user = "pgdog" + +[replication] +pg_dump_path = "/usr/lib/postgresql/18/bin/pg_dump" diff --git a/integration/resharding/schema.sql b/integration/resharding/schema.sql new file mode 100644 index 000000000..003c3f482 --- /dev/null +++ b/integration/resharding/schema.sql @@ -0,0 +1,74 @@ +CREATE TABLE tenants ( + id BIGSERIAL PRIMARY KEY, + name TEXT NOT NULL, + billing_email TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE TABLE accounts ( + id BIGSERIAL PRIMARY KEY, + tenant_id BIGINT NOT NULL, + email TEXT NOT NULL, + full_name TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (tenant_id, id), + UNIQUE (tenant_id, email), + FOREIGN KEY (tenant_id) REFERENCES tenants (id) +); + +CREATE TABLE projects ( + id BIGSERIAL PRIMARY KEY, + tenant_id BIGINT NOT NULL, + owner_account_id BIGINT NOT NULL, + name TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'active', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (tenant_id, id), + FOREIGN KEY (tenant_id) REFERENCES tenants (id), + FOREIGN KEY (tenant_id, owner_account_id) REFERENCES accounts (tenant_id, id) +); + +CREATE TABLE tasks ( + id BIGSERIAL PRIMARY KEY, + tenant_id BIGINT NOT NULL, + project_id BIGINT NOT NULL, + assignee_account_id BIGINT, + title TEXT NOT NULL, + state TEXT NOT NULL DEFAULT 'open', + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (tenant_id, id), + FOREIGN KEY (tenant_id) REFERENCES tenants (id), + FOREIGN KEY (tenant_id, project_id) REFERENCES projects (tenant_id, id), + FOREIGN KEY (tenant_id, assignee_account_id) REFERENCES accounts (tenant_id, id) +); + +CREATE TABLE task_comments ( + id BIGSERIAL PRIMARY KEY, + tenant_id BIGINT NOT NULL, + task_id BIGINT NOT NULL, + author_account_id BIGINT NOT NULL, + body TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (tenant_id, id), + FOREIGN KEY (tenant_id) REFERENCES tenants (id), + FOREIGN KEY (tenant_id, task_id) REFERENCES tasks (tenant_id, id), + FOREIGN KEY (tenant_id, author_account_id) REFERENCES accounts (tenant_id, id) +); + +CREATE TABLE settings ( + id BIGSERIAL PRIMARY KEY, + name VARCHAR NOT NULL, + value VARCHAR NOT NULL, + created_at timestamptz NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_accounts_tenant_id ON accounts (tenant_id); +CREATE INDEX idx_projects_tenant_id ON projects (tenant_id); +CREATE INDEX idx_projects_owner ON projects (tenant_id, owner_account_id); +CREATE INDEX idx_tasks_tenant_id ON tasks (tenant_id); +CREATE INDEX idx_tasks_project ON tasks (tenant_id, project_id); +CREATE INDEX idx_tasks_assignee ON tasks (tenant_id, assignee_account_id); +CREATE INDEX idx_task_comments_tenant_id ON task_comments (tenant_id); +CREATE INDEX idx_task_comments_task ON task_comments (tenant_id, task_id); + +CREATE PUBLICATION pgdog FOR ALL TABLES; diff --git a/integration/resharding/users.toml b/integration/resharding/users.toml new file mode 100644 index 000000000..67142d309 --- /dev/null +++ b/integration/resharding/users.toml @@ -0,0 +1,11 @@ +[[users]] +database = "source" +name = "pgdog" +password = "pgdog" +schema_admin = true + +[[users]] +database = "destination" +name = "pgdog" +password = "pgdog" +schema_admin = true diff --git a/pgdog/src/admin/copy_data.rs b/pgdog/src/admin/copy_data.rs index 129a3f9fe..069007166 100644 --- a/pgdog/src/admin/copy_data.rs +++ b/pgdog/src/admin/copy_data.rs @@ -59,6 +59,7 @@ impl Command for CopyData { let task_id = Task::register(TaskType::CopyData(spawn(async move { orchestrator.load_schema().await?; + orchestrator.schema_sync_pre(true).await?; orchestrator.data_sync().await?; AsyncTasks::insert(TaskType::Replication(Box::new( orchestrator.replicate().await?,