Change data capture (CDC) is a data integration pattern that tracks changes in a database so that actions can be taken on the changed data. Debezium is probably the most popular open source platform for CDC. Originally a set of Kafka source connectors, it also ships a ready-to-use application called Debezium server. This standalone application can stream change events to other messaging infrastructure such as Google Cloud Pub/Sub, Amazon Kinesis and Apache Pulsar. In this post, we develop a CDC solution locally using Docker. The data generator of the theLook eCommerce dataset is modified to produce records continuously, and the records are inserted into multiple tables of a PostgreSQL database. Two of those tables are tracked by the Debezium server, which pushes row-level changes into Pub/Sub topics on a Pub/Sub emulator. Finally, the messages of the topics are read by a Python application.
Docker Compose Services
We have three docker-compose services, and each service is described separately below. The source code of this post can be found in this GitHub repository.
PostgreSQL
A PostgreSQL database server is configured to enable logical replication (wal_level=logical) at startup. This is necessary because we will be using the standard logical decoding output plug-in (pgoutput) available in PostgreSQL 10+ - see this page for details.
# docker-compose.yml
version: "3"
services:
  postgres:
    image: postgres:16
    container_name: postgres
    command: ["postgres", "-c", "wal_level=logical"]
    ports:
      - 5432:5432
    volumes:
      - ./config/postgres:/docker-entrypoint-initdb.d
      - postgres_data:/var/lib/postgresql/data
    environment:
      POSTGRES_DB: develop
      POSTGRES_USER: develop
      POSTGRES_PASSWORD: password
      PGUSER: develop
      TZ: Australia/Sydney
...
volumes:
  postgres_data:
    driver: local
    name: postgres_data
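Once the container is up, we can confirm that logical replication is enabled; a quick sanity check, using the container name and credentials defined above:

docker exec -it postgres psql -U develop -d develop -c "SHOW wal_level;"

which should print logical.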
The bootstrap script creates a dedicated schema named ecommerce and sets the schema as the default search path. Finally, it creates a publication for all tables in the ecommerce schema.
-- config/postgres/bootstrap.sql
CREATE SCHEMA ecommerce;
GRANT ALL ON SCHEMA ecommerce TO develop;

-- change search_path on a connection-level
SET search_path TO ecommerce;

-- change search_path on a database-level
ALTER database "develop" SET search_path TO ecommerce;

-- create a publication for all tables in the ecommerce schema
CREATE PUBLICATION cdc_publication FOR TABLES IN SCHEMA ecommerce;
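Note that the FOR TABLES IN SCHEMA syntax requires PostgreSQL 15 or later, which is satisfied by the postgres:16 image used here. We can verify that the publication has been created by querying the catalog:

SELECT pubname, puballtables FROM pg_publication;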
Debezium Server
The Debezium server configuration is fairly simple, and the application properties are supplied via volume mapping.
# docker-compose.yml
version: "3"
services:
...
  debezium:
    image: debezium/server:3.0.0.Final
    container_name: debezium
    volumes:
      - ./config/debezium:/debezium/config
    depends_on:
      - postgres
    restart: always
...
The application properties can be grouped into four sections.
- Sink configuration
  - The sink type is set to pubsub, and the address of the Pub/Sub emulator is specified as an extra property.
- Source configuration
  - The PostgreSQL source connector class is specified, followed by the database connection details.
  - Only two tables in the ecommerce schema are included, and the output plug-in and publication names are configured as required.
  - Note that the topic prefix (debezium.source.topic.prefix) is mandatory, and the Debezium server expects the corresponding Pub/Sub topics to exist, named in the <prefix>.<schema>.<table-name> format. Therefore, we need to create topics for the two tables before starting the server (if there are records already) or before sending records to the database (if there is no record yet).
- Message transform
  - The change messages are simplified using a single message transform (SMT). With this transform, each message includes only a single payload that keeps the record attributes together with the additional message metadata specified in debezium.transforms.unwrap.add.fields.
- Log configuration
  - The default JSON log format is disabled so that log messages are produced as plain console output, which makes troubleshooting easier.
# config/debezium/application.properties
## sink config
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=test-project
debezium.sink.pubsub.ordering.enabled=true
debezium.sink.pubsub.address=pubsub:8085
## source config
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=5000
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=develop
debezium.source.database.password=password
debezium.source.database.dbname=develop
# topic.prefix is mandatory, topic name should be <prefix>.<schema>.<table-name>
debezium.source.topic.prefix=demo
debezium.source.table.include.list=ecommerce.orders,ecommerce.order_items
debezium.source.plugin.name=pgoutput
debezium.source.publication.name=cdc_publication
debezium.source.tombstones.on.delete=false
## SMT - unwrap
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.drop.tombstones=false
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.add.fields=op,db,table,schema,lsn,source.ts_ms
## log config
quarkus.log.console.json=false
quarkus.log.file.enable=false
quarkus.log.level=INFO
Below is an output payload for one of the tables. Fields prefixed with a double underscore (__) are message metadata; for example, an __op value of r indicates that the record was read during the initial snapshot.
{
  "order_id": "5db78495-2d65-4ebf-871f-cdc66eb1ed61",
  "user_id": "3e3ccd36-401b-4ea2-bd4b-eab63fcd1c5f",
  "status": "Shipped",
  "gender": "M",
  "created_at": 1724210040000000,
  "returned_at": null,
  "shipped_at": 1724446140000000,
  "delivered_at": null,
  "num_of_item": 2,
  "__deleted": "false",
  "__op": "r",
  "__db": "develop",
  "__table": "orders",
  "__schema": "ecommerce",
  "__lsn": 32919936,
  "__source_ts_ms": 1730183224763
}
Pub/Sub Emulator
The Pub/Sub emulator is started as a gcloud component on port 8085.
# docker-compose.yml
version: "3"
services:
...
  pubsub:
    image: google/cloud-sdk:497.0.0-emulators
    container_name: pubsub
    command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
    ports:
      - "8085:8085"
...
Solution Deployment
We can start the docker-compose services by running docker-compose up -d. Then, we need to create the topics into which the change messages will be ingested. The topics, and subscriptions to them, can be created with the following Python script.
# ps_setup.py
import os
import argparse

from src import ps_utils


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Create PubSub resources")
    parser.add_argument(
        "--emulator-host",
        "-e",
        default="localhost:8085",
        help="PubSub emulator host address",
    )
    parser.add_argument(
        "--project-id", "-p", default="test-project", help="GCP project id"
    )
    parser.add_argument(
        "--topics",
        "-t",
        action="append",
        default=["demo.ecommerce.orders", "demo.ecommerce.order_items"],
        help="PubSub topic names",
    )
    args = parser.parse_args()
    # route the Pub/Sub clients to the local emulator
    os.environ["PUBSUB_EMULATOR_HOST"] = args.emulator_host
    os.environ["PUBSUB_PROJECT_ID"] = args.project_id

    for name in set(args.topics):
        ps_utils.create_topic(project_id=args.project_id, topic_name=name)
        ps_utils.create_subscription(
            project_id=args.project_id, sub_name=f"{name}.sub", topic_name=name
        )
    ps_utils.show_resources("topics", args.emulator_host, args.project_id)
    ps_utils.show_resources("subscriptions", args.emulator_host, args.project_id)
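The ps_utils helper module lives in the project repository and is not listed in this post. For reference, a minimal sketch of the helpers used above could look like the following; the actual implementation in the repository may differ. The clients pick up the emulator address from the PUBSUB_EMULATOR_HOST environment variable set by the caller.

# src/ps_utils.py - a minimal sketch, not necessarily the repository's implementation
from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1


def set_topic_name(project_id: str, topic_name: str) -> str:
    return f"projects/{project_id}/topics/{topic_name}"


def set_sub_name(project_id: str, sub_name: str) -> str:
    return f"projects/{project_id}/subscriptions/{sub_name}"


def create_topic(project_id: str, topic_name: str):
    # create the topic, ignoring the error when it already exists
    publisher = pubsub_v1.PublisherClient()
    try:
        publisher.create_topic(name=set_topic_name(project_id, topic_name))
    except AlreadyExists:
        pass


def create_subscription(project_id: str, sub_name: str, topic_name: str):
    # create a pull subscription attached to the topic
    with pubsub_v1.SubscriberClient() as subscriber:
        try:
            subscriber.create_subscription(
                name=set_sub_name(project_id, sub_name),
                topic=set_topic_name(project_id, topic_name),
            )
        except AlreadyExists:
            pass


def show_resources(resource: str, emulator_host: str, project_id: str):
    # print the fully-qualified names of existing topics or subscriptions
    project_path = f"projects/{project_id}"
    if resource == "topics":
        client = pubsub_v1.PublisherClient()
        for topic in client.list_topics(request={"project": project_path}):
            print(topic.name)
    else:
        client = pubsub_v1.SubscriberClient()
        for sub in client.list_subscriptions(request={"project": project_path}):
            print(sub.name)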
Once executed, the topics and subscriptions are created as shown below.
python ps_setup.py
projects/test-project/topics/demo.ecommerce.order_items
projects/test-project/topics/demo.ecommerce.orders
projects/test-project/subscriptions/demo.ecommerce.order_items.sub
projects/test-project/subscriptions/demo.ecommerce.orders.sub
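Alternatively, because the emulator implements the same REST interface as the real Pub/Sub service, the topics can be created with plain HTTP calls, for example:

curl -X PUT http://localhost:8085/v1/projects/test-project/topics/demo.ecommerce.orders
curl -X PUT http://localhost:8085/v1/projects/test-project/topics/demo.ecommerce.order_items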
Data Generator
The theLook eCommerce dataset has seven entities, and five of them are generated dynamically. In each iteration, a user record is created, and it has zero or more orders. An order record creates zero or more order items in turn. Finally, an order item record creates zero or more event and inventory item objects. Once all records are generated, they are ingested into the relevant tables using pandas' to_sql method.
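The create_connection and insert_to_db helpers come from the project's src.utils module, which is not listed in this post. Conceptually they wrap a SQLAlchemy connection and pandas' to_sql; a rough sketch, assuming SQLAlchemy 2.x and the database details from the compose file:

# src/utils.py - a rough sketch of the helpers, not necessarily the repository's implementation
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine import Connection


def create_connection() -> Connection:
    # connection details match the docker-compose PostgreSQL service
    engine = create_engine("postgresql+psycopg2://develop:password@localhost:5432/develop")
    return engine.connect()


def insert_to_db(
    df: pd.DataFrame, tbl_name: str, schema_name: str, conn: Connection, if_exists: str
):
    # "replace" recreates the table while "append" adds rows to an existing one
    df.to_sql(name=tbl_name, con=conn, schema=schema_name, if_exists=if_exists, index=False)
    conn.commit()  # SQLAlchemy 2.x connections require an explicit commit

The full generator script is shown below.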
# data_gen.py
import argparse
import time
import logging

import pandas as pd

from src.models import User
from src.utils import create_connection, insert_to_db, Connection, generate_from_csv

extraneous_headers = [
    "event_type",
    "ip_address",
    "browser",
    "traffic_source",
    "session_id",
    "sequence_number",
    "uri",
    "is_sold",
]


def write_dynamic_data(
    conn: Connection, schema_name: str = "ecommerce", if_exists: str = "replace"
):
    tbl_map = {
        "users": [],
        "orders": [],
        "order_items": [],
        "inventory_items": [],
        "events": [],
    }
    user = User()
    logging.info(f"start to create user events - user id: {user.id}")
    tbl_map["users"].extend([user.asdict(["orders"])])
    orders = user.orders
    tbl_map["orders"].extend([o.asdict(["order_items"]) for o in orders])
    for order in orders:
        order_items = order.order_items
        tbl_map["order_items"].extend(
            [
                o.asdict(["events", "inventory_items"] + extraneous_headers)
                for o in order_items
            ]
        )
        for order_item in order_items:
            tbl_map["inventory_items"].extend(
                [i.asdict() for i in order_item.inventory_items]
            )
            tbl_map["events"].extend([e.asdict() for e in order_item.events])

    for tbl in tbl_map:
        df = pd.DataFrame(tbl_map[tbl])
        if len(df) > 0:
            logging.info(f"{if_exists} records, table - {tbl}, # records - {len(df)}")
            insert_to_db(
                df=df,
                tbl_name=tbl,
                schema_name=schema_name,
                conn=conn,
                if_exists=if_exists,
            )
        else:
            logging.info(
                f"skip records as no user event, table - {tbl}, # records - {len(df)}"
            )


def write_static_data(
    conn: Connection, schema_name: str = "ecommerce", if_exists: str = "replace"
):
    tbl_map = {
        "products": generate_from_csv("products.csv"),
        "dist_centers": generate_from_csv("distribution_centers.csv"),
    }
    for tbl in tbl_map:
        df = pd.DataFrame(tbl_map[tbl])
        if len(df) > 0:
            logging.info(f"{if_exists} records, table - {tbl}, # records - {len(df)}")
            insert_to_db(
                df=df,
                tbl_name=tbl,
                schema_name=schema_name,
                conn=conn,
                if_exists=if_exists,
            )
        else:
            logging.info(f"skip writing, table - {tbl}, # records - {len(df)}")


def main(wait_for: float, max_iter: int, if_exists: str):
    conn = create_connection()
    write_static_data(conn=conn, if_exists="replace")
    curr_iter = 0
    while True:
        write_dynamic_data(conn=conn, if_exists=if_exists)
        time.sleep(wait_for)
        curr_iter += 1
        if max_iter > 0 and curr_iter >= max_iter:
            logging.info(f"stop generating records after {curr_iter} iterations")
            break


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    logging.info("Generate theLook eCommerce data...")

    parser = argparse.ArgumentParser(description="Generate theLook eCommerce data")
    parser.add_argument(
        "--if_exists",
        "-i",
        type=str,
        default="append",
        choices=["fail", "replace", "append"],
        help="How to insert records if the table already exists",
    )
    parser.add_argument(
        "--wait_for",
        "-w",
        type=float,
        default=1,
        help="The time to wait before generating new user records",
    )
    parser.add_argument(
        "--max_iter",
        "-m",
        type=int,
        default=-1,
        help="The maximum number of iterations to generate user records",
    )
    args = parser.parse_args()
    logging.info(args)
    main(args.wait_for, args.max_iter, if_exists=args.if_exists)
In the following example, data is generated every two seconds (-w 2).
python data_gen.py -w 2
INFO:root:Generate theLook eCommerce data...
INFO:root:Namespace(if_exists='append', wait_for=2.0, max_iter=-1)
INFO:root:replace records, table - products, # records - 29120
INFO:root:replace records, table - dist_centers, # records - 10
INFO:root:start to create user events - user id: 2a444cd4-aa70-4247-b1c1-9cf9c8cc1924
INFO:root:append records, table - users, # records - 1
INFO:root:append records, table - orders, # records - 1
INFO:root:append records, table - order_items, # records - 2
INFO:root:append records, table - inventory_items, # records - 5
INFO:root:append records, table - events, # records - 14
INFO:root:start to create user events - user id: 7d40f7f8-c022-4104-a1a0-9228da07fbe4
INFO:root:append records, table - users, # records - 1
INFO:root:skip records as no user event, table - orders, # records - 0
INFO:root:skip records as no user event, table - order_items, # records - 0
INFO:root:skip records as no user event, table - inventory_items, # records - 0
INFO:root:skip records as no user event, table - events, # records - 0
INFO:root:start to create user events - user id: 45f8469c-3e79-40ee-9639-1cb17cd98132
INFO:root:append records, table - users, # records - 1
INFO:root:skip records as no user event, table - orders, # records - 0
INFO:root:skip records as no user event, table - order_items, # records - 0
INFO:root:skip records as no user event, table - inventory_items, # records - 0
INFO:root:skip records as no user event, table - events, # records - 0
INFO:root:start to create user events - user id: 839e353f-07ee-4d77-b1de-2f1af9b12501
INFO:root:append records, table - users, # records - 1
INFO:root:append records, table - orders, # records - 2
INFO:root:append records, table - order_items, # records - 3
INFO:root:append records, table - inventory_items, # records - 9
INFO:root:append records, table - events, # records - 19
When the data is ingested into the database, the users, orders, order_items, inventory_items, events, products and dist_centers tables are created in the ecommerce schema.
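This can be checked with psql, using the container name and credentials configured earlier:

docker exec -it postgres psql -U develop -d develop -c "\dt ecommerce.*"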
Data Subscriber
The subscriber expects a topic name as an argument (default demo.ecommerce.orders) and assumes that the topic's subscription is named in the <topic-name>.sub format. While it is subscribed to a topic, it prints the payload attribute of each message and then acknowledges it.
# ps_sub.py
import os
import json
import argparse

from google.cloud import pubsub_v1

from src import ps_utils


def callback(message):
    # print only the unwrapped payload of the change message, then acknowledge it
    print(json.loads(message.data.decode())["payload"])
    message.ack()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Subscribe to a PubSub topic")
    parser.add_argument(
        "--emulator-host",
        "-e",
        default="localhost:8085",
        help="PubSub emulator host address",
    )
    parser.add_argument(
        "--project-id", "-p", default="test-project", help="GCP project id"
    )
    parser.add_argument(
        "--topic",
        "-t",
        default="demo.ecommerce.orders",
        help="PubSub topic name",
    )
    args = parser.parse_args()
    os.environ["PUBSUB_EMULATOR_HOST"] = args.emulator_host
    os.environ["PUBSUB_PROJECT_ID"] = args.project_id

    with pubsub_v1.SubscriberClient() as subscriber:
        future = subscriber.subscribe(
            ps_utils.set_sub_name(args.project_id, f"{args.topic}.sub"), callback
        )
        try:
            future.result()
        except KeyboardInterrupt:
            future.cancel()
Below is an example of subscribing to the topic that keeps the change messages of the orders table.
python ps_sub.py -t demo.ecommerce.orders
{'order_id': '5db78495-2d65-4ebf-871f-cdc66eb1ed61', 'user_id': '3e3ccd36-401b-4ea2-bd4b-eab63fcd1c5f', 'status': 'Shipped', 'gender': 'M', 'created_at': 1724210040000000, 'returned_at': None, 'shipped_at': 1724446140000000, 'delivered_at': None, 'num_of_item': 2, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}
{'order_id': '040d7068-697c-4855-a5b1-fcf22d9ebd34', 'user_id': '3f9d209b-8042-47b0-ae2a-7ee1938f7c45', 'status': 'Cancelled', 'gender': 'F', 'created_at': 1702051020000000, 'returned_at': None, 'shipped_at': None, 'delivered_at': None, 'num_of_item': 1, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}
{'order_id': '1d879c6e-ef5e-41b9-8120-6afaa6f054e6', 'user_id': '3f9d209b-8042-47b0-ae2a-7ee1938f7c45', 'status': 'Shipped', 'gender': 'F', 'created_at': 1714579020000000, 'returned_at': None, 'shipped_at': 1714592100000000, 'delivered_at': None, 'num_of_item': 1, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}
{'order_id': 'b35ad9f1-583c-4b5d-8019-bcd1880223c8', 'user_id': 'e9c4a660-8b0b-4b50-a48f-f99480779070', 'status': 'Complete', 'gender': 'M', 'created_at': 1699584900000000, 'returned_at': None, 'shipped_at': 1699838520000000, 'delivered_at': '2023-11-15 16:52:00', 'num_of_item': 1, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}
{'order_id': 'd06bd821-e2c7-4ee8-bc70-66b3fc5cc1a3', 'user_id': 'e9c4a660-8b0b-4b50-a48f-f99480779070', 'status': 'Complete', 'gender': 'M', 'created_at': 1670640900000000, 'returned_at': None, 'shipped_at': 1670896920000000, 'delivered_at': '2022-12-16 23:07:00', 'num_of_item': 1, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}
{'order_id': '5928f275-9333-4036-ad25-c92d47c6b2ed', 'user_id': 'e9c4a660-8b0b-4b50-a48f-f99480779070', 'status': 'Complete', 'gender': 'M', 'created_at': 1615172100000000, 'returned_at': None, 'shipped_at': 1615278480000000, 'delivered_at': '2021-03-11 14:06:00', 'num_of_item': 1, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}
{'order_id': '27937664-36b0-4b1c-a0f6-2e027701398e', 'user_id': 'e3ed22b9-4101-46e9-b642-a0317332f267', 'status': 'Cancelled', 'gender': 'M', 'created_at': 1719298320000000, 'returned_at': None, 'shipped_at': None, 'delivered_at': None, 'num_of_item': 2, '__deleted': 'false', '__op': 'r', '__db': 'develop', '__table': 'orders', '__schema': 'ecommerce', '__lsn': 32919936, '__source_ts_ms': 1730183224763}