Change data capture (CDC) is a proven data integration pattern that has a wide range of applications. Among them, data replication to data lakes is a compelling use case in data engineering. Coupled with a best-in-breed data lake format such as Apache Hudi, we can build an efficient data replication solution. This is the first post of the data lake demo series, in which we'll build a CDC-based data lake over time. As a starting point, we'll discuss the source database and CDC streaming infrastructure in the local environment.
Architecture
As described in a Red Hat IT topics article, change data capture (CDC) is a proven data integration pattern that tracks when and what changes occur in data, and then alerts other systems and services that must respond to those changes. Change data capture helps maintain consistency and functionality across all systems that rely on the data.
The primary use of CDC is to enable applications to respond almost immediately whenever data in databases changes. Its use cases cover microservices integration, data replication with up-to-date data, building time-sensitive analytics dashboards, auditing and compliance, cache invalidation, full-text search and so on. There are a number of approaches to CDC - polling, dual writes and log-based CDC. Among those, log-based CDC has advantages over the other approaches.
Both Amazon DMS and Debezium implement log-based CDC. While the former is a managed service, the latter can be deployed to a Kafka cluster as a (source) connector. It uses Apache Kafka as a messaging service to deliver database change notifications to the applicable systems and applications. Note that Kafka Connect is a tool for streaming data between Apache Kafka and other data systems via connectors in a scalable and reliable way. In AWS, we can use Amazon MSK and MSK Connect to build a Debezium-based CDC solution.
Data replication to data lakes using CDC can be much more effective if data is stored in a format that supports atomic transactions and consistent updates. Popular choices are Apache Hudi, Apache Iceberg and Delta Lake. Among those, Apache Hudi can be a good option as it is well integrated with AWS services.
Below shows the architecture of the data lake solution that we will be building in this series of posts.
- Employing the transactional outbox pattern, the source database publishes change event records to the CDC event table. The event records are generated by triggers that listen to insert and update events on source tables.
- CDC is implemented in a streaming environment, and Amazon MSK is used to build the streaming infrastructure. In order to process the real-time CDC event records, source and sink connectors are set up in Amazon MSK Connect. The Debezium connector for PostgreSQL is used as the source connector and the Lenses S3 connector as the sink connector. The sink connector pushes messages to an S3 bucket.
- Hudi DeltaStreamer is run on Amazon EMR. As a Spark application, it reads files from the S3 bucket and upserts Hudi records to another S3 bucket. The Hudi table is created in the AWS Glue Data Catalog.
- The Hudi table is queried in Amazon Athena, as the table is registered in the AWS Glue Data Catalog.
- Dashboards are created in Amazon QuickSight, where the dataset is created using Amazon Athena.
As a starting point, we’ll discuss the source database and streaming infrastructure in the local environment.
Source Database
Data Model
We will use the Northwind database as the source database. It was originally created by Microsoft and used in various tutorials for its database products. It contains the sales data of a fictitious company called Northwind Traders that deals in specialty foods from around the world. As shown in the following entity relationship diagram, it includes a schema for a small business ERP with customers, products, orders, employees and so on. The version ported to PostgreSQL is obtained from the YugabyteDB sample datasets, and the SQL scripts can be found in the project GitHub repository. For local development, a service is created using docker compose, and it'll be illustrated in the next section.
Outbox Table and Event Generation
It is straightforward to capture changes from multiple tables in a database using Kafka connectors, where a separate topic is created for each table. Data ingestion to Hudi, however, can be complicated if messages are spread across multiple Kafka topics. Note that we will use the DeltaStreamer utility, which maps to a single topic. In order to simplify the data ingestion process, we can employ the transactional outbox pattern. Using this pattern, we create an outbox table (cdc_events) and upsert a record to it whenever a new transaction is made. In this way, all database changes are pushed to a single topic, so a single DeltaStreamer process can listen to the change events.
Below shows the table creation statement of the outbox table. It aims to store all details of an order entry in a row. The columns that have the JSONB data type store attributes of other entities. For example, the order_items column includes ordered product information. As multiple products can be purchased, it keeps an array of product ID, unit price, quantity and discount.
-- ./data/sql/03_cdc_events.sql
CREATE TABLE cdc_events(
    order_id SMALLINT NOT NULL PRIMARY KEY,
    customer_id BPCHAR NOT NULL,
    order_date DATE,
    required_date DATE,
    shipped_date DATE,
    order_items JSONB,
    products JSONB,
    customer JSONB,
    employee JSONB,
    shipper JSONB,
    shipment JSONB,
    updated_at TIMESTAMPTZ
);
In order to create event records, triggers are added to the orders and order_details tables. They execute the fn_insert_order_event function after an INSERT or UPDATE action occurs on the respective tables. Note that the DELETE action is not considered in the event generation process for simplicity. The trigger function collects the details of an order entry and attempts to insert a new record into the outbox table. If a record with the same order ID already exists, it updates the record instead.
-- ./data/sql/03_cdc_events.sql
CREATE TRIGGER orders_triggered
    AFTER INSERT OR UPDATE
    ON orders
    FOR EACH ROW
    EXECUTE PROCEDURE fn_insert_order_event();

CREATE TRIGGER order_details_triggered
    AFTER INSERT OR UPDATE
    ON order_details
    FOR EACH ROW
    EXECUTE PROCEDURE fn_insert_order_event();

CREATE OR REPLACE FUNCTION fn_insert_order_event()
    RETURNS TRIGGER
    LANGUAGE PLPGSQL
AS
$$
BEGIN
    IF (TG_OP IN ('INSERT', 'UPDATE')) THEN
        -- collect line item, product, customer, employee and shipment details of the order as JSONB
        WITH product_details AS (
            SELECT p.product_id,
                   row_to_json(p.*)::jsonb AS product_details
            FROM (
                SELECT *
                FROM products p
                JOIN suppliers s ON p.supplier_id = s.supplier_id
                JOIN categories c ON p.category_id = c.category_id
            ) AS p
        ), order_items AS (
            SELECT od.order_id,
                   jsonb_agg(row_to_json(od.*)::jsonb - 'order_id') AS order_items,
                   jsonb_agg(pd.product_details) AS products
            FROM order_details od
            JOIN product_details pd ON od.product_id = pd.product_id
            WHERE od.order_id = NEW.order_id
            GROUP BY od.order_id
        ), emps AS (
            SELECT employee_id,
                   row_to_json(e.*)::jsonb AS details
            FROM employees e
        ), emp_territories AS (
            SELECT et.employee_id,
                   jsonb_agg(
                       row_to_json(t.*)
                   ) AS territories
            FROM employee_territories et
            JOIN (
                SELECT t.territory_id, t.territory_description, t.region_id, r.region_description
                FROM territories t
                JOIN region r ON t.region_id = r.region_id
            ) AS t ON et.territory_id = t.territory_id
            GROUP BY et.employee_id
        ), emp_details AS (
            SELECT e.employee_id,
                   e.details || jsonb_build_object('territories', et.territories) AS details
            FROM emps AS e
            JOIN emp_territories AS et ON e.employee_id = et.employee_id
        )
        -- upsert a single denormalised record per order into the outbox table
        INSERT INTO cdc_events
        SELECT o.order_id,
               o.customer_id,
               o.order_date,
               o.required_date,
               o.shipped_date,
               oi.order_items,
               oi.products,
               row_to_json(c.*)::jsonb AS customer,
               ed.details::jsonb AS employee,
               row_to_json(s.*)::jsonb AS shipper,
               jsonb_build_object(
                   'freight', o.freight,
                   'ship_name', o.ship_name,
                   'ship_address', o.ship_address,
                   'ship_city', o.ship_city,
                   'ship_region', o.ship_region,
                   'ship_postal_code', o.ship_postal_code,
                   'ship_country', o.ship_country
               ) AS shipment,
               now()
        FROM orders o
        LEFT JOIN order_items oi ON o.order_id = oi.order_id
        JOIN customers c ON o.customer_id = c.customer_id
        JOIN emp_details ed ON o.employee_id = ed.employee_id
        JOIN shippers s ON o.ship_via = s.shipper_id
        WHERE o.order_id = NEW.order_id
        ON CONFLICT (order_id)
        DO UPDATE
        SET order_id = excluded.order_id,
            customer_id = excluded.customer_id,
            order_date = excluded.order_date,
            required_date = excluded.required_date,
            shipped_date = excluded.shipped_date,
            order_items = excluded.order_items,
            products = excluded.products,
            customer = excluded.customer,
            shipper = excluded.shipper,
            shipment = excluded.shipment,
            updated_at = excluded.updated_at;
    END IF;
    RETURN NULL;
END
$$;
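Before moving on, we can check that the trigger wiring behaves as expected: a no-op update on an existing order should upsert a row into the outbox table. Below is a minimal sketch, assuming the local docker compose stack described later in this post is running and that the Northwind tables live in the datalake schema (as the connector configuration further below suggests).

## fire the trigger with a no-op update, then inspect the corresponding outbox record
docker compose exec -T postgres psql -U devuser -d devdb <<'SQL'
UPDATE datalake.orders SET ship_city = ship_city WHERE order_id = 10248;
SELECT order_id, customer_id, updated_at
FROM datalake.cdc_events
WHERE order_id = 10248;
SQL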
Create Initial Event Records
In order to create event records for existing order entries, a stored procedure named usp_init_order_events is created. It is quite similar to the trigger function and can be checked in the project GitHub repository. The procedure is called at database initialisation, and a total of 829 event records are created by it.
-- ./data/sql/03_cdc_events.sql
CALL usp_init_order_events();
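To verify the backfill, a quick count against the outbox table should return the 829 records mentioned above. A minimal sketch, assuming the table lives in the datalake schema and the docker compose stack described below is running:

## count the event records created by usp_init_order_events (expected: 829)
docker compose exec postgres psql -U devuser -d devdb \
  -c "SELECT count(*) FROM datalake.cdc_events;"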
Below shows a simplified event record, converted into JSON. For the order with ID 10248, 3 products are ordered by a customer whose ID is VINET.
{
  "order_id": 10248,
  "customer_id": "VINET",
  "order_date": "1996-07-04",
  "required_date": "1996-08-01",
  "shipped_date": "1996-07-16",
  "order_items": [
    { "discount": 0, "quantity": 12, "product_id": 11, "unit_price": 14 },
    { "discount": 0, "quantity": 10, "product_id": 42, "unit_price": 9.8 },
    { "discount": 0, "quantity": 5, "product_id": 72, "unit_price": 34.8 }
  ],
  "products": [
    { "product_id": 11, "product_name": "Queso Cabrales" },
    { "product_id": 42, "product_name": "Singaporean Hokkien Fried Mee" },
    { "product_id": 72, "product_name": "Mozzarella di Giovanni" }
  ],
  "customer": {
    "customer_id": "VINET",
    "company_name": "Vins et alcools Chevalier"
  },
  "employee": {
    "title": "Sales Manager",
    "last_name": "Buchanan",
    "employee_id": 5
  },
  "shipper": {
    "company_name": "Federal Shipping"
  },
  "shipment": {
    "freight": 32.38,
    "ship_name": "Vins et alcools Chevalier"
  },
  "updated_at": "2021-11-27T20:30:13.644579+11:00"
}
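Since order_items is stored as a JSONB array, it can be unnested back into one row per ordered product whenever a flat view is handy. Below is a minimal sketch using jsonb_array_elements for the record above, again assuming the datalake schema and a running stack.

## expand the order_items array of order 10248 into one row per product
docker compose exec -T postgres psql -U devuser -d devdb <<'SQL'
SELECT order_id,
       (item ->> 'product_id')::int     AS product_id,
       (item ->> 'quantity')::int       AS quantity,
       (item ->> 'unit_price')::numeric AS unit_price,
       (item ->> 'discount')::numeric   AS discount
FROM datalake.cdc_events,
     jsonb_array_elements(order_items) AS item
WHERE order_id = 10248;
SQL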
Create Publication
As discussed further in the next section, we'll be using the native pgoutput logical replication stream support. The Debezium source connector automatically creates a publication that contains all tables if one doesn't exist. This can cause trouble when updating a record in a table that doesn't have a primary key or replica identity. To handle this, a publication that contains only the outbox table is created. This publication will be used when configuring the source connector.
-- ./data/sql/03_cdc_events.sql
CREATE PUBLICATION cdc_publication
    FOR TABLE cdc_events;
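We can confirm what the publication covers, and later which replication slot the connector is using, with the catalog views below - a quick sketch, assuming the stack is running.

## inspect the publication and, once the source connector is deployed, its replication slot
docker compose exec -T postgres psql -U devuser -d devdb <<'SQL'
SELECT * FROM pg_publication_tables WHERE pubname = 'cdc_publication';
SELECT slot_name, plugin, active FROM pg_replication_slots;
SQL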
CDC Development
Docker Compose
The Confluent platform can be handy for local development, although we'll be deploying the solution using Amazon MSK and MSK Connect. The quick start guide provides a docker compose file that includes its various components as separate services. It also contains the control center, a graphical user interface that makes it easy to check brokers, topics, messages and connectors.
Additionally, we need a PostgreSQL instance for the Northwind database, so a service named postgres is added. The database is initialised by a set of SQL scripts, which are executed by volume-mapping them to the initialisation folder - the scripts can be found in the project GitHub repository. Also, the Kafka Connect instance, running in the connect service, needs an update to include the source and sink connectors. This is illustrated further below.
Below shows a cut-down version of the docker compose file that we use for local development. The complete file can be found in the project GitHub repository.
# ./docker-compose.yml
version: "2"
services:
  postgres:
    image: debezium/postgres:13
    ...
    ports:
      - 5432:5432
    volumes:
      - ./data/sql:/docker-entrypoint-initdb.d
    environment:
      - POSTGRES_DB=devdb
      - POSTGRES_USER=devuser
      - POSTGRES_PASSWORD=password
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.1
    ...
    ports:
      - "2181:2181"
    environment:
      ...
  broker:
    image: confluentinc/cp-server:6.2.1
    ...
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      ...
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      ...
  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.1
    ...
    depends_on:
      ...
    ports:
      - "8081:8081"
    environment:
      ...
  connect:
    build: ./connector/local/cp-server-connect-datagen
    ...
    depends_on:
      ...
    ports:
      - "8083:8083"
    volumes:
      - ${HOME}/.aws:/home/appuser/.aws
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      ...
      # include /usr/local/share/kafka/plugins for community connectors
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins"
      ...
  control-center:
    image: confluentinc/cp-enterprise-control-center:6.2.1
    ...
    depends_on:
      ...
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "broker:29092"
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: "connect:8083"
      ...
      PORT: 9021
  rest-proxy:
    image: confluentinc/cp-kafka-rest:6.2.1
    ...
    depends_on:
      ...
    ports:
      - 8082:8082
    environment:
      ...
Install Connectors
We use the Debezium connector for PostgreSQL as the source connector and the Lenses S3 connector as the sink connector. The source connector is installed via the Confluent Hub client, while the sink connector is added as a community connector. Note that the CONNECT_PLUGIN_PATH environment variable is updated to include the Kafka plugin folder (/usr/local/share/kafka/plugins).
# ./connector/local/cp-server-connect-datagen/Dockerfile
FROM cnfldemos/cp-server-connect-datagen:0.5.0-6.2.1

# install debezium postgresql connector from confluent hub
RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:1.7.1

# install lenses S3 connector as a community connector - https://docs.confluent.io/home/connect/community.html
USER root
RUN mkdir -p /usr/local/share/kafka/plugins/kafka-connect-aws-s3 && \
    curl -SsL https://github.com/lensesio/stream-reactor/releases/download/3.0.0/kafka-connect-aws-s3-3.0.0-2.5.0-all.tar.gz \
    | tar -C /usr/local/share/kafka/plugins/kafka-connect-aws-s3 --warning=no-unknown-keyword -xzf -

# update connect plugin path
ENV CONNECT_PLUGIN_PATH=$CONNECT_PLUGIN_PATH,/usr/local/share/kafka/plugins
USER appuser
Start Services
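Below is a minimal sketch of bringing the stack up and confirming that both connector plugins are visible on the Kafka Connect worker. Note that the Connect REST API can take a minute or two to become available after start-up.

## build the custom connect image and start all services
docker compose up -d --build

## the Debezium and Lenses S3 connector classes should be listed once connect is up
curl -s http://localhost:8083/connector-plugins | grep -E "PostgresConnector|S3SinkConnector"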
After starting the docker compose services, we can check the local Kafka cluster in the control center via http://localhost:9021. We can go to the cluster overview page by clicking the cluster card item.
We can check an overview of the cluster on this page. For example, it shows that the cluster has 1 broker and that there is no running connector. On the left-hand side, there are menus for the individual components. Among those, Topics and Connect will be our main interest in this post.
Create Connectors
Source Connector
Debezium’s PostgreSQL connector captures row-level changes in the schemas of a PostgreSQL database. PostgreSQL versions 9.6, 10, 11, 12 and 13 are supported. The first time it connects to a PostgreSQL server or cluster, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content and that were committed to a PostgreSQL database. The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic.
The connector has a number of connector properties, including the name, connector class, database connection details, key/value converters and so on - the full list of properties can be found on this page. The properties that need explanation are listed below.
- plugin.name - Using the logical decoding feature, an output plug-in enables clients to consume changes to the transaction log in a user-friendly manner. Debezium supports the decoderbufs, wal2json and pgoutput plug-ins. Both wal2json and pgoutput are available in Amazon RDS for PostgreSQL and Amazon Aurora PostgreSQL, while decoderbufs requires a separate installation and is therefore excluded. Of the two remaining plug-ins, pgoutput is selected because it is the standard logical decoding output plug-in in PostgreSQL 10+ and has better performance for large transactions.
- publication.name - With the pgoutput plug-in, the Debezium connector creates a publication (if it doesn't exist) and sets publication.autocreate.mode to all_tables. This can cause an issue when updating a record in a table that doesn't have a primary key or replica identity. We can set the value to filtered, where the connector adjusts the applicable tables based on other property values. Alternatively, we can create a publication ourselves and set its name in the publication.name property. I find that creating a publication explicitly is easier to maintain. Note that a publication alone is not sufficient to handle the issue - all tables affected by the publication should have a primary key or replica identity. In our example, the orders and order_details tables should meet this condition. In short, creating an explicit publication limits the scope of CDC event generation and prevents the event generation process from interrupting other processes.
- key.converter/value.converter - Although Avro serialization is recommended, JSON is a format that can be generated without a schema registry and can be read by DeltaStreamer.
- transforms - A Debezium change event has a complex structure that provides a wealth of information, which can be quite difficult to process with DeltaStreamer. Debezium's event flattening single message transformation (SMT) is configured to flatten the output payload.
Note that once the connector is deployed, the CDC event records will be published to the demo.datalake.cdc_events topic.
// ./connector/local/source-debezium.json
{
  "name": "orders-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "publication.name": "cdc_publication",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "devuser",
    "database.password": "password",
    "database.dbname": "devdb",
    "database.server.name": "demo",
    "schema.include.list": "datalake",
    "table.include.list": "datalake.cdc_events",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "op,db,table,schema,lsn,source.ts_ms"
  }
}
The source connector is created using the API and its status can be checked as shown below.
## create debezium source connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @connector/local/source-debezium.json

## check connector status
curl http://localhost:8083/connectors/orders-source/status

#{
#  "name": "orders-source",
#  "connector": { "state": "RUNNING", "worker_id": "connect:8083" },
#  "tasks": [{ "id": 0, "state": "RUNNING", "worker_id": "connect:8083" }],
#  "type": "source"
#}
Sink Connector
Lenses S3 Connector is a Kafka Connect sink connector for writing records from Kafka to AWS S3 buckets. It extends the standard connect config with a parameter for a SQL-like command (Lenses Kafka Connect Query Language, or "KCQL") that defines how to map data from the source (in this case Kafka) to the target (S3). Importantly, it also covers how data should be partitioned in S3, the bucket names and the serialization format (JSON, Avro, Parquet, Text, CSV and binary are supported).
I find the Lenses S3 connector more straightforward to configure than the Confluent S3 sink connector thanks to its SQL-like syntax. The KCQL configuration indicates that object files are set to be

- moved from a Kafka topic (demo.datalake.cdc_events) to an S3 bucket (data-lake-demo-cevo) with an object prefix of cdc-events-local,
- partitioned by customer_id and order_id, e.g. customer_id=<customer-id>/order_id=<order-id>,
- stored in the JSON format, and
- flushed every 60 seconds or when there are 50 records.
// ./connector/local/sink-s3.json
{
  "name": "orders-sink",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "tasks.max": "1",
    "connect.s3.kcql": "INSERT INTO data-lake-demo-cevo:cdc-events-local SELECT * FROM demo.datalake.cdc_events PARTITIONBY customer_id,order_id STOREAS `json` WITH_FLUSH_INTERVAL = 60 WITH_FLUSH_COUNT = 50",
    "aws.region": "ap-southeast-2",
    "aws.custom.endpoint": "https://s3.ap-southeast-2.amazonaws.com/",
    "topics": "demo.datalake.cdc_events",
    "key.converter.schemas.enable": "false",
    "schema.enable": "false",
    "errors.log.enable": "true",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
The sink connector is created using the API and its status can be checked as shown below.
## create s3 sink connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @connector/local/sink-s3.json

## check connector status
curl http://localhost:8083/connectors/orders-sink/status

#{
#  "name": "orders-sink",
#  "connector": { "state": "RUNNING", "worker_id": "connect:8083" },
#  "tasks": [{ "id": 0, "state": "RUNNING", "worker_id": "connect:8083" }],
#  "type": "sink"
#}
We can also check the details of the connectors from the control center.
In the Topics menu, we can see that the demo.datalake.cdc_events topic has been created by the Debezium connector.
We can check the messages of a topic by clicking the topic name. After adding an offset value (e.g. 0) to the input element, we can see the messages of the topic. We can inspect the message fields on the left-hand side or download a message in JSON or CSV format.
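The same checks can be made from the command line. Below is a minimal sketch that lists the topic and reads a single message directly via the broker service defined in the docker compose file.

## confirm the topic created by the source connector exists
docker compose exec broker kafka-topics --bootstrap-server localhost:9092 --list | grep cdc_events

## peek at one flattened change event from the beginning of the topic
docker compose exec broker kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic demo.datalake.cdc_events --from-beginning --max-messages 1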
Check Event Output Files
We can check the output files that are processed by the sink connector in S3. Below shows an example record where customer_id is RATTC and order_id is 11077. As configured, the objects are prefixed by cdc-events-local and further partitioned by customer_id and order_id. The naming convention of the output files is <topic-name>(partition_offset).ext.
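The partitioned objects can also be listed with the AWS CLI - a quick sketch, assuming credentials with access to the bucket configured in the KCQL statement.

## list a few of the output files written by the sink connector
aws s3 ls s3://data-lake-demo-cevo/cdc-events-local/ --recursive | head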
Update Event Example
The above order record has a NULL shipped_date value. When we update it using the following SQL statement, we should be able to see a new output file with the updated value.
BEGIN TRANSACTION;
  UPDATE orders
    SET shipped_date = '1998-06-15'::date
    WHERE order_id = 11077;
COMMIT TRANSACTION;
In S3, we can see that a new output file is stored. In the new output file, the shipped_date value is updated to 10392. Note that the Debezium connector converts the DATE type to the INT32 type, which represents the number of days since the epoch.
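As a quick sanity check on the epoch-days representation, adding 10392 days to 1970-01-01 should land on the shipped date we set above - a small sketch using the running postgres service.

## 1970-01-01 plus 10392 days
docker compose exec postgres psql -U devuser -d devdb \
  -c "SELECT DATE '1970-01-01' + 10392 AS shipped_date;"
#  shipped_date
# --------------
#  1998-06-15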
Insert Event Example
When a new order is created, it inserts a record into the orders table as well as one or more order items into the order_details table. We therefore expect multiple event records to be created for a single new order. We can check this by inserting an order and its related order detail items.
BEGIN TRANSACTION;
  INSERT INTO orders VALUES (11075, 'RICSU', 8, '1998-05-06', '1998-06-03', NULL, 2, 6.19000006, 'Richter Supermarkt', 'Starenweg 5', 'Genève', NULL, '1204', 'Switzerland');
  INSERT INTO order_details VALUES (11075, 2, 19, 10, 0.150000006);
  INSERT INTO order_details VALUES (11075, 46, 12, 30, 0.150000006);
  INSERT INTO order_details VALUES (11075, 76, 18, 2, 0.150000006);
COMMIT TRANSACTION;
We can see that the output file includes 4 JSON objects, where the first object has NULL order_items and products values. We can also see that those values are expanded gradually in the subsequent event records.
Conclusion
We discussed a data lake solution where data ingestion is performed using change data capture (CDC) and the output files are upserted to a Hudi table. Once registered in the AWS Glue Data Catalog, the table can be used for ad-hoc queries and report/dashboard creation. For the solution, the Northwind database is used as the source database and, following the transactional outbox pattern, order-related changes are upserted to an outbox table by triggers. The data ingestion is developed on the local Confluent platform, where the Debezium connector for PostgreSQL is used as the source connector and the Lenses S3 connector as the sink connector. We confirmed that order creation and update events are captured as expected. In the next post, we'll build the data ingestion part of the solution with Amazon MSK and MSK Connect.