Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. In this post, we discuss how to set up a data ingestion pipeline using Kafka connectors. Fake customer and order data is ingested into Kafka topics using the MSK Data Generator. Also, we use the Confluent S3 sink connector to save the messages of the topics into a S3 bucket. The Kafka Connect servers and individual connectors are deployed using the custom resources of Strimzi on Kubernetes.

Kafka Connect

We create a Kafka Connect server using the Strimzi custom resource named KafkaConnect. The source can be found in the GitHub repository of this post.

Create Secrets

As discussed further below, a custom Docker image is built and pushed into an external Docker registry when we create a Kafka Connect server using Strimzi. Therefore, we need the registry secret to push the image. Also, as the sink connector needs permission to write files into a S3 bucket, AWS credentials should be added to the Connect server. Both the secret and credentials will be made available via Kubernetes Secrets and those are created as shown below.

1# Log in to DockerHub if not done - docker login
2kubectl create secret generic regcred \
3  --from-file=.dockerconfigjson=$HOME/.docker/config.json \
4  --type=kubernetes.io/dockerconfigjson
1kubectl create -f - <<EOF
2apiVersion: v1
3kind: Secret
5  name: awscred

Deploy Connect Server

The Kafka Connect server is created using the Strimzi custom resource named KafkaConnect. When we create the custom resource, a Docker image is built, the image is pushed into an external Docker registry (output.type: docker), and it is pulled to deploy the Kafka Connect server instances. Therefore, in the build configuration, we need to indicate the location of an external Docker registry and registry secret (pushSecret). Note that the registry secret is referred from the Kubernetes Secret that is created earlier. Also, we can build connector sources together by specifying their types and URLs in build.plugins.

When it comes to the Connect server configuration, two server instances are configured to run (replicas: 2) and the same Kafka version (2.8.1) to the associating Kafka cluster is used. Also, the name and port of the service that exposes the Kafka internal listeners are used to specify the Kafka bootstrap server address. Moreover, AWS credentials are added to environment variables because the sink connector needs permission to write files to a S3 bucket.

 1# manifests/kafka-connect.yaml
 2apiVersion: kafka.strimzi.io/v1beta2
 3kind: KafkaConnect
 5  name: demo-connect
 6  annotations:
 7    strimzi.io/use-connector-resources: "true"
 9  version: 2.8.1
10  replicas: 2
11  bootstrapServers: demo-cluster-kafka-bootstrap:9092
12  config:
13    group.id: demo-connect
14    offset.storage.topic: demo-connect-offsets
15    config.storage.topic: demo-connect-configs
16    status.storage.topic: demo-connect-status
17    # -1 means it will use the default replication factor configured in the broker
18    config.storage.replication.factor: -1
19    offset.storage.replication.factor: -1
20    status.storage.replication.factor: -1
21    key.converter: org.apache.kafka.connect.json.JsonConverter
22    value.converter: org.apache.kafka.connect.json.JsonConverter
23    key.converter.schemas.enable: false
24    value.converter.schemas.enable: false
25  externalConfiguration:
26    env:
27      - name: AWS_ACCESS_KEY_ID
28        valueFrom:
29          secretKeyRef:
30            name: awscred
31            key: AWS_ACCESS_KEY_ID
32      - name: AWS_SECRET_ACCESS_KEY
33        valueFrom:
34          secretKeyRef:
35            name: awscred
36            key: AWS_SECRET_ACCESS_KEY
37  build:
38    output:
39      type: docker
40      image: jaehyeon/demo-connect:latest
41      pushSecret: regcred
42    # https://strimzi.io/docs/operators/0.27.1/using#plugins
43    plugins:
44      - name: confluentinc-kafka-connect-s3
45        artifacts:
46          - type: zip
47            url: https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-s3/versions/10.4.3/confluentinc-kafka-connect-s3-10.4.3.zip
48      - name: msk-data-generator
49        artifacts:
50          - type: jar
51            url: https://github.com/awslabs/amazon-msk-data-generator/releases/download/v0.4.0/msk-data-generator-0.4-jar-with-dependencies.jar

We assume that a Kafka cluster and management app are deployed on Minikube as discussed in Part 1. The Kafka Connect server can be created using the kubernetes create command as shown below.

1kubectl create -f manifests/kafka-connect.yaml

Once the Connect image is built successfully, we can see that the image is pushed into the external Docker registry.

Then the Connect server instances run by two Pods, and they are exposed by a service named demo-connect-connect-api on port 8083.

 1kubectl get all -l strimzi.io/cluster=demo-connect
 2# NAME                                        READY   STATUS    RESTARTS   AGE
 3# pod/demo-connect-connect-559cd588b4-48lhd   1/1     Running   0          2m4s
 4# pod/demo-connect-connect-559cd588b4-wzqgs   1/1     Running   0          2m4s
 6# NAME                               TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)    AGE
 7# service/demo-connect-connect-api   ClusterIP   <none>        8083/TCP   2m4s
 9# NAME                                   READY   UP-TO-DATE   AVAILABLE   AGE
10# deployment.apps/demo-connect-connect   2/2     2            2           2m4s
12# NAME                                              DESIRED   CURRENT   READY   AGE
13# replicaset.apps/demo-connect-connect-559cd588b4   2         2         2       2m4s

Kafka Connectors

Both the source and sink connectors are created using the Strimzi custom resource named KafkaConnector.

Source Connector

The connector class (spec.class) is set for the MSK Data Generator, and a single worker is allocated to it (tasks.max). Also, the key converter is set to the string converter as the keys of both topics are set to be primitive values (genkp) while the value converter is configured as the json converter. Finally, schemas are not enabled for both the key and value.

The remaining properties are specific to the source connector. Basically it sends messages to two topics (customer and order). They are linked by the customer_id attribute of the order topic where the value is from the key of the customer topic. This is useful for practicing stream processing e.g. for joining two streams.

 1# manifests/kafka-connectors.yaml
 2apiVersion: kafka.strimzi.io/v1beta2
 3kind: KafkaConnector
 5  name: order-source
 6  labels:
 7    strimzi.io/cluster: demo-connect
 9  class: com.amazonaws.mskdatagen.GeneratorSourceConnector
10  tasksMax: 1
11  config:
12    ##
13    key.converter: org.apache.kafka.connect.storage.StringConverter
14    key.converter.schemas.enable: false
15    value.converter: org.apache.kafka.connect.json.JsonConverter
16    value.converter.schemas.enable: false
17    ##
18    genkp.customer.with: "#{Code.isbn10}"
19    genv.customer.name.with: "#{Name.full_name}"
20    genkp.order.with: "#{Internet.uuid}"
21    genv.order.product_id.with: "#{number.number_between '101''109'}"
22    genv.order.quantity.with: "#{number.number_between '1''5'}"
23    genv.order.customer_id.matching: customer.key
24    global.throttle.ms: 500
25    global.history.records.max: 1000

Sink Connector

The connector is configured to write files from both the topics (order and customer) into a S3 bucket (s3.bucket.name) where the file names are prefixed by the partition number (DefaultPartitioner). Also, it invokes file commits every 60 seconds (rotate.schedule.interval.ms) or the number of messages reach 100 (flush.size). Like the source connector, it overrides the converter-related properties.

 1# manifests/kafka-connectors.yaml
 5apiVersion: kafka.strimzi.io/v1beta2
 6kind: KafkaConnector
 8  name: order-sink
 9  labels:
10    strimzi.io/cluster: demo-connect
12  class: io.confluent.connect.s3.S3SinkConnector
13  tasksMax: 1
14  config:
15    ##
16    key.converter: org.apache.kafka.connect.storage.StringConverter
17    key.converter.schemas.enable: false
18    value.converter: org.apache.kafka.connect.json.JsonConverter
19    value.converter.schemas.enable: false
20    ##
21    storage.class: io.confluent.connect.s3.storage.S3Storage
22    format.class: io.confluent.connect.s3.format.json.JsonFormat
23    topics: order,customer
24    s3.bucket.name: kafka-dev-on-k8s-ap-southeast-2
25    s3.region: ap-southeast-2
26    flush.size: 100
27    rotate.schedule.interval.ms: 60000
28    timezone: Australia/Sydney
29    partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
30    errors.log.enable: true

Deploy Connectors

The Kafka source and sink connectors can be created using the kubernetes create command as shown below. Once created, we can check their details by listing (or describing) the Strimzi custom resource (KafkaConnector). Below shows both the source and sink connectors are in ready status, which indicates they are running.

1kubectl create -f manifests/kafka-connectors.yaml
3kubectl get kafkaconnectors
4# NAME           CLUSTER        CONNECTOR CLASS                                     MAX TASKS   READY
5# order-sink     demo-connect   io.confluent.connect.s3.S3SinkConnector             1           True
6# order-source   demo-connect   com.amazonaws.mskdatagen.GeneratorSourceConnector   1           True

We can also see the connector status on the Kafka management app (kafka-ui) as shown below.

The sink connector writes messages of the two topics (customer and order), and topic names are used as S3 prefixes.

The files are generated by <topic>+<partiton>+<start-offset>.json. The sink connector’s format class is set to io.confluent.connect.s3.format.json.JsonFormat so that it writes to Json files.

Delete Resources

The Kubernetes resources and Minikube cluster can be removed by the kubectl delete and minikube delete commands respectively.

 1## delete resources
 2kubectl delete -f manifests/kafka-connectors.yaml
 3kubectl delete -f manifests/kafka-connect.yaml
 4kubectl delete secret awscred
 5kubectl delete secret regcred
 6kubectl delete -f manifests/kafka-cluster.yaml
 7kubectl delete -f manifests/kafka-ui.yaml
 8kubectl delete -f manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml
10## delete minikube
11minikube delete


