Change data capture (CDC) is a data integration pattern that tracks changes in a database so that actions can be taken on the changed data. Debezium is probably the most popular open source platform for CDC. While it originally provided Kafka source connectors, it also ships a ready-to-use standalone application called Debezium Server, which can stream change events to other messaging infrastructure such as Google Cloud Pub/Sub, Amazon Kinesis and Apache Pulsar. In this post, we develop a CDC solution locally using Docker. The theLook eCommerce data generator is modified to generate data continuously, and the data is inserted into multiple tables of a PostgreSQL database. Two of those tables are tracked by the Debezium server, which pushes row-level changes into Pub/Sub topics on the Pub/Sub emulator. Finally, the messages of the topics are read by a Python application.

Docker Compose Services

We have three docker-compose services, and each service is described in turn. The source of this post can be found in this GitHub repository.

PostgreSQL

A PostgreSQL database server is configured to enable logical replication (wal_level=logical) at startup. This is necessary because we will be using pgoutput, the standard logical decoding output plug-in available in PostgreSQL 10+ - see this page for details.

# docker-compose.yml
version: "3"
services:
  postgres:
    image: postgres:16
    container_name: postgres
    command: ["postgres", "-c", "wal_level=logical"]
    ports:
      - 5432:5432
    volumes:
      - ./config/postgres:/docker-entrypoint-initdb.d
      - postgres_data:/var/lib/postgresql/data
    environment:
      POSTGRES_DB: develop
      POSTGRES_USER: develop
      POSTGRES_PASSWORD: password
      PGUSER: develop
      TZ: Australia/Sydney
...
volumes:
  postgres_data:
    driver: local
    name: postgres_data
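
Once the container is up, we can confirm that logical replication is enabled. Below is a minimal check, assuming the psycopg2 package is installed (it is not part of the project):

# check_wal_level.py - verify the server was started with wal_level=logical
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432, dbname="develop", user="develop", password="password"
)
with conn.cursor() as cur:
    cur.execute("SHOW wal_level;")
    print(cur.fetchone()[0])  # expected: logical
conn.close()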

The bootstrap script creates a dedicated schema named ecommerce and sets it as the default search path. Finally, it creates a custom publication for all tables in the ecommerce schema.

-- config/postgres/bootstrap.sql
CREATE SCHEMA ecommerce;
GRANT ALL ON SCHEMA ecommerce TO develop;

-- change search_path on a connection-level
SET search_path TO ecommerce;

-- change search_path on a database-level
ALTER database "develop" SET search_path TO ecommerce;

-- create a publication for all tables in the ecommerce schema
CREATE PUBLICATION cdc_publication FOR TABLES IN SCHEMA ecommerce;
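
We can also verify which tables the publication covers once they exist. Below is a minimal query, again assuming psycopg2:

# check_publication.py - list the tables covered by cdc_publication
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432, dbname="develop", user="develop", password="password"
)
with conn.cursor() as cur:
    # pg_publication_tables resolves FOR TABLES IN SCHEMA to the concrete tables
    cur.execute(
        "SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = 'cdc_publication';"
    )
    for row in cur.fetchall():
        print(row)
conn.close()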

Debezium Server

The Debezium server configuration is fairly simple, and the application properties are supplied by volume mapping.

# docker-compose.yml
version: "3"
services:
...
  debezium:
    image: debezium/server:3.0.0.Final
    container_name: debezium
    volumes:
      - ./config/debezium:/debezium/config
    depends_on:
      - postgres
    restart: always
...

The application properties can be grouped into four sections.

  • Sink configuration
    • The sink type is set up as pubsub, and the address of the Pub/Sub emulator is specified as an extra.
  • Source configuration
    • The PostgreSQL source connector class is specified, followed by adding the database details.
    • Only two tables in the ecommerce schema are included, and the output plug-in and publication names are configured as required.
    • Note that the topic prefix (debezium.source.topic.prefix) is mandatory, and the Debezium server expects the corresponding Pub/Sub topics to exist, named in the <prefix>.<schema>.<table-name> format. Therefore, we need to create the topics for the two tables before we start the server (if there are records already) or before we send records to the database (if there are no records yet).
  • Message transform
    • The change messages are simplified using a single message transform (SMT). With this transform, each message includes only the payload, which keeps the record attributes together with the additional message metadata specified in debezium.transforms.unwrap.add.fields.
  • Log configuration
    • The default log format is JSON; it is disabled so that log messages are printed as plain console output, which makes troubleshooting easier.
# config/debezium/application.properties
## sink config
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=test-project
debezium.sink.pubsub.ordering.enabled=true
debezium.sink.pubsub.address=pubsub:8085
## source config
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=5000
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=develop
debezium.source.database.password=password
debezium.source.database.dbname=develop
# topic.prefix is mandatory, topic name should be <prefix>.<schema>.<table-name>
debezium.source.topic.prefix=demo
debezium.source.table.include.list=ecommerce.orders,ecommerce.order_items
debezium.source.plugin.name=pgoutput
debezium.source.publication.name=cdc_publication
debezium.source.tombstones.on.delete=false
## SMT - unwrap
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.drop.tombstones=false
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.add.fields=op,db,table,schema,lsn,source.ts_ms
## log config
quarkus.log.console.json=false
quarkus.log.file.enable=false
quarkus.log.level=INFO

Below is an example output payload of one of the tables. The attributes prefixed by a double underscore (__) are message metadata.

{
  "order_id": "5db78495-2d65-4ebf-871f-cdc66eb1ed61",
  "user_id": "3e3ccd36-401b-4ea2-bd4b-eab63fcd1c5f",
  "status": "Shipped",
  "gender": "M",
  "created_at": 1724210040000000,
  "returned_at": null,
  "shipped_at": 1724446140000000,
  "delivered_at": null,
  "num_of_item": 2,
  "__deleted": "false",
  "__op": "r",
  "__db": "develop",
  "__table": "orders",
  "__schema": "ecommerce",
  "__lsn": 32919936,
  "__source_ts_ms": 1730183224763
}
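
Note that timestamp columns such as created_at and shipped_at arrive as microseconds since the epoch under Debezium's default temporal mapping, while __source_ts_ms is in milliseconds. The snippet below shows how such values can be decoded, using the figures from the payload above:

# decode_ts.py - convert the Debezium temporal values above into datetimes
from datetime import datetime, timezone

created_at = 1724210040000000  # microseconds since epoch
source_ts_ms = 1730183224763  # milliseconds since epoch

print(datetime.fromtimestamp(created_at / 1_000_000, tz=timezone.utc))
print(datetime.fromtimestamp(source_ts_ms / 1_000, tz=timezone.utc))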

Pub/Sub Emulator

The Pub/Sub emulator is started as a gcloud component on port 8085.

# docker-compose.yml
version: "3"
services:
...
  pubsub:
    image: google/cloud-sdk:497.0.0-emulators
    container_name: pubsub
    command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
    ports:
      - "8085:8085"
...

Solution Deployment

We can start the docker-compose services by executing docker-compose up -d. Then, we should create the topics into which the change messages are ingested. The topics and the subscriptions to them can be created by the following Python script.

# ps_setup.py
import os
import argparse

from src import ps_utils


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Create PubSub resources")
    parser.add_argument(
        "--emulator-host",
        "-e",
        default="localhost:8085",
        help="PubSub emulator host address",
    )
    parser.add_argument(
        "--project-id", "-p", default="test-project", help="GCP project id"
    )
    parser.add_argument(
        "--topics",
        "-t",
        action="append",
        default=["demo.ecommerce.orders", "demo.ecommerce.order_items"],
        help="PubSub topic names",
    )
    args = parser.parse_args()
    os.environ["PUBSUB_EMULATOR_HOST"] = args.emulator_host
    os.environ["PUBSUB_PROJECT_ID"] = args.project_id

    for name in set(args.topics):
        ps_utils.create_topic(project_id=args.project_id, topic_name=name)
        ps_utils.create_subscription(
            project_id=args.project_id, sub_name=f"{name}.sub", topic_name=name
        )
    ps_utils.show_resources("topics", args.emulator_host, args.project_id)
    ps_utils.show_resources("subscriptions", args.emulator_host, args.project_id)

Once executed, the topics and subscriptions are created as shown below.

python ps_setup.py
projects/test-project/topics/demo.ecommerce.order_items
projects/test-project/topics/demo.ecommerce.orders
projects/test-project/subscriptions/demo.ecommerce.order_items.sub
projects/test-project/subscriptions/demo.ecommerce.orders.sub
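
The script relies on a small ps_utils module in the repository. A minimal sketch of what its helpers could look like with the google-cloud-pubsub client is shown below; the repository's implementation may differ:

# src/ps_utils.py - a minimal sketch, not the repository's exact implementation
from google.cloud import pubsub_v1


def set_sub_name(project_id: str, sub_name: str):
    # fully-qualified name, e.g. projects/test-project/subscriptions/demo.ecommerce.orders.sub
    return f"projects/{project_id}/subscriptions/{sub_name}"


def create_topic(project_id: str, topic_name: str):
    # the client picks up PUBSUB_EMULATOR_HOST from the environment
    publisher = pubsub_v1.PublisherClient()
    publisher.create_topic(request={"name": publisher.topic_path(project_id, topic_name)})


def create_subscription(project_id: str, sub_name: str, topic_name: str):
    publisher = pubsub_v1.PublisherClient()
    with pubsub_v1.SubscriberClient() as subscriber:
        subscriber.create_subscription(
            request={
                "name": set_sub_name(project_id, sub_name),
                "topic": publisher.topic_path(project_id, topic_name),
            }
        )


def show_resources(resource: str, emulator_host: str, project_id: str):
    # emulator_host is unused here - the clients read PUBSUB_EMULATOR_HOST from the environment
    project_path = f"projects/{project_id}"
    if resource == "topics":
        for topic in pubsub_v1.PublisherClient().list_topics(request={"project": project_path}):
            print(topic.name)
    else:
        with pubsub_v1.SubscriberClient() as subscriber:
            for sub in subscriber.list_subscriptions(request={"project": project_path}):
                print(sub.name)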

Data Generator

The theLook eCommerce dataset has seven entities, and five of them are generated dynamically. In each iteration, a user record is created, and it has zero or more orders. An order record in turn creates zero or more order items. Finally, an order item record creates zero or more event and inventory item objects. Once all records are generated, they are ingested into the relevant tables using pandas’ to_sql method - a sketch of the database helpers follows the script below.

# data_gen.py
import argparse
import time
import logging

import pandas as pd

from src.models import User
from src.utils import create_connection, insert_to_db, Connection, generate_from_csv

extraneous_headers = [
    "event_type",
    "ip_address",
    "browser",
    "traffic_source",
    "session_id",
    "sequence_number",
    "uri",
    "is_sold",
]


def write_dynamic_data(
    conn: Connection, schema_name: str = "ecommerce", if_exists: str = "replace"
):
    tbl_map = {
        "users": [],
        "orders": [],
        "order_items": [],
        "inventory_items": [],
        "events": [],
    }
    user = User()
    logging.info(f"start to create user events - user id: {user.id}")
    tbl_map["users"].extend([user.asdict(["orders"])])
    orders = user.orders
    tbl_map["orders"].extend([o.asdict(["order_items"]) for o in orders])
    for order in orders:
        order_items = order.order_items
        tbl_map["order_items"].extend(
            [
                o.asdict(["events", "inventory_items"] + extraneous_headers)
                for o in order_items
            ]
        )
        for order_item in order_items:
            tbl_map["inventory_items"].extend(
                [i.asdict() for i in order_item.inventory_items]
            )
            tbl_map["events"].extend([e.asdict() for e in order_item.events])

    for tbl in tbl_map:
        df = pd.DataFrame(tbl_map[tbl])
        if len(df) > 0:
            logging.info(f"{if_exists} records, table - {tbl}, # records - {len(df)}")
            insert_to_db(
                df=df,
                tbl_name=tbl,
                schema_name=schema_name,
                conn=conn,
                if_exists=if_exists,
            )
        else:
            logging.info(
                f"skip records as no user event, table - {tbl}, # records - {len(df)}"
            )


def write_static_data(
    conn: Connection, schema_name: str = "ecommerce", if_exists: str = "replace"
):
    tbl_map = {
        "products": generate_from_csv("products.csv"),
        "dist_centers": generate_from_csv("distribution_centers.csv"),
    }
    for tbl in tbl_map:
        df = pd.DataFrame(tbl_map[tbl])
        if len(df) > 0:
            logging.info(f"{if_exists} records, table - {tbl}, # records - {len(df)}")
            insert_to_db(
                df=df,
                tbl_name=tbl,
                schema_name=schema_name,
                conn=conn,
                if_exists=if_exists,
            )
        else:
            logging.info(f"skip writing, table - {tbl}, # records - {len(df)}")


def main(wait_for: float, max_iter: int, if_exists: str):
    conn = create_connection()
    write_static_data(conn=conn, if_exists="replace")
    curr_iter = 0
    while True:
        write_dynamic_data(conn=conn, if_exists=if_exists)
        time.sleep(wait_for)
        curr_iter += 1
        if max_iter > 0 and curr_iter >= max_iter:
            logging.info(f"stop generating records after {curr_iter} iterations")
            break


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    logging.info("Generate theLook eCommerce data...")

    parser = argparse.ArgumentParser(description="Generate theLook eCommerce data")
    parser.add_argument(
        "--if_exists",
        "-i",
        type=str,
        default="append",
        choices=["fail", "replace", "append"],
        help="How to insert records if the table already exists",
    )
    parser.add_argument(
        "--wait_for",
        "-w",
        type=float,
        default=1,
        help="The time to wait before generating new user records",
    )
    parser.add_argument(
        "--max_iter",
        "-m",
        type=int,
        default=-1,
        help="The maximum number of iterations to generate user records",
    )
    args = parser.parse_args()
    logging.info(args)
    main(args.wait_for, args.max_iter, if_exists=args.if_exists)
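
The create_connection and insert_to_db helpers are imported from src/utils.py. As a reference, here is a minimal sketch of how they could be implemented on top of SQLAlchemy and pandas; the repository's version may differ:

# src/utils.py - a minimal sketch, not the repository's exact implementation
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine import Connection


def create_connection() -> Connection:
    # connection details match the docker-compose PostgreSQL service;
    # AUTOCOMMIT makes each to_sql call commit immediately
    engine = create_engine(
        "postgresql+psycopg2://develop:password@localhost:5432/develop",
        isolation_level="AUTOCOMMIT",
    )
    return engine.connect()


def insert_to_db(
    df: pd.DataFrame, tbl_name: str, schema_name: str, conn: Connection, if_exists: str
):
    # if_exists is one of "fail", "replace" or "append", as accepted by to_sql
    df.to_sql(name=tbl_name, schema=schema_name, con=conn, if_exists=if_exists, index=False)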

In the following example, we see that data is generated every two seconds (-w 2).

python data_gen.py -w 2
INFO:root:Generate theLook eCommerce data...
INFO:root:Namespace(if_exists='append', wait_for=2.0, max_iter=-1)
INFO:root:replace records, table - products, # records - 29120
INFO:root:replace records, table - dist_centers, # records - 10
INFO:root:start to create user events - user id: 2a444cd4-aa70-4247-b1c1-9cf9c8cc1924
INFO:root:append records, table - users, # records - 1
INFO:root:append records, table - orders, # records - 1
INFO:root:append records, table - order_items, # records - 2
INFO:root:append records, table - inventory_items, # records - 5
INFO:root:append records, table - events, # records - 14
INFO:root:start to create user events - user id: 7d40f7f8-c022-4104-a1a0-9228da07fbe4
INFO:root:append records, table - users, # records - 1
INFO:root:skip records as no user event, table - orders, # records - 0
INFO:root:skip records as no user event, table - order_items, # records - 0
INFO:root:skip records as no user event, table - inventory_items, # records - 0
INFO:root:skip records as no user event, table - events, # records - 0
INFO:root:start to create user events - user id: 45f8469c-3e79-40ee-9639-1cb17cd98132
INFO:root:append records, table - users, # records - 1
INFO:root:skip records as no user event, table - orders, # records - 0
INFO:root:skip records as no user event, table - order_items, # records - 0
INFO:root:skip records as no user event, table - inventory_items, # records - 0
INFO:root:skip records as no user event, table - events, # records - 0
INFO:root:start to create user events - user id: 839e353f-07ee-4d77-b1de-2f1af9b12501
INFO:root:append records, table - users, # records - 1
INFO:root:append records, table - orders, # records - 2
INFO:root:append records, table - order_items, # records - 3
INFO:root:append records, table - inventory_items, # records - 9
INFO:root:append records, table - events, # records - 19

Once the data is ingested into the database, the following tables are created in the ecommerce schema.
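
They can be listed without a GUI client as well - a minimal check, assuming psycopg2 is installed:

# list_tables.py - show the tables created in the ecommerce schema
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432, dbname="develop", user="develop", password="password"
)
with conn.cursor() as cur:
    cur.execute("SELECT tablename FROM pg_tables WHERE schemaname = 'ecommerce' ORDER BY tablename;")
    for (tbl,) in cur.fetchall():
        print(tbl)  # dist_centers, events, inventory_items, order_items, orders, products, users
conn.close()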

Data Subscriber

The subscriber expects a topic name as an argument (default demo.ecommerce.orders) and assumes that the subscription to the topic is named in the <topic-name>.sub format. While it subscribes to the topic, it prints the payload attribute of each message and then acknowledges it.

# ps_sub.py
import os
import json
import argparse

from google.cloud import pubsub_v1

from src import ps_utils


def callback(message):
    print(json.loads(message.data.decode())["payload"])
    message.ack()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Subscribe to a PubSub topic")
    parser.add_argument(
        "--emulator-host",
        "-e",
        default="localhost:8085",
        help="PubSub emulator host address",
    )
    parser.add_argument(
        "--project-id", "-p", default="test-project", help="GCP project id"
    )
    parser.add_argument(
        "--topic",
        "-t",
        default="demo.ecommerce.orders",
        help="PubSub topic name",
    )
    args = parser.parse_args()
    os.environ["PUBSUB_EMULATOR_HOST"] = args.emulator_host
    os.environ["PUBSUB_PROJECT_ID"] = args.project_id

    with pubsub_v1.SubscriberClient() as subscriber:
        future = subscriber.subscribe(
            ps_utils.set_sub_name(args.project_id, f"{args.topic}.sub"), callback
        )
        try:
            future.result()
        except KeyboardInterrupt:
            future.cancel()

Below is an example of subscribing to the topic that keeps the change messages of the orders table.

python ps_sub.py -t demo.ecommerce.orders
{'order_id': '5db78495-2d65-4ebf-871f-cdc66eb1ed61', 'user_id': '3e3ccd36-401b-4ea2-bd4b-eab63fcd1c5f', 'status': 'Shipped', 'gender': 'M', 'created_at': 1724210040000000, 'returned_at': None, 'shipped_at': 1724446140000000, 'delivered_at': None, 'num_of_item': 2, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}
{'order_id': '040d7068-697c-4855-a5b1-fcf22d9ebd34', 'user_id': '3f9d209b-8042-47b0-ae2a-7ee1938f7c45', 'status': 'Cancelled', 'gender': 'F', 'created_at': 1702051020000000, 'returned_at': None, 'shipped_at': None, 'delivered_at': None, 'num_of_item': 1, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}
{'order_id': '1d879c6e-ef5e-41b9-8120-6afaa6f054e6', 'user_id': '3f9d209b-8042-47b0-ae2a-7ee1938f7c45', 'status': 'Shipped', 'gender': 'F', 'created_at': 1714579020000000, 'returned_at': None, 'shipped_at': 1714592100000000, 'delivered_at': None, 'num_of_item': 1, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}
{'order_id': 'b35ad9f1-583c-4b5d-8019-bcd1880223c8', 'user_id': 'e9c4a660-8b0b-4b50-a48f-f99480779070', 'status': 'Complete', 'gender': 'M', 'created_at': 1699584900000000, 'returned_at': None, 'shipped_at': 1699838520000000, 'delivered_at': '2023-11-15 16:52:00', 'num_of_item': 1, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}
{'order_id': 'd06bd821-e2c7-4ee8-bc70-66b3fc5cc1a3', 'user_id': 'e9c4a660-8b0b-4b50-a48f-f99480779070', 'status': 'Complete', 'gender': 'M', 'created_at': 1670640900000000, 'returned_at': None, 'shipped_at': 1670896920000000, 'delivered_at': '2022-12-16 23:07:00', 'num_of_item': 1, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}
{'order_id': '5928f275-9333-4036-ad25-c92d47c6b2ed', 'user_id': 'e9c4a660-8b0b-4b50-a48f-f99480779070', 'status': 'Complete', 'gender': 'M', 'created_at': 1615172100000000, 'returned_at': None, 'shipped_at': 1615278480000000, 'delivered_at': '2021-03-11 14:06:00', 'num_of_item': 1, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}
{'order_id': '27937664-36b0-4b1c-a0f6-2e027701398e', 'user_id': 'e3ed22b9-4101-46e9-b642-a0317332f267', 'status': 'Cancelled', 'gender': 'M', 'created_at': 1719298320000000, 'returned_at': None, 'shipped_at': None, 'delivered_at': None, 'num_of_item': 2, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}