Change data capture (CDC) is a proven data integration pattern that has a wide range of applications. Among those, data replication to data lakes is a good use case in data engineering. Coupled with best-in-breed data lake formats such as Apache Hudi, we can build an efficient data replication solution. This is the first post of the data lake demo series. Over time, we’ll build a data lake that uses CDC. As a starting point, we’ll discuss the source database and CDC streaming infrastructure in the local environment.


As described in a Red Hat IT topics article, change data capture (CDC) is a proven data integration pattern to track when and what changes occur in data then alert other systems and services that must respond to those changes. Change data capture helps maintain consistency and functionality across all systems that rely on data.

The primary use of CDC is to enable applications to respond almost immediately whenever data in databases change. Specifically its use cases cover microservices integration, data replication with up-to-date data, building time-sensitive analytics dashboards, auditing and compliance, cache invalidation, full-text search and so on. There are a number of approaches for CDC - polling, dual writes and log-based CDC. Among those, log-based CDC has advantages to other approaches.

Both Amazon DMS and Debezium implement log-based CDC. While the former is a managed service, the latter can be deployed to a Kafka cluster as a (source) connector. It uses Apache Kafka as a messaging service to deliver database change notifications to the applicable systems and applications. Note that Kafka Connect is a tool for streaming data between Apache Kafka and other data systems by connectors in a scalable and reliable way. In AWS, we can use Amazon MSK and MSK Connect for building a Debezium based CDC solution.

Data replication to data lakes using CDC can be much more effective if data is stored to a format that supports atomic transactions and consistent updates. Popular choices are Apache Hudi, Apache Iceberg and Delta Lake. Among those, Apache Hudi can be a good option as it is well-integrated with AWS services.

Below shows the architecture of the data lake solution that we will be building in this series of posts.

  1. Employing the transactional outbox pattern, the source database publishes change event records to the CDC event table. The event records are generated by triggers that listen to insert and update events on source tables.
  2. CDC is implemented in a streaming environment and Amazon MSK is used to build the streaming infrastructure. In order to process the real-time CDC event records, a source and sink connectors are set up in Amazon MSK Connect. The Debezium connector for PostgreSQL is used as the source connector and the Lenses S3 connector is used as the sink connector. The sink connector pushes messages to a S3 bucket.
  3. Hudi DeltaStreamer is run on Amazon EMR. As a spark application, it reads files from the S3 bucket and upserts Hudi records to another S3 bucket. The Hudi table is created in the AWS Glue Data Catalog.
  4. The Hudi table is queried in Amazon Athena while the table is registered in the AWS Glue Data Catalog.
  5. Dashboards are created in Amazon Quicksight where the dataset is created using Amazon Athena.

As a starting point, we’ll discuss the source database and streaming infrastructure in the local environment.

Source Database

Data Model

We will use the Northwind database as the source database. It was originally created by Microsoft and used by various tutorials of their database products. It contains the sales data for a fictitious company called Northwind Traders that deals with specialty foods from around the world. As shown in the following entity relationship diagram, it includes a schema for a small business ERP with customers, products, orders, employees and so on. The version that is ported to PostgreSQL is obtained from YugabyteDB sample datasets and the SQL scripts can be found in the project GitHub repository. For local development, a service is created using docker compose, and it’ll be illustrated in the next section.

Outbox Table and Event Generation

It is straightforward to capture changes from multiple tables in a database using Kafka connectors where a separate topic is created for each table. Data ingestion to Hudi, however, can be complicated if messages are stored in multiple Kafka topics. Note that we will use the DeltaStreamer utility, and it maps to a single topic. In order to simplify the data ingestion process, we can employ the transactional outbox pattern. Using this pattern, we can create an outbox table (cdc_events) and upsert a record to it when a new transaction is made. In this way, all database changes can be pushed to a single topic, resulting in one DeltaStreamer process to listen to the change events.

Below shows the table creation statement of the outbox table. It aims to store all details of an order entry in a row. The columns that have the JSONB data type store attributes of other entities. For example, the order_items column includes ordered product information. As multiple products can be purchased, it keeps an array of product ID, unit price, quantity and discount.

 1-- ./data/sql/03_cdc_events.sql
 2CREATE TABLE cdc_events(
 3    order_id        SMALLINT NOT NULL PRIMARY KEY,
 4    customer_id     BPCHAR NOT NULL,
 5    order_date      DATE,
 6    required_date   DATE,
 7    shipped_date    DATE,
 8    order_items     JSONB,
 9    products        JSONB,
10    customer        JSONB,
11    employee        JSONB,
12    shipper         JSONB,
13    shipment        JSONB,
14    updated_at      TIMESTAMPTZ

In order to create event records, triggers are added to the orders and order_details tables. They execute the fn_insert_order_event function after an INSERT or UPDATE action occurs to the respective tables. Note the DELETE action is not considered in the event generation process for simplicity. The trigger function basically collects details of an order entry and attempts to insert a new record to the outbox table. If a record with the same order ID exists, it updates the record instead.

  1-- ./data/sql/03_cdc_events.sql
  2CREATE TRIGGER orders_triggered
  4  ON orders
  6  EXECUTE PROCEDURE fn_insert_order_event();
  8CREATE TRIGGER order_details_triggered
 10  ON order_details
 12  EXECUTE PROCEDURE fn_insert_order_event();
 14CREATE OR REPLACE FUNCTION fn_insert_order_event()
 17    AS
 21        WITH product_details AS (
 22            SELECT p.product_id,
 23                  row_to_json(p.*)::jsonb AS product_details
 24            FROM (
 25                SELECT *
 26                FROM products p
 27                JOIN suppliers s ON p.supplier_id = s.supplier_id
 28                JOIN categories c ON p.category_id = c.category_id
 29            ) AS p
 30        ), order_items AS (
 31            SELECT od.order_id,
 32                  jsonb_agg(row_to_json(od.*)::jsonb - 'order_id') AS order_items,
 33                  jsonb_agg(pd.product_details) AS products
 34            FROM order_details od
 35            JOIN product_details pd ON od.product_id = pd.product_id
 36            WHERE od.order_id = NEW.order_id
 37            GROUP BY od.order_id
 38        ), emps AS (
 39            SELECT employee_id,
 40                  row_to_json(e.*)::jsonb AS details
 41            FROM employees e
 42        ), emp_territories AS (
 43            SELECT et.employee_id,
 44                  jsonb_agg(
 45                    row_to_json(t.*)
 46                  ) AS territories
 47            FROM employee_territories et
 48            JOIN (
 49                SELECT t.territory_id, t.territory_description, t.region_id, r.region_description
 50                FROM territories t
 51                JOIN region r ON t.region_id = r.region_id
 52            ) AS t ON et.territory_id = t.territory_id
 53            GROUP BY et.employee_id
 54        ), emp_details AS (
 55            SELECT e.employee_id,
 56                  e.details || jsonb_build_object('territories', et.territories) AS details
 57            FROM emps AS e
 58            JOIN emp_territories AS et ON e.employee_id = et.employee_id
 59        )
 60            INSERT INTO cdc_events
 61                SELECT o.order_id,
 62                      o.customer_id,
 63                      o.order_date,
 64                      o.required_date,
 65                      o.shipped_date,
 66                      oi.order_items,
 67                      oi.products,
 68                      row_to_json(c.*)::jsonb AS customer,
 69                      ed.details::jsonb AS employee,
 70                      row_to_json(s.*)::jsonb AS shipper,
 71                      jsonb_build_object(
 72                        'freight', o.freight,
 73                        'ship_name', o.ship_name,
 74                        'ship_address', o.ship_address,
 75                        'ship_city', o.ship_city,
 76                        'ship_region', o.ship_region,
 77                        'ship_postal_code', o.ship_postal_code,
 78                        'ship_country', o.ship_country
 79                      ) AS shipment,
 80                      now()
 81                FROM orders o
 82                LEFT JOIN order_items oi ON o.order_id = oi.order_id
 83                JOIN customers c ON o.customer_id = c.customer_id
 84                JOIN emp_details ed ON o.employee_id = ed.employee_id
 85                JOIN shippers s ON o.ship_via = s.shipper_id
 86                WHERE o.order_id = NEW.order_id
 87            ON CONFLICT (order_id)
 88            DO UPDATE
 89                SET order_id        = excluded.order_id,
 90                    customer_id     = excluded.customer_id,
 91                    order_date      = excluded.order_date,
 92                    required_date   = excluded.required_date,
 93                    shipped_date    = excluded.shipped_date,
 94                    order_items     = excluded.order_items,
 95                    products        = excluded.products,
 96                    customer        = excluded.customer,
 97                    shipper         = excluded.shipper,
 98                    shipment        = excluded.shipment,
 99                    updated_at      = excluded.updated_at;
100    END IF;

Create Initial Event Records

In order to create event records for existing order entries, a stored procedure is created - usp_init_order_events. It is quite similar to the trigger function and can be checked in the project GitHub repository. The procedure is called at database initialization and a total of 829 event records are created by that.

1-- ./data/sql/03_cdc_events.sql
2CALL usp_init_order_events();

Below shows a simplified event record, converted into JSON. For the order with ID 10248, 3 products are ordered by a customer whose ID is VINET.

 2  "order_id": 10248,
 3  "customer_id": "VINET",
 4  "order_date": "1996-07-04",
 5  "required_date": "1996-08-01",
 6  "shipped_date": "1996-07-16",
 7  "order_items": [
 8    { "discount": 0, "quantity": 12, "product_id": 11, "unit_price": 14 },
 9    { "discount": 0, "quantity": 10, "product_id": 42, "unit_price": 9.8 },
10    { "discount": 0, "quantity": 5, "product_id": 72, "unit_price": 34.8 }
11  ],
12  "products": [
13    { "product_id": 11, "product_name": "Queso Cabrales" },
14    { "product_id": 42, "product_name": "Singaporean Hokkien Fried Mee" },
15    { "product_id": 72, "product_name": "Mozzarella di Giovanni" }
16  ],
17  "customer": {
18    "customer_id": "VINET",
19    "company_name": "Vins et alcools Chevalier"
20  },
21  "employee": {
22    "title": "Sales Manager",
23    "last_name": "Buchanan",
24    "employee_id": 5
25  },
26  "shipper": {
27    "company_name": "Federal Shipping"
28  },
29  "shipment": {
30    "freight": 32.38,
31    "ship_name": "Vins et alcools Chevalier"
32  },
33  "updated_at": "2021-11-27T20:30:13.644579+11:00"

Create Publication

As discussed further in the next section, we’ll be using the native pgoutput logical replication stream support. Debezium, Kafka source connector, automatically creates a publication that contains all tables if it doesn’t exist. It can cause trouble to update a record to a table that doesn’t have the primary key or replica identity. In order to handle such an issue, a publication that contains only the outbox table is created. This publication will be used when configuring the source connector.

1-- ./data/sql/03_cdc_events.sql
2CREATE PUBLICATION cdc_publication
3    FOR TABLE cdc_events;

CDC Development

Docker Compose

The Confluent platform can be handy for local development, although we’ll be deploying the solution using Amazon MSK and MSK Connect. The quick start guide provides a docker compose file that includes its various components in separate services. It also contains the control center, a graphical user interface, which helps check brokers, topics, messages and connectors easily.

Additionally, we need a PostgreSQL instance for the Northwind database and a service named postgres is added. The database is initialised by a set of SQL scripts. They are executed by volume-mapping to the initialisation folder - the scripts can be found in the project GitHub repository. Also, the Kafka Connect instance, running in the connect service, needs an update to include the source and sink connectors. It’ll be illustrated further below.

Below shows a cut-down version of the docker compose file that we use for local development. The complete file can be found in the project GitHub repository.

 1# ./docker-compose.yml
 2version: "2"
 4  postgres:
 5    image: debezium/postgres:13
 6    ...
 7    ports:
 8      - 5432:5432
 9    volumes:
10      - ./data/sql:/docker-entrypoint-initdb.d
11    environment:
12      - POSTGRES_DB=devdb
13      - POSTGRES_USER=devuser
14      - POSTGRES_PASSWORD=password
15  zookeeper:
16    image: confluentinc/cp-zookeeper:6.2.1
17    ...
18    ports:
19      - "2181:2181"
20    environment:
21      ...
22  broker:
23    image: confluentinc/cp-server:6.2.1
24    ...
25    depends_on:
26      - zookeeper
27    ports:
28      - "9092:9092"
29      - "9101:9101"
30    environment:
31      ...
32      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
33      ...
34  schema-registry:
35    image: confluentinc/cp-schema-registry:6.2.1
36    ...
37    depends_on:
38      ...
39    ports:
40      - "8081:8081"
41    environment:
42      ...
43  connect:
44    build: .connector/local/cp-server-connect-datagen
45    ...
46    depends_on:
47      ...
48    ports:
49      - "8083:8083"
50    volumes:
51      - ${HOME}/.aws:/home/appuser/.aws
52    environment:
53      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
55      CONNECT_REST_PORT: 8083
56      ...
57      # include /usr/local/share/kafka/plugins for community connectors
58      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins"
59      ...
60  control-center:
61    image: confluentinc/cp-enterprise-control-center:6.2.1
62    ...
63    depends_on:
64      ...
65    ports:
66      - "9021:9021"
67    environment:
70      ...
71      PORT: 9021
72  rest-proxy:
73    image: confluentinc/cp-kafka-rest:6.2.1
74    ...
75    depends_on:
76      ...
77    ports:
78      - 8082:8082
79    environment:
80      ...

Install Connectors

We use the Debezium connector for PostgreSQL as the source connector and Lenses S3 Connector as the sink connector. The source connector is installed via the confluent hub client while the sink connector is added as a community connector. Note that the environment variable of CONNECT_PLUGIN_PATH is updated to include the kafka plugin folder (/usr/local/share/kafka/plugins).

 1# .connector/local/cp-server-connect-datagen/Dockerfile
 2FROM cnfldemos/cp-server-connect-datagen:0.5.0-6.2.1
 4# install debezium postgresql connector from confluent hub
 5RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:1.7.1
 7# install lenses S3 connector as a community connector -
 8USER root
 9RUN mkdir -p /usr/local/share/kafka/plugins/kafka-connect-aws-s3 && \
10  curl -SsL \
11    | tar -C /usr/local/share/kafka/plugins/kafka-connect-aws-s3 --warning=no-unknown-keyword -xzf -
13# update connect plugin path
14ENV CONNECT_PLUGIN_PATH=$CONNECT_PLUGIN_PATH,/usr/local/share/kafka/plugins
15USER appuser

Start Services

After starting the docker compose services, we can check a local Kafka cluster in the control center via http://localhost:9021. We can go to the cluster overview page by clicking the cluster card item.

We can check an overview of the cluster in the page. For example, it shows the cluster has 1 broker and there is no running connector. On the left side, there are menus for individual components. Among those, Topics and Connect will be our main interest in this post.

Create Connectors

Source Connector

Debezium’s PostgreSQL connector captures row-level changes in the schemas of a PostgreSQL database. PostgreSQL versions 9.6, 10, 11, 12 and 13 are supported. The first time it connects to a PostgreSQL server or cluster, the connector takes a consistent snapshot of all schemas. After that snapshot is complete, the connector continuously captures row-level changes that insert, update, and delete database content and that were committed to a PostgreSQL database. The connector generates data change event records and streams them to Kafka topics. For each table, the default behavior is that the connector streams all generated events to a separate Kafka topic for that table. Applications and services consume data change event records from that topic.

The connector has a number of connector properties including name, connector class, database connection details, key/value converter and so on - the full list of properties can be found in this page. The properties that need explanation are listed below.

  • - Using the logical decoding feature, an output plug-in enables clients to consume changes to the transaction log in a user-friendly manner. Debezium supports decoderbufs, wal2json and pgoutput plug-ins. Both wal2json and pgoutput are available in Amazon RDS for PostgreSQL and Amazon Aurora PostgreSQL. decoderbufs requires a separate installation, and it is excluded from the option. Among the 2 supported plug-ins, pgoutput is selected because it is the standard logical decoding output plug-in in PostgreSQL 10+ and has better performance for large transactions.
  • - With the pgoutput plug-in, the Debezium connector creates a publication (if not exists) and sets publication.autocreate.mode to all_tables. It can cause an issue to update a record to a table that doesn’t have the primary key or replica identity. We can set the value to filtered where the connector adjusts the applicable tables by other property values. Alternatively we can create a publication on our own and add the name to property. I find creating a publication explicitly is easier to maintain. Note a publication alone is not sufficient to handle the issue. All affected tables by the publication should have the primary key or replica identity. In our example, the _orders _and _order_details _tables should meet the condition. In short, creating an explicit publication can prevent the event generation process from interrupting other processes by limiting the scope of CDC event generation.
  • key.converter/value.converter - Although Avro serialization is recommended, JSON is a format that can be generated without schema registry and can be read by DeltaStreamer.
  • transforms - A Debezium event data has a complex structure that provides a wealth of information. It can be quite difficult to process such a structure using DeltaStreamer. Debezium’s event flattening single message transformation (SMT) is configured to flatten the output payload.

Note once the connector is deployed, the CDC event records will be published to demo.datalake.cdc_events topic.

 1// ./connector/local/source-debezium.json
 3  "name": "orders-source",
 4  "config": {
 5    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
 6    "tasks.max": "1",
 7    "": "pgoutput",
 8    "": "cdc_publication",
 9    "database.hostname": "postgres",
10    "database.port": "5432",
11    "database.user": "devuser",
12    "database.password": "password",
13    "database.dbname": "devdb",
14    "": "demo",
15    "schema.include": "datalake",
16    "table.include.list": "datalake.cdc_events",
17    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
18    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
19    "transforms": "unwrap",
20    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
21    "transforms.unwrap.drop.tombstones": "false",
22    "transforms.unwrap.delete.handling.mode": "rewrite",
23    "transforms.unwrap.add.fields": "op,db,table,schema,lsn,source.ts_ms"
24  }

The source connector is created using the API and its status can be checked as shown below.

 1## create debezium source connector
 2curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" \
 3  http://localhost:8083/connectors/ -d @connector/local/source-debezium.json
 5## check connector status
 6curl http://localhost:8083/connectors/orders-source/status
 9#  "name": "orders-source",
10#  "connector": { "state": "RUNNING", "worker_id": "connect:8083" },
11#  "tasks": [{ "id": 0, "state": "RUNNING", "worker_id": "connect:8083" }],
12#  "type": "source"

Sink Connector

Lenses S3 Connector is a Kafka Connect sink connector for writing records from Kafka to AWS S3 Buckets. It extends the standard connect config adding a parameter for a SQL command (Lenses Kafka Connect Query Language or “KCQL”). This defines how to map data from the source (in this case Kafka) to the target (S3). Importantly, it also includes how data should be partitioned into S3, the bucket names and the serialization format (support includes JSON, Avro, Parquet, Text, CSV and binary).

I find the Lenses S3 connector is more straightforward to configure than the Confluent S3 sink connector for its SQL-like syntax. The KCQL configuration indicates that object files are set to be

  • moved from a Kafka topic (demo.datalake.cdc_events) to an S3 bucket (data-lake-demo-cevo) with object prefix of _cdc-events-local,
  • partitioned by customer_id and order_id e.g. customer_id=<customer-id>/order_id=<order-id>,
  • stored as the JSON format and,
  • flushed every 60 seconds or when there are 50 records.
 1// ./connector/local/sink-s3.json
 3  "name": "orders-sink",
 4  "config": {
 5    "connector.class": "",
 6    "tasks.max": "1",
 7    "connect.s3.kcql": "INSERT INTO data-lake-demo-cevo:cdc-events-local SELECT * FROM demo.datalake.cdc_events PARTITIONBY customer_id,order_id STOREAS `json` WITH_FLUSH_INTERVAL = 60 WITH_FLUSH_COUNT = 50",
 8    "aws.region": "ap-southeast-2",
 9    "aws.custom.endpoint": "",
10    "topics": "demo.datalake.cdc_events",
11    "key.converter.schemas.enable": "false",
12    "schema.enable": "false",
13    "errors.log.enable": "true",
14    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
15    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
16  }

The sink connector is created using the API and its status can be checked as shown below.

 1### create s3 sink connector
 2curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" \
 3  http://localhost:8083/connectors/ -d @connector/local/sink-s3.json
 5## check connector status
 6curl http://localhost:8083/connectors/orders-sink/status
 9#  "name": "orders-sink",
10#  "connector": { "state": "RUNNING", "worker_id": "connect:8083" },
11#  "tasks": [{ "id": 0, "state": "RUNNING", "worker_id": "connect:8083" }],
12#  "type": "sink"

We can also check the details of the connectors from the control center.

In the Topics menu, we are able to see that the demo.datalake.cdc_events topic is created by the Debezium connector.

We can check messages of a topic by clicking the topic name. After adding an offset value (e.g. 0) to the input element, we are able to see messages of the topic. We can check message fields on the left-hand side or download a message in the JSON or CSV format.

Check Event Output Files

We can check the output files that are processed by the sink connector in S3. Below shows an example record where _customer_id _is RATTC and _order_id _is 11077. As configured, the objects are prefixed by cdc-events-local and further partitioned by _customer_id _and order_id. The naming convention of output files is <topic-name>(partition_offset).ext.

Update Event Example

The above order record has a NULL shipped_date value. When we update it using the following SQL statement, we should be able to see a new output file with the updated value.

2    UPDATE orders
3    SET shipped_date = '1998-06-15'::date
4    WHERE order_id = 11077;

In S3, we are able to see that a new output file is stored. In the new output file, the _shipped_date _value is updated to 10392. Note that the Debezium connector converts the DATE type to the INT32 type, which represents the number of days since the epoch.

Insert Event Example

When a new order is created, it’ll insert a record to the _orders _table as well as one or more order items to the _order_details _table. Therefore, we expect multiple event records will be created when a new order is created. We can check it by inserting an order and related order details items.

2    INSERT INTO orders VALUES (11075, 'RICSU', 8, '1998-05-06', '1998-06-03', NULL, 2, 6.19000006, 'Richter Supermarkt', 'Starenweg 5', 'Genève', NULL, '1204', 'Switzerland');
3    INSERT INTO order_details VALUES (11075, 2, 19, 10, 0.150000006);
4    INSERT INTO order_details VALUES (11075, 46, 12, 30, 0.150000006);
5    INSERT INTO order_details VALUES (11075, 76, 18, 2, 0.150000006);

We can see the output file includes 4 JSON objects where the first object has NULL _order_items _and _products _value. We can also see that those values are expanded gradually in subsequent event records.


We discussed a data lake solution where data ingestion is performed using change data capture (CDC) and the output files are upserted to a Hudi table. Being registered to Glue Data Catalog, it can be used for ad-hoc queries and report/dashboard creation. For the solution, the Northwind database is used as the source database and, following the transactional outbox pattern, order-related changes are upserted to an outbox table by triggers. The data ingestion is developed in the local Confluent platform where the Debezium for PostgreSQL is used as the source connector and the Lenses S3 sink connector is used as the sink connector. We confirmed order creation and update events are captured as expected. In the next post, we’ll build the data ingestion part of the solution with Amazon MSK and MSK Connect.