In Part 1, we reviewed Kafka connectors, focusing on integration with AWS services. Among the available connectors, the suite of Apache Camel Kafka connectors and the Kinesis Kafka connector from AWS Labs can be effective for building data ingestion pipelines on AWS. In this post, I will illustrate how to develop the Camel DynamoDB sink connector using Docker. Fake order data will be generated using the MSK Data Generator source connector, and the sink connector will be configured to consume the topic messages and ingest them into a DynamoDB table.

Kafka Cluster

We will create a Kafka cluster with 3 brokers and 1 Zookeeper node using the bitnami/kafka image. Kafka 2.8.1 is used as it is the Kafka version recommended by Amazon MSK. The following Docker Compose file is used to create the Kafka cluster, and the source can also be found in the GitHub repository of this post. The resources created by the compose file are described below.

  • services
    • zookeeper
      • A Zookeeper node is created with minimal configuration. It allows anonymous login.
    • kafka-[id]
      • Each broker has a unique ID (KAFKA_CFG_BROKER_ID) and shares the same Zookeeper connect parameter (KAFKA_CFG_ZOOKEEPER_CONNECT). These are required to connect to the Zookeeper node.
      • Each has two listeners - INTERNAL and EXTERNAL. The former is accessed on port 9092 and is used within the same Docker network. The latter is mapped to host ports 29092 through 29094 (one per broker) and can be used to connect from outside the network.
      • Each can be accessed without authentication (ALLOW_PLAINTEXT_LISTENER).
      • The number of partitions (KAFKA_CFG_NUM_PARTITIONS) and the default replication factor (KAFKA_CFG_DEFAULT_REPLICATION_FACTOR) are both set to 3.
  • networks
    • A network named kafka-network is created and used by all services. Having a custom network is beneficial when services are launched by multiple Docker Compose files, as services in the other compose files can refer to it as an external network.
  • volumes
    • Each service has its own volume that is mapped to the container’s data folder. We can check the contents of the folder in the Docker volume path - see the sketch after the compose file below. More importantly, data is preserved in the Docker volume unless the volume is deleted, so we don’t have to recreate data every time the Kafka cluster is restarted.
# kafka-connect-for-aws/part-02/compose-kafka.yml
version: "3.5"

services:
  zookeeper:
    image: bitnami/zookeeper:3.5
    container_name: zookeeper
    ports:
      - "2181"
    networks:
      - kafkanet
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - zookeeper_data:/bitnami/zookeeper
  kafka-0:
    image: bitnami/kafka:2.8.1
    container_name: kafka-0
    expose:
      - 9092
    ports:
      - "29092:29092"
    networks:
      - kafkanet
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-0:9092,EXTERNAL://localhost:29092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_NUM_PARTITIONS=3
      - KAFKA_CFG_DEFAULT_REPLICATION_FACTOR=3
    volumes:
      - kafka_0_data:/bitnami/kafka
    depends_on:
      - zookeeper
  kafka-1:
    image: bitnami/kafka:2.8.1
    container_name: kafka-1
    expose:
      - 9092
    ports:
      - "29093:29093"
    networks:
      - kafkanet
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:29093
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-1:9092,EXTERNAL://localhost:29093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_NUM_PARTITIONS=3
      - KAFKA_CFG_DEFAULT_REPLICATION_FACTOR=3
    volumes:
      - kafka_1_data:/bitnami/kafka
    depends_on:
      - zookeeper
  kafka-2:
    image: bitnami/kafka:2.8.1
    container_name: kafka-2
    expose:
      - 9092
    ports:
      - "29094:29094"
    networks:
      - kafkanet
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=2
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:29094
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-2:9092,EXTERNAL://localhost:29094
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_NUM_PARTITIONS=3
      - KAFKA_CFG_DEFAULT_REPLICATION_FACTOR=3
    volumes:
      - kafka_2_data:/bitnami/kafka
    depends_on:
      - zookeeper

networks:
  kafkanet:
    name: kafka-network

volumes:
  zookeeper_data:
    driver: local
    name: zookeeper_data
  kafka_0_data:
    driver: local
    name: kafka_0_data
  kafka_1_data:
    driver: local
    name: kafka_1_data
  kafka_2_data:
    driver: local
    name: kafka_2_data
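
As a side note, the host path behind a named volume can be checked with docker volume inspect once the cluster has been started at least once. Below is a quick sketch using one of the volume names from the compose file; the mount point is typically owned by root, so sudo may be required.

# print the host path backing the kafka_0_data volume
$ docker volume inspect kafka_0_data --format '{{ .Mountpoint }}'
# list the broker's data folder
$ sudo ls -l $(docker volume inspect kafka_0_data --format '{{ .Mountpoint }}')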

Kafka Connect

We can use the same Docker image for Kafka Connect because it is included in the Kafka distribution. The Kafka Connect server runs as a separate Docker Compose service, and its key configurations are listed below.

  • We run it in distributed mode, and it is started by executing connect-distributed.sh in the container command.
    • The startup script requires the properties file (connect-distributed.properties). It includes configurations such as the Kafka broker addresses - see below for details.
  • The Connect server is accessible on port 8083, and we can manage connectors via a REST API as demonstrated below.
  • The properties file and connector sources are volume-mapped.
  • AWS credentials are added to environment variables as the sink connector requires permission to write data into DynamoDB.
# kafka-connect-for-aws/part-02/compose-connect.yml
version: "3.5"

services:
  kafka-connect:
    image: bitnami/kafka:2.8.1
    container_name: connect
    command: >
      /opt/bitnami/kafka/bin/connect-distributed.sh
      /opt/bitnami/kafka/config/connect-distributed.properties
    ports:
      - "8083:8083"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
    volumes:
      - "./configs/connect-distributed.properties:/opt/bitnami/kafka/config/connect-distributed.properties"
      - "./connectors/msk-data-generator.jar:/opt/connectors/datagen/msk-data-generator.jar"
      - "./connectors/camel-aws-ddb-sink-kafka-connector:/opt/connectors/camel-aws-ddb-sink-kafka-connector"

networks:
  kafkanet:
    external: true
    name: kafka-network
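
Note that the compose file only passes the AWS credential variables through to the container, so they must be present in the shell environment (or an .env file) before the service is started. A minimal sketch, assuming temporary credentials for an identity that is allowed to write to the target DynamoDB table - the values are placeholders, and AWS_SESSION_TOKEN is only needed for temporary credentials.

# placeholder values - replace with your own credentials
$ export AWS_ACCESS_KEY_ID=<access-key-id>
$ export AWS_SECRET_ACCESS_KEY=<secret-access-key>
$ export AWS_SESSION_TOKEN=<session-token>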

Connect Properties File

The properties file includes the configuration of the Connect server. Key values are described below.

  • Bootstrap Server
    • I changed the Kafka bootstrap server addresses. As the Connect server shares the same Docker network, we can use the service names (e.g. kafka-0) on port 9092.
  • Cluster group id
    • In distributed mode, multiple worker processes use the same group.id, and they automatically coordinate to schedule execution of connectors and tasks across all available workers.
  • Converter-related properties
    • Converters are necessary to have a Kafka Connect deployment support a particular data format when writing to or reading from Kafka.
    • By default, org.apache.kafka.connect.json.JsonConverter is set for both the key and value converters and schemas are enabled for both of them.
    • As shown later, these properties can be overridden when creating a connector.
  • Topics for offsets, configs, status
    • Several topics are created to store connector offsets, configurations and statuses so that they can be shared by multiple worker processes.
  • Plugin path
    • Paths that contain plugins (connectors, converters, transformations) can be set as a comma-separated list of filesystem paths.
    • /opt/connectors is added, and the connector sources are volume-mapped to it.
## kafka-connect-for-aws/part-02/connect-distributed.properties
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=kafka-0:9092,kafka-1:9092,kafka-2:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

...

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations).
plugin.path=/opt/connectors

Download Connectors

The connector sources need to be downloaded into the ./connectors path so that they can be volume-mapped to the container’s plugin path (/opt/connectors). The MSK Data Generator is a single Jar file, and it can be kept as it is. On the other hand, the Camel DynamoDB sink connector is an archive file that should be decompressed. Note that a separate zip file is created as well; it will be used to create a custom plugin for MSK Connect in a later post. The following script downloads them into the host path.

# kafka-connect-for-aws/part-02/download.sh
#!/usr/bin/env bash
SCRIPT_DIR="$(cd $(dirname "$0"); pwd)"

SRC_PATH=${SCRIPT_DIR}/connectors
rm -rf ${SRC_PATH} && mkdir ${SRC_PATH}

## MSK Data Generator Source Connector
echo "downloading msk data generator..."
DOWNLOAD_URL=https://github.com/awslabs/amazon-msk-data-generator/releases/download/v0.4.0/msk-data-generator-0.4-jar-with-dependencies.jar

curl -L -o ${SRC_PATH}/msk-data-generator.jar ${DOWNLOAD_URL}

## Download camel dynamodb sink connector
echo "downloading camel dynamodb sink connector..."
DOWNLOAD_URL=https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-ddb-sink-kafka-connector/3.20.3/camel-aws-ddb-sink-kafka-connector-3.20.3-package.tar.gz

# decompress and zip contents to create custom plugin of msk connect later
curl -o ${SRC_PATH}/camel-aws-ddb-sink-kafka-connector.tar.gz ${DOWNLOAD_URL} \
  && tar -xvzf ${SRC_PATH}/camel-aws-ddb-sink-kafka-connector.tar.gz -C ${SRC_PATH} \
  && cd ${SRC_PATH}/camel-aws-ddb-sink-kafka-connector \
  && zip -r camel-aws-ddb-sink-kafka-connector.zip . \
  && mv camel-aws-ddb-sink-kafka-connector.zip ${SRC_PATH} \
  && rm ${SRC_PATH}/camel-aws-ddb-sink-kafka-connector.tar.gz

The following shows the folder structure after the connectors are downloaded successfully.

connectors/
├── camel-aws-ddb-sink-kafka-connector
...
│   ├── camel-api-3.20.3.jar
│   ├── camel-aws-ddb-sink-kafka-connector-3.20.3.jar **
│   ├── camel-aws2-ddb-3.20.3.jar
...
├── camel-aws-ddb-sink-kafka-connector.zip **
...
└── msk-data-generator.jar **

3 directories, 128 files

Kafka Management App

A Kafka management app can be a good companion for development as it helps monitor and manage resources in an easy-to-use user interface. We’ll use kafka-ui in this post. It provides a Docker image, and one or more Kafka clusters and related resources can be linked to it. In the following compose file, the connection details of the Kafka cluster and the Kafka Connect server are added.

# kafka-connect-for-aws/part-02/compose-ui.yml
version: "3.5"

services:
  kafka-ui:
    image: provectuslabs/kafka-ui:master
    container_name: kafka-ui
    ports:
      - "8080:8080"
    networks:
      - kafkanet
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9092,kafka-1:9092,kafka-2:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: local
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083

networks:
  kafkanet:
    external: true
    name: kafka-network

Start Services

There are three Docker Compose files - one each for the Kafka cluster, the Kafka Connect server and the management application. We can run all the services by starting them in order as illustrated below.

$ cd kafka-connect-for-aws/part-02
# download connectors
$ ./download.sh
# starts 3 node kafka cluster
$ docker-compose -f compose-kafka.yml up -d
# starts kafka connect server in distributed mode
$ docker-compose -f compose-connect.yml up -d
# starts kafka-ui
$ docker-compose -f compose-ui.yml up -d
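
Once all the containers are up, a few quick checks help confirm that the cluster and the Connect server are healthy. The commands below are a sketch; the /connector-plugins endpoint is part of the Kafka Connect REST API and should list both the MSK Data Generator and the Camel DynamoDB sink connector classes if the plugin path is mounted correctly.

# check that the zookeeper, kafka-*, connect and kafka-ui containers are running
$ docker ps
# list topics via the internal listener of the first broker
$ docker exec kafka-0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka-0:9092 --list
# confirm the Connect worker responds and the connector plugins are loaded
$ curl http://localhost:8083/
$ curl http://localhost:8083/connector-plugins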

Data Ingestion to Kafka Topic

Source Connector Creation

As mentioned earlier, Kafka Connect provides a REST API that manages connectors, and we can create a connector programmatically using it. The REST endpoint requires a JSON payload that includes connector configurations.

$ cd kafka-connect-for-aws/part-02
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @configs/source.json

The connector class (connector.class) is required for any connector, and it is set to the MSK Data Generator's connector class. Also, a single task is allocated to the connector (tasks.max). As mentioned earlier, the converter-related properties are overridden. Specifically, the key converter is set to the string converter as the key of the topic is set to primitive values (genkp). Also, schemas are disabled for both the key and value.

The properties in the middle are specific to the source connector. Basically, it sends messages to a topic named order. The key is marked as to-replace as it will be replaced with the order_id attribute of the value - see below. The value has order_id, product_id, quantity, customer_id and customer_name attributes, and they are generated by the Java faker library.

It is easier to manage messages if the key and the value share the same order ID. We can achieve this using single message transforms (SMTs). Specifically, I used two transforms - ValueToKey and ExtractField - to achieve it. As the name suggests, the former copies the order_id value into the key. The latter is needed additionally because the key is set to have primitive string values rather than a struct. Finally, the last transform (Cast) changes the quantity value into an integer.

// kafka-connect-for-aws/part-02/configs/source.json
{
  "name": "order-source",
  "config": {
    "connector.class": "com.amazonaws.mskdatagen.GeneratorSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,

    "genkp.order.with": "to-replace",
    "genv.order.order_id.with": "#{Internet.uuid}",
    "genv.order.product_id.with": "#{Code.isbn10}",
    "genv.order.quantity.with": "#{number.number_between '1','5'}",
    "genv.order.customer_id.with": "#{number.number_between '100','199'}",
    "genv.order.customer_name.with": "#{Name.full_name}",
    "global.throttle.ms": "500",
    "global.history.records.max": "1000",

    "transforms": "copyIdToKey,extractKeyFromStruct,cast",
    "transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.copyIdToKey.fields": "order_id",
    "transforms.extractKeyFromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKeyFromStruct.field": "order_id",
    "transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
    "transforms.cast.spec": "quantity:int8"
  }
}

Once created successfully, we can check the connector status as shown below.

$ curl http://localhost:8083/connectors/order-source/status
{
  "name": "order-source",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.19.0.6:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.19.0.6:8083"
    }
  ],
  "type": "source"
}
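
The same REST API is handy for day-to-day connector management while iterating on the configuration. A few standard endpoints are shown below.

# list all connectors registered on the Connect cluster
$ curl http://localhost:8083/connectors
# restart the connector after changing its configuration or credentials
$ curl -X POST http://localhost:8083/connectors/order-source/restart
# delete the connector when it is no longer needed
$ curl -X DELETE http://localhost:8083/connectors/order-source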

As we’ve added the connector URL, the Kafka Connect menu appears on kafka-ui. We can check the details of the connector on the app as well.

Kafka Topics

As configured, the source connector ingests messages to the order topic.

We can browse individual messages in the Messages tab of the topic.
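
Besides kafka-ui, the messages can also be checked from the command line. Below is a quick sketch using the console consumer script that ships with the bitnami image; it prints a handful of records from the order topic along with their keys.

$ docker exec kafka-0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-0:9092 \
  --topic order \
  --from-beginning \
  --max-messages 5 \
  --property print.key=true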

Data Ingestion to DynamoDB

Table Creation

The destination table is named orders, and it has a composite primary key where order_id and ordered_at are the hash and range keys respectively. It also has a global secondary index where customer_id and ordered_at constitute the primary key. Note that ordered_at is not generated by the source connector as the Java faker library doesn’t have a method to generate a current timestamp. As illustrated below, it’ll be added by the sink connector using SMTs. The table can be created using the AWS CLI as shown below.

aws dynamodb create-table \
  --cli-input-json file://configs/ddb.json
// kafka-connect-for-aws/part-02/configs/ddb.json
{
  "TableName": "orders",
  "KeySchema": [
    { "AttributeName": "order_id", "KeyType": "HASH" },
    { "AttributeName": "ordered_at", "KeyType": "RANGE" }
  ],
  "AttributeDefinitions": [
    { "AttributeName": "order_id", "AttributeType": "S" },
    { "AttributeName": "customer_id", "AttributeType": "S" },
    { "AttributeName": "ordered_at", "AttributeType": "S" }
  ],
  "ProvisionedThroughput": {
    "ReadCapacityUnits": 1,
    "WriteCapacityUnits": 1
  },
  "GlobalSecondaryIndexes": [
    {
      "IndexName": "customer",
      "KeySchema": [
        { "AttributeName": "customer_id", "KeyType": "HASH" },
        { "AttributeName": "ordered_at", "KeyType": "RANGE" }
      ],
      "Projection": { "ProjectionType": "ALL" },
      "ProvisionedThroughput": {
        "ReadCapacityUnits": 1,
        "WriteCapacityUnits": 1
      }
    }
  ]
}
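
Table creation is asynchronous, so it can be useful to wait until the table becomes ACTIVE before creating the sink connector. A small sketch using standard AWS CLI commands:

# wait until the orders table is ready to accept writes
aws dynamodb wait table-exists --table-name orders
# optionally confirm the table status
aws dynamodb describe-table --table-name orders --query "Table.TableStatus"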

Sink Connector Creation

Similar to the source connector, we can create the sink connector using the REST API.

$ cd kafka-connect-for-aws/part-02
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @configs/sink.json

The connector is configured to write messages from the order topic into the DynamoDB table created earlier. It requires specifying the table name, AWS region, operation, write capacity and whether to use the default credential provider - see the documentation for details. Note that, if you don’t use the default credential provider, you have to specify the access key id and secret access key. Note further that, although the current LTS version is v3.18.2, the default credential provider option didn’t work for me, and I was recommended to use v3.20.3 instead. Finally, the camel.sink.unmarshal option converts data from the internal java.util.HashMap type into the required java.io.InputStream type. Without this configuration, the connector fails with an org.apache.camel.NoTypeConversionAvailableException error.

Although the destination table has ordered_at as the range key, it is not created by the source connector because the Java faker library doesn’t have a method to generate a current timestamp. Therefore, it is created by the sink connector using two SMTs - InsertField and TimestampConverter. Specifically, they add a timestamp value to the ordered_at attribute, format the value as yyyy-MM-dd HH:mm:ss:SSS, and convert its type into string.

// kafka-connect-for-aws/part-02/configs/sink.json
{
  "name": "order-sink",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.awsddbsink.CamelAwsddbsinkSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "topics": "order",

    "camel.kamelet.aws-ddb-sink.table": "orders",
    "camel.kamelet.aws-ddb-sink.region": "ap-southeast-2",
    "camel.kamelet.aws-ddb-sink.operation": "PutItem",
    "camel.kamelet.aws-ddb-sink.writeCapacity": 1,
    "camel.kamelet.aws-ddb-sink.useDefaultCredentialsProvider": true,
    "camel.sink.unmarshal": "jackson",

    "transforms": "insertTS,formatTS",
    "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertTS.timestamp.field": "ordered_at",
    "transforms.formatTS.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.formatTS.format": "yyyy-MM-dd HH:mm:ss:SSS",
    "transforms.formatTS.field": "ordered_at",
    "transforms.formatTS.target.type": "string"
  }
}
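
As with the source connector, we can check that the sink connector and its task are in the RUNNING state via the REST API. If the task fails, the Connect worker logs (the container is named connect) usually show the cause, such as missing credentials or type conversion errors.

$ curl http://localhost:8083/connectors/order-sink/status
# inspect the Connect worker logs if the task reports FAILED
$ docker logs -f connect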

The sink connector details can be checked on kafka-ui as well, as shown below.

DynamoDB Destination

We can check the ingested records in the DynamoDB table items view. A list of scanned records is shown below. As expected, it has the order_id, ordered_at and other attributes.

We can also obtain an individual JSON record by clicking an order_id value, as shown below.
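
The same records can be checked with the AWS CLI as well. The commands below are a sketch - the customer_id value in the query is only a placeholder, and the query relies on the customer global secondary index created earlier.

# scan a few items from the orders table
aws dynamodb scan --table-name orders --max-items 5
# query a single customer's orders via the customer global secondary index
aws dynamodb query \
  --table-name orders \
  --index-name customer \
  --key-condition-expression "customer_id = :cid" \
  --expression-attribute-values '{":cid": {"S": "102"}}'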

Summary

The suite of Apache Camel Kafka connectors and the Kinesis Kafka connector from AWS Labs can be effective for building data ingestion pipelines that integrate AWS services. In this post, I illustrated how to develop the Camel DynamoDB sink connector using Docker. Fake order data was generated using the MSK Data Generator source connector, and the sink connector was configured to consume the topic messages and ingest them into a DynamoDB table.