This project demonstrates a real-time Change Data Capture pipeline. It captures data changes (inserts, updates, and deletes) from a MySQL source database and streams them into PostgreSQL using Apache Kafka, Debezium, and Avro serialization.
Before running the project, ensure you have the following installed:
| Tool | Version | Purpose |
|---|---|---|
| Java | 11+ | Runtime for Kafka and the connectors |
| Python | 3.9+ | Running the FastAPI application |
| Kafka | 4.0.0+ | Distributed event streaming |
| MySQL | 8.0+ | Source database |
| PostgreSQL | 14.0+ | Target database |
| Schema Registry | Latest | Managing Avro schemas |
| uv | Latest | Python package management |
| cURL | Latest | Command-line tool for making HTTP requests (registering connectors, testing the API) |
1. Clone and initialize the project

```bash
git clone https://github.com/zablon-oigo/change-data-capture-mysql-postgres.git
cd change-data-capture-mysql-postgres

# Initialize the Python environment
uv sync
source .venv/bin/activate
```

2. Create a Kafka topic
```bash
bin/kafka-topics.sh --create --topic data \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1
```

1. MySQL configuration
Debezium requires the MySQL binary log to be enabled. Add these lines to your mysqld.cnf file.
```ini
server-id = 1
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
```

2. Restart MySQL
```bash
sudo systemctl restart mysql
sudo systemctl status mysql
```
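Before moving on, it is worth confirming that binary logging is actually active. A quick sanity check from the MySQL shell (standard MySQL commands):

```sql
-- Should report log_bin = ON and binlog_format = ROW
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
```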
3. Create a new user

```sql
CREATE USER 'debezium'@'localhost' IDENTIFIED BY 'Pass123@';
```

4. Assign user privileges
```sql
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'debezium'@'localhost';
```

5. Initialize Databases
```sql
-- MySQL
CREATE DATABASE lib;

-- PostgreSQL
CREATE DATABASE lib;
```

1. Kafka Connect & Plugins
Install the required connectors into your Kafka directory:
```bash
# Define your Kafka path
export KAFKA_HOME=/opt/kafka

# Download & extract connectors

# Move the extracted folders to your plugin path
sudo mv debezium-connector-mysql $KAFKA_HOME/plugins/
sudo mv kafka-connect-jdbc $KAFKA_HOME/libs/
```
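The download step above is left as a placeholder. One way to fill it in is sketched below; the version numbers are examples rather than project requirements, so check the Debezium releases and Confluent Hub for the builds you actually want:

```bash
# Debezium MySQL connector plugin from Maven Central (adjust the version to a current release)
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
tar -xzf debezium-connector-mysql-2.7.0.Final-plugin.tar.gz

# Confluent JDBC connector: download the ZIP from Confluent Hub
# (https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc), then:
unzip confluentinc-kafka-connect-jdbc-<version>.zip
mv confluentinc-kafka-connect-jdbc-<version> kafka-connect-jdbc
```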
2. Update properties

Edit $KAFKA_HOME/config/connect-distributed.properties to include your plugin path.

```properties
plugin.path=/opt/kafka/libs,/opt/kafka/plugins
```

4. Start Kafka Connect
Open a separate terminal tab.
```bash
bin/connect-distributed.sh config/connect-distributed.properties
```

Use curl to register your source and sink.
1. Register MySQL Source:
curl -X POST -H "Content-Type: application/json" --data @connectors/mysql-source-connector.json http://localhost:8083/connectors2.Register PostgreSQL Sink:
curl -X POST -H "Content-Type: application/json" --data @connectors/postgres-sink-connector.json http://localhost:8083/connectors1.List connectors
1. List connectors

```bash
curl -s http://localhost:8083/connectors | jq
```

2. Check Connector Status

```bash
curl -s "http://localhost:8083/connectors?expand=status" | jq
```

1. Run FastAPI in development mode
```bash
fastapi dev
```

2. Check health

```bash
curl http://localhost:8000
```

3. Celery Worker

In a different tab, start the Celery worker:

```bash
celery -A src.celery.c_app worker --loglevel=INFO
```

1. Register a user
```bash
curl -X POST http://localhost:8000/api/v1/auth/signup \
  -H "Content-Type: application/json" \
  -d '{"email":"test@mail.com", "password":"Pass!@#", "username":"testuser"}'
```

2. Verify & Login

Check your mail for the token, then:

```bash
curl -X POST http://localhost:8000/api/v1/auth/verify/{token}
curl -X POST http://localhost:8000/api/v1/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username":"testuser","password":"Pass!@#"}'
```

Perform CRUD operations on books to see CDC in action.
1. Create a Book (INSERT)
This command adds a record to your MySQL database. Debezium will capture this as a "Create" event.
```bash
curl -X 'POST' \
  'http://localhost:8000/api/v1/books/' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
    "title": "Designing Data-Intensive Applications",
    "author": "Martin Kleppmann",
    "publisher": "O-Reilly",
    "published_date": "2017-03-16",
    "page_count": 616,
    "language": "English"
  }'
```
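Once Debezium picks up the insert, the record published to data.lib.books (shown here decoded from Avro and abbreviated, with column names assumed from the API payload) carries the usual Debezium envelope:

```json
{
  "before": null,
  "after": {
    "uid": "<book_uid>",
    "title": "Designing Data-Intensive Applications",
    "author": "Martin Kleppmann",
    "publisher": "O-Reilly",
    "published_date": "2017-03-16",
    "page_count": 616,
    "language": "English"
  },
  "source": { "connector": "mysql", "db": "lib", "table": "books" },
  "op": "c",
  "ts_ms": 1700000000000
}
```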
2. Update a Book (PATCH)

Updating a specific field triggers an "Update" event. You will see both the before and after states in the Avro message in Kafka.

Note: Replace {book_uid} with the UID returned from the POST request.
```bash
curl -X 'PATCH' \
  'http://localhost:8000/api/v1/books/{book_uid}' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
    "title": "Designing Data-Intensive Applications - 2nd Edition",
    "page_count": 650
  }'
```
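The matching update event fills in both sides of the envelope, which is what makes the change easy to diff (abbreviated sketch, other columns omitted):

```json
{
  "before": { "title": "Designing Data-Intensive Applications", "page_count": 616 },
  "after": { "title": "Designing Data-Intensive Applications - 2nd Edition", "page_count": 650 },
  "op": "u"
}
```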
3. Delete a Book (DELETE)

Deleting a record generates a "Delete" event and a "Tombstone" message in Kafka to ensure the record is also removed from the PostgreSQL sink.

Note: Replace {book_uid} with the UID returned from the POST request.
```bash
curl -X 'DELETE' \
  'http://localhost:8000/api/v1/books/{book_uid}' \
  -H 'accept: application/json'
```
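On the topic this appears as two records: a delete event whose after field is null, followed by a tombstone (same key, null value) that lets the JDBC sink and log compaction drop the row. The delete event looks roughly like this abbreviated sketch:

```json
{
  "before": { "uid": "<book_uid>", "title": "Designing Data-Intensive Applications - 2nd Edition" },
  "after": null,
  "op": "d"
}
```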
1. Check PostgreSQL (The Sink):

After running any of the commands above, check your target database to confirm the synchronization:
```bash
# Connect to your Postgres instance
psql -d lib -U postgres
```

```sql
SELECT * FROM books;
```

2. Observe Avro Messages
Because the data is serialized with Avro, use the Schema Registry's Avro console consumer to read it.
```bash
bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --property map.deep.nulls=true \
  --property schema.registry.url=http://localhost:8081 \
  --topic data.lib.users --from-beginning
```

Change Data Capture (CDC): Streaming MySQL Changes to PostgreSQL in Real Time.