According to the Apache Kafka documentation, Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect supports two types of connectors: source and sink. Source connectors ingest messages from external systems into Kafka topics, while sink connectors deliver messages from Kafka topics to external systems. In this post, I will illustrate how to set up a data ingestion pipeline using Kafka connectors. Fake customer and order data will be ingested into the corresponding topics using the MSK Data Generator source connector. The topic messages will then be saved into an S3 bucket using the Confluent S3 sink connector.

Kafka Connect Setup

We can use the same Docker image as the Kafka brokers because Kafka Connect is included in the Kafka distribution. The Kafka Connect server runs as a separate Docker Compose service, and its key configurations are listed below.

  • We’ll run it in distributed mode, which is started by executing connect-distributed.sh as the container command.
    • The startup script requires a properties file (connect-distributed.properties). It includes configurations such as the Kafka broker addresses; see below for details.
  • The Connect server is accessible on port 8083, and we can manage connectors via a REST API as demonstrated below.
  • The properties file and connector sources are volume-mapped.
  • AWS credentials are added to environment variables as the sink connector requires permission to write data into S3.

The source can be found in the GitHub repository of this post.

# /kafka-dev-with-docker/part-03/compose-connect.yml
version: "3.5"

services:
  kafka-connect:
    image: bitnami/kafka:2.8.1
    container_name: connect
    command: >
      /opt/bitnami/kafka/bin/connect-distributed.sh
      /opt/bitnami/kafka/config/connect-distributed.properties
    ports:
      - "8083:8083"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
    volumes:
      - "./configs/connect-distributed.properties:/opt/bitnami/kafka/config/connect-distributed.properties"
      - "./connectors/confluent-s3/lib:/opt/connectors/confluent-s3"
      - "./connectors/msk-datagen:/opt/connectors/msk-datagen"

networks:
  kafkanet:
    external: true
    name: kafka-network
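
Once the service is running, the REST API on port 8083 can be queried from the host. The following is a minimal sketch in Python, assuming the port is published on localhost as in the compose file above. The helper names (endpoint, get_json) are my own and are not part of Kafka Connect.

```python
import json
import urllib.request

# Assumption: port 8083 of the Connect container is published on localhost.
CONNECT_URL = "http://localhost:8083"


def endpoint(path, base_url=CONNECT_URL):
    """Join the base URL and a REST path without doubling slashes."""
    return f"{base_url.rstrip('/')}/{path.lstrip('/')}"


def get_json(path, base_url=CONNECT_URL, timeout=5.0):
    """Send a GET request to the Connect REST API and decode the JSON body."""
    with urllib.request.urlopen(endpoint(path, base_url), timeout=timeout) as resp:
        return json.load(resp)
```

With the server up, get_json("/") should return the worker version information, get_json("/connector-plugins") the plugins discovered on the plugin path, and get_json("/connectors") the names of the registered connectors.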

Connect Properties File

The properties file includes configurations of the Connect server. The key configuration values are described below.

  • Bootstrap Server
    • I changed the Kafka bootstrap server addresses. As the Connect server shares the same Docker network as the brokers, we can use the service names (e.g. kafka-0) on port 9092.
  • Cluster group id
    • In distributed mode, multiple worker processes use the same group.id, and they automatically coordinate to schedule execution of connectors and tasks across all available workers.
  • Converter-related properties
    • Converters are necessary to have a Kafka Connect deployment support a particular data format when writing to or reading from Kafka.
    • By default, org.apache.kafka.connect.json.JsonConverter is set for both the key and value converters and schemas are enabled for both of them.
    • As shown later, these properties can be overridden when creating a connector.
  • Topics for offsets, configs, status
    • Several topics are created to manage connectors by multiple worker processes.
  • Plugin path
    • Paths that contain plugins (connectors, converters, transformations) can be set as a list of filesystem paths separated by commas (,).
    • /opt/connectors is added and connector sources will be volume-mapped to it.

# kafka-dev-with-docker/part-03/configs/connect-distributed.properties

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=kafka-0:9092,kafka-1:9092,kafka-2:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

...

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations).
plugin.path=/opt/connectors
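
A quick sanity check of the properties file can catch a missing setting before the container starts. The sketch below parses simple key=value lines and reports which of the settings discussed above are absent. The list of keys is my own selection based on this post, not an exhaustive list of what the worker requires, and the parser deliberately ignores advanced properties syntax such as line continuations.

```python
# Assumption: these are the settings this post relies on, not an official list.
EXPECTED_KEYS = (
    "bootstrap.servers",
    "group.id",
    "key.converter",
    "value.converter",
    "offset.storage.topic",
    "config.storage.topic",
    "status.storage.topic",
    "plugin.path",
)


def parse_properties(text):
    """Parse key=value lines, skipping blanks, comments and anything without '='."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")) or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def missing_keys(props, expected=EXPECTED_KEYS):
    """Return the expected keys that are not set in the parsed properties."""
    return [key for key in expected if key not in props]
```

For example, reading configs/connect-distributed.properties into a string and passing the parsed result to missing_keys should return an empty list for the file shown above.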

Download Connectors

The connector sources need to be downloaded into the respective host paths (./connectors/confluent-s3 and ./connectors/msk-datagen) so that they are volume-mapped to the container’s plugin path (/opt/connectors). The following script downloads them into the host paths.

#!/usr/bin/env bash
# /kafka-dev-with-docker/part-03/download.sh
SCRIPT_DIR="$(cd "$(dirname "$0")"; pwd)"

SRC_PATH="${SCRIPT_DIR}/connectors"
# start from a clean connectors folder
rm -rf "${SRC_PATH}" && mkdir -p "${SRC_PATH}/msk-datagen"

## Confluent S3 Sink Connector
echo "downloading confluent s3 connector..."
DOWNLOAD_URL=https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.4.3/confluentinc-kafka-connect-s3-10.4.3.zip

# download, extract, then rename the versioned folder to confluent-s3
curl -o "${SRC_PATH}/confluent.zip" "${DOWNLOAD_URL}" \
  && unzip -qq "${SRC_PATH}/confluent.zip" -d "${SRC_PATH}" \
  && rm "${SRC_PATH}/confluent.zip" \
  && mv "${SRC_PATH}/$(ls "${SRC_PATH}" | grep confluentinc-kafka-connect-s3)" "${SRC_PATH}/confluent-s3"

## MSK Data Generator Source Connector
echo "downloading msk data generator..."
DOWNLOAD_URL=https://github.com/awslabs/amazon-msk-data-generator/releases/download/v0.4.0/msk-data-generator-0.4-jar-with-dependencies.jar

# -L follows the redirect used by GitHub release downloads
curl -L -o "${SRC_PATH}/msk-datagen/msk-data-generator.jar" "${DOWNLOAD_URL}"

The folder structure after the connectors are downloaded successfully is shown below.

$ tree connectors/ -d
connectors/
├── confluent-s3
│   ├── assets
│   ├── doc
│   │   ├── licenses
│   │   └── notices
│   ├── etc
│   └── lib
└── msk-datagen
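
Since a failed download leaves an empty folder that the Connect server would silently ignore, it can help to confirm that each connector folder actually contains jar files. This is a small sketch of such a check. The function names are hypothetical, and it assumes the folder layout shown above.

```python
from pathlib import Path


def jar_counts(connectors_dir):
    """Count .jar files (searched recursively) under each connector folder."""
    root = Path(connectors_dir)
    return {
        child.name: sum(1 for _ in child.rglob("*.jar"))
        for child in sorted(root.iterdir())
        if child.is_dir()
    }


def empty_plugins(connectors_dir):
    """Return the connector folders that contain no jar file at all."""
    return [name for name, count in jar_counts(connectors_dir).items() if count == 0]
```

Running empty_plugins("connectors") from the part-03 folder should return an empty list when both downloads succeeded.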

Start Docker Compose Services

There are three Docker Compose files, one each for the Kafka cluster, Kafka Connect and the management applications. We can run all the services by starting them in order. The order matters because the Connect server relies on the Kafka cluster, and kpow in compose-ui.yml fails if the Connect server is not up and running. Note that the Connect server address is added to both Kafka management apps in compose-ui.yml, so we are able to monitor and manage connectors in them.

$ cd kafka-dev-with-docker/part-03
# download connectors
$ ./download.sh
# starts 3 node kafka cluster
$ docker-compose -f compose-kafka.yml up -d
# starts kafka connect server in distributed mode
$ docker-compose -f compose-connect.yml up -d
# starts kafka-ui and kpow
# connect server address (http://kafka-connect:8083) is added
# check updated environment variables of each service
$ docker-compose -f compose-ui.yml up -d
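
Because the startup order matters, one option is to wait until the Connect REST API responds before starting the UI services. The sketch below polls the server with a simple retry loop. The function names, the number of attempts and the delay are arbitrary choices of mine, and the URL assumes port 8083 is published on localhost.

```python
import time
import urllib.error
import urllib.request


def connect_is_up(url="http://localhost:8083/connectors", timeout=2.0):
    """Return True if the Connect REST API answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # connection refused, timeout or an HTTP error: not ready yet
        return False


def wait_until(probe, attempts=30, delay=2.0, sleep=time.sleep):
    """Call probe() until it returns True or the attempts run out."""
    for attempt in range(attempts):
        if probe():
            return True
        if attempt < attempts - 1:
            sleep(delay)
    return False
```

Calling wait_until(connect_is_up) between the second and third docker-compose commands returns True as soon as the server is reachable, or False after roughly a minute with the defaults used here.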

Source Connector Creation

As mentioned earlier, Kafka Connect provides a REST API that manages connectors. We can create a connector programmatically. The REST endpoint requires a JSON payload that includes connector configurations.

$ cd kafka-dev-with-docker/part-03
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @configs/source.json
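
The same request can be sent from a script, which is handy when connectors are created as part of an automated setup. The following sketch mirrors the curl command with Python's standard library. The function names are my own, and it assumes the config file is plain JSON without the path comment that appears on the first line of the listings in this post.

```python
import json
import urllib.request


def build_create_request(payload, base_url="http://localhost:8083"):
    """Build the POST /connectors request for a connector payload (name + config)."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def create_connector(config_path, base_url="http://localhost:8083"):
    """Read a connector config file and submit it to the Connect server."""
    # Assumption: the file contains valid JSON only (no // comment line).
    with open(config_path, encoding="utf-8") as f:
        payload = json.load(f)
    with urllib.request.urlopen(build_create_request(payload, base_url)) as resp:
        return json.load(resp)
```

For instance, create_connector("configs/source.json") would submit the source connector and return the JSON document that the server sends back.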

The connector class (connector.class) is required for any connector, and I set it to the MSK Data Generator. The connector can run up to two tasks (tasks.max). As mentioned earlier, the converter-related properties are overridden. Specifically, the key converter is set to the string converter because the keys of both topics are generated as primitive values (genkp). Schemas are disabled for both the key and the value.

The remaining properties are specific to the source connector. It sends messages to two topics (customer and order). They are linked by the customer_id attribute of the order topic, whose value is taken from the key of the customer topic. This is useful for practicing stream processing, e.g. for joining two streams.

// kafka-dev-with-docker/part-03/configs/source.json
{
  "name": "order-source",
  "config": {
    "connector.class": "com.amazonaws.mskdatagen.GeneratorSourceConnector",
    "tasks.max": "2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,

    "genkp.customer.with": "#{Code.isbn10}",
    "genv.customer.name.with": "#{Name.full_name}",

    "genkp.order.with": "#{Internet.uuid}",
    "genv.order.product_id.with": "#{number.number_between '101','109'}",
    "genv.order.quantity.with": "#{number.number_between '1','5'}",
    "genv.order.customer_id.matching": "customer.key",

    "global.throttle.ms": "500",
    "global.history.records.max": "1000"
  }
}
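
To make the relationship between the two topics concrete, here is a toy simulation of the messages and of the join they enable. It is only an illustration of the data shape, not how the MSK Data Generator works internally. The customer keys and names are placeholders, and both number bounds are treated as inclusive, which may differ from the generator's behavior.

```python
import random
import uuid


def generate_messages(n_customers=3, n_orders=5, seed=0):
    """Return (customers, orders) dicts keyed by message key."""
    rng = random.Random(seed)
    # Placeholder keys and names; the real connector uses fake ISBNs and full names.
    customers = {f"cust-{i}": {"name": f"Customer {i}"} for i in range(n_customers)}
    orders = {}
    for _ in range(n_orders):
        order_key = str(uuid.UUID(int=rng.getrandbits(128)))
        orders[order_key] = {
            "product_id": rng.randint(101, 109),  # assumption: inclusive bounds
            "quantity": rng.randint(1, 5),
            # mimics "matching": "customer.key" by reusing an existing customer key
            "customer_id": rng.choice(list(customers)),
        }
    return customers, orders


def join_orders(customers, orders):
    """Enrich each order with the name of the customer it refers to."""
    return [
        {"order_id": key, **order, "customer_name": customers[order["customer_id"]]["name"]}
        for key, order in orders.items()
    ]
```

Every order produced this way refers to an existing customer key, so the join never misses. That property is what makes the two topics convenient for stream join exercises.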

Once created successfully, we can check the connector status as shown below.

$ curl http://localhost:8083/connectors/order-source/status

{
  "name": "order-source",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.19.0.6:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.19.0.6:8083"
    },
    {
      "id": 1,
      "state": "RUNNING",
      "worker_id": "172.19.0.6:8083"
    }
  ],
  "type": "source"
}
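
The status response is easy to check in code, for example in a script that verifies the pipeline after startup. The sketch below summarizes a status document with the same shape as the output above. The function name and the fields of the returned summary are my own choices.

```python
def summarize_status(status):
    """Summarize a connector status document returned by the REST API."""
    connector_running = status["connector"]["state"] == "RUNNING"
    # collect the ids of tasks in any state other than RUNNING (e.g. FAILED, PAUSED)
    tasks_not_running = [
        task["id"] for task in status.get("tasks", []) if task["state"] != "RUNNING"
    ]
    return {
        "name": status["name"],
        "connector_running": connector_running,
        "tasks_not_running": tasks_not_running,
        "healthy": connector_running and not tasks_not_running,
    }
```

Applied to the response shown above, the summary reports the connector as healthy with no tasks outside the RUNNING state.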

As we’ve added the Connect server URL, the Kafka Connect menu appears on kafka-ui. We can check the details of the connector on the app as well.

Kafka Topics

As configured, the source connector ingests messages to the customer and order topics.

We can browse individual messages in the Messages tab of a topic.

Sink Connector Creation

Similar to the source connector, we can create the sink connector using the REST API.

1$ cd kafka-dev-with-docker/part-03
2$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
3  http://localhost:8083/connectors/ -d @configs/sink.json

The connector is configured to write messages from both topics (topics) into an S3 bucket (s3.bucket.name), where files are prefixed by the partition number (DefaultPartitioner). It commits a file every 60 seconds (rotate.schedule.interval.ms) or when the number of messages reaches 100 (flush.size), whichever comes first. Like the source connector, it overrides the converter-related properties.

// kafka-dev-with-docker/part-03/configs/sink.json
{
  "name": "order-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "tasks.max": "2",
    "topics": "order,customer",
    "s3.bucket.name": "kafka-dev-ap-southeast-2",
    "s3.region": "ap-southeast-2",
    "flush.size": "100",
    "rotate.schedule.interval.ms": "60000",
    "timezone": "Australia/Sydney",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "errors.log.enable": "true"
  }
}
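
The file commit rule described above can be pictured as a simple either/or condition. The function below is a deliberately simplified model of it, written only to illustrate how flush.size and rotate.schedule.interval.ms interact. It is not the connector's code, and the function and argument names are hypothetical.

```python
def should_commit(buffered_records, ms_since_rotation,
                  flush_size=100, rotate_schedule_interval_ms=60_000):
    """Return True when either the record count or the elapsed time threshold is met."""
    return (
        buffered_records >= flush_size
        or ms_since_rotation >= rotate_schedule_interval_ms
    )
```

With the values in the config, a partition that receives only a few messages still gets a file about once a minute, while a busy partition gets a new file every 100 messages.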

The sink connector details on kafka-ui are shown below.

Kafka Consumers

The sink connector creates a Kafka consumer group named connect-order-sink. We see that it subscribes to the two topics and is in the stable state. It has two members because the connector is configured to run up to 2 tasks.

S3 Destination

The sink connector writes messages of the two topics (customer and order), and topic names are used as prefixes.

As mentioned, the default partitioner prefixes files further by the partition number, and it can be checked below.

The files are named <topic>+<partition>+<start-offset>.json. The sink connector’s format class is set to io.confluent.connect.s3.format.json.JsonFormat so that it writes JSON files.
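
Putting the prefix and the file name together, the full object key can be sketched as follows. This assumes the connector's default top-level folder (topics) and a start offset zero-padded to 10 digits, which is my understanding of the connector defaults rather than something configured in this post. The function name is hypothetical.

```python
def s3_object_key(topic, partition, start_offset, topics_dir="topics", extension="json"):
    """Build <topics_dir>/<topic>/partition=<n>/<topic>+<n>+<start-offset>.<ext>."""
    # Assumption: default topics.dir and a 10-digit zero-padded start offset.
    file_name = f"{topic}+{partition}+{start_offset:010d}.{extension}"
    return f"{topics_dir}/{topic}/partition={partition}/{file_name}"
```

For example, s3_object_key("order", 0, 100) gives topics/order/partition=0/order+0+0000000100.json, which is the kind of key to look for in the bucket.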

Summary

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. In this post, I illustrated how to set up a data ingestion pipeline using Kafka connectors. Fake customer and order data was ingested into the corresponding topics using the MSK Data Generator source connector. The topic messages were then saved into an S3 bucket using the Confluent S3 sink connector.