Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. In this lab, we will discuss how to create a data pipeline that ingests data from a Kafka topic into a DynamoDB table using the Camel DynamoDB sink connector.

Architecture

Fake taxi ride data is sent to a Kafka topic by the Kafka producer application discussed in Lab 1. The topic messages are then written into a DynamoDB table by a Kafka sink connector deployed on Amazon MSK Connect.

Infrastructure

The AWS infrastructure is created using Terraform, and the source can be found in the GitHub repository of this post. See this earlier post for details on how to create the resources. The key resources include a VPC, a VPN server, an MSK cluster and a Python Lambda producer app.

MSK Connect

For this lab, a Kafka sink connector and a DynamoDB table are created in addition, and their details are described below.

Download Connector Source

Before we deploy the sink connector, its source should be downloaded into the infra/connectors folder. From there, it can be uploaded to an S3 bucket and then used to create a custom plugin. The connector source includes multiple JAR files, and they should be compressed into a single ZIP archive. The archive file can be created by executing the download.sh script.

#!/usr/bin/env bash
# download.sh
SCRIPT_DIR="$(cd $(dirname "$0"); pwd)"

SRC_PATH=${SCRIPT_DIR}/infra/connectors
rm -rf ${SRC_PATH} && mkdir -p ${SRC_PATH}

## Download camel dynamodb sink connector
echo "download camel dynamodb sink connector..."
DOWNLOAD_URL=https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-ddb-sink-kafka-connector/3.20.3/camel-aws-ddb-sink-kafka-connector-3.20.3-package.tar.gz

# decompress and zip contents to create custom plugin of msk connect later
curl -o ${SRC_PATH}/camel-aws-ddb-sink-kafka-connector.tar.gz ${DOWNLOAD_URL} \
  && tar -xvzf ${SRC_PATH}/camel-aws-ddb-sink-kafka-connector.tar.gz -C ${SRC_PATH} \
  && cd ${SRC_PATH}/camel-aws-ddb-sink-kafka-connector \
  && zip -r camel-aws-ddb-sink-kafka-connector.zip . \
  && mv camel-aws-ddb-sink-kafka-connector.zip ${SRC_PATH} \
  && rm ${SRC_PATH}/camel-aws-ddb-sink-kafka-connector.tar.gz

Below shows the sink connector source. As mentioned, the ZIP file will be used to create a custom plugin. Note that the unarchived connector source is kept because it can be mounted into a local Kafka Connect server deployed on Docker for local development.

$ tree infra/connectors -P 'camel-aws-ddb-sink-kafka-connector*' -I 'docs'
infra/connectors
├── camel-aws-ddb-sink-kafka-connector
│   └── camel-aws-ddb-sink-kafka-connector-3.20.3.jar
└── camel-aws-ddb-sink-kafka-connector.zip
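Before handing the archive over to Terraform, it can be worth a quick sanity check that the ZIP actually contains the connector JARs. A minimal check, assuming the unzip utility is available locally:

# list the first few entries of the archive
$ unzip -l infra/connectors/camel-aws-ddb-sink-kafka-connector.zip | head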

DynamoDB Sink Connector

The connector is configured to write messages from the taxi-rides topic into a DynamoDB table. It requires specifying the table name, AWS region, operation, write capacity and whether to use the default credential provider - see the documentation for details. Note that, if you don't use the default credential provider, you have to specify the access key ID and secret access key. Note also that the camel.sink.unmarshal option converts data from the internal java.util.HashMap into the required java.io.InputStream; without this configuration, the connector fails with an org.apache.camel.NoTypeConversionAvailableException error.

# infra/msk-connect.tf
resource "aws_mskconnect_connector" "taxi_rides_sink" {
  count = local.connect.to_create ? 1 : 0

  name = "${local.name}-taxi-rides-sink"

  kafkaconnect_version = "2.7.1"

  capacity {
    provisioned_capacity {
      mcu_count    = 1
      worker_count = 1
    }
  }

  connector_configuration = {
    # connector configuration
    "connector.class"                = "org.apache.camel.kafkaconnector.awsddbsink.CamelAwsddbsinkSinkConnector",
    "tasks.max"                      = "1",
    "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable"   = false,
    "value.converter"                = "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable" = false,
    # camel ddb sink configuration
    "topics"                                                   = "taxi-rides",
    "camel.kamelet.aws-ddb-sink.table"                         = aws_dynamodb_table.taxi_rides.id,
    "camel.kamelet.aws-ddb-sink.region"                        = local.region,
    "camel.kamelet.aws-ddb-sink.operation"                     = "PutItem",
    "camel.kamelet.aws-ddb-sink.writeCapacity"                 = 1,
    "camel.kamelet.aws-ddb-sink.useDefaultCredentialsProvider" = true,
    "camel.sink.unmarshal"                                     = "jackson"
  }

  kafka_cluster {
    apache_kafka_cluster {
      bootstrap_servers = aws_msk_cluster.msk_data_cluster.bootstrap_brokers_sasl_iam

      vpc {
        security_groups = [aws_security_group.msk.id]
        subnets         = module.vpc.private_subnets
      }
    }
  }

  kafka_cluster_client_authentication {
    authentication_type = "IAM"
  }

  kafka_cluster_encryption_in_transit {
    encryption_type = "TLS"
  }

  plugin {
    custom_plugin {
      arn      = aws_mskconnect_custom_plugin.camel_ddb_sink[0].arn
      revision = aws_mskconnect_custom_plugin.camel_ddb_sink[0].latest_revision
    }
  }

  log_delivery {
    worker_log_delivery {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.camel_ddb_sink[0].name
      }
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.default_bucket.id
        prefix  = "logs/msk/connect/taxi-rides-sink"
      }
    }
  }

  service_execution_role_arn = aws_iam_role.kafka_connector_role[0].arn
}

resource "aws_mskconnect_custom_plugin" "camel_ddb_sink" {
  count = local.connect.to_create ? 1 : 0

  name         = "${local.name}-camel-ddb-sink"
  content_type = "ZIP"

  location {
    s3 {
      bucket_arn = aws_s3_bucket.default_bucket.arn
      file_key   = aws_s3_object.camel_ddb_sink[0].key
    }
  }
}

resource "aws_s3_object" "camel_ddb_sink" {
  count = local.connect.to_create ? 1 : 0

  bucket = aws_s3_bucket.default_bucket.id
  key    = "plugins/camel-aws-ddb-sink-kafka-connector.zip"
  source = "connectors/camel-aws-ddb-sink-kafka-connector.zip"

  etag = filemd5("connectors/camel-aws-ddb-sink-kafka-connector.zip")
}

resource "aws_cloudwatch_log_group" "camel_ddb_sink" {
  count = local.connect.to_create ? 1 : 0

  name = "/msk/connect/camel-ddb-sink"

  retention_in_days = 1

  tags = local.tags
}

Connector IAM Role

The managed policy of the connector role has permission on MSK cluster resources (cluster, topic and group). It also has permission on the S3 bucket and CloudWatch Logs for logging. Finally, as the connector should be able to create records in a DynamoDB table, the DynamoDB full access policy (AmazonDynamoDBFullAccess) is attached to the role. Note that the DynamoDB policy is overly permissive, and it is recommended to limit its scope in a production environment.

# infra/msk-connect.tf
resource "aws_iam_role" "kafka_connector_role" {
  count = local.connect.to_create ? 1 : 0

  name = "${local.name}-connector-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Sid    = ""
        Principal = {
          Service = "kafkaconnect.amazonaws.com"
        }
      },
    ]
  })
  managed_policy_arns = [
    "arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess",
    aws_iam_policy.kafka_connector_policy[0].arn
  ]
}

resource "aws_iam_policy" "kafka_connector_policy" {
  count = local.connect.to_create ? 1 : 0

  name = "${local.name}-connector-policy"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:AlterCluster",
          "kafka-cluster:DescribeCluster"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:cluster/${local.name}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnTopics"
        Action = [
          "kafka-cluster:*Topic*",
          "kafka-cluster:WriteData",
          "kafka-cluster:ReadData"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:topic/${local.name}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnGroups"
        Action = [
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeGroup"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:group/${local.name}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnDataBucket"
        Action = [
          "s3:ListBucket",
          "s3:*Object"
        ]
        Effect = "Allow"
        Resource = [
          "${aws_s3_bucket.default_bucket.arn}",
          "${aws_s3_bucket.default_bucket.arn}/*"
        ]
      },
      {
        Sid = "LoggingPermission"
        Action = [
          "logs:CreateLogStream",
          "logs:CreateLogGroup",
          "logs:PutLogEvents"
        ]
        Effect   = "Allow"
        Resource = "*"
      },
    ]
  })
}
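After the role is created, it can be useful to confirm that both the managed DynamoDB policy and the custom connector policy are attached. The snippet below is a rough sketch using the AWS CLI; the role name assumes the name prefix resolves to real-time-streaming, so adjust it to your own local.name value.

# list the policies attached to the connector role (role name is an assumption)
$ aws iam list-attached-role-policies --role-name real-time-streaming-connector-role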

DynamoDB Table

A DynamoDB table is defined, and it will be used to store the exported topic messages. The table has the id attribute as the partition key.

# infra/msk-connect.tf
resource "aws_dynamodb_table" "taxi_rides" {
  name           = "${local.name}-taxi-rides"
  billing_mode   = "PROVISIONED"
  read_capacity  = 1
  write_capacity = 1
  hash_key       = "id"

  attribute {
    name = "id"
    type = "S"
  }

  tags = local.tags
}

The infrastructure can be deployed (as well as destroyed) using Terraform CLI. Note that the sink connector and DynamoDB table are created only when the connect_to_create variable is set to true.

# initialize
terraform init
# create an execution plan
terraform plan -var 'producer_to_create=true' -var 'connect_to_create=true'
# execute the actions proposed in a Terraform plan
terraform apply -auto-approve=true -var 'producer_to_create=true' -var 'connect_to_create=true'

# destroy all remote objects
# terraform destroy -auto-approve=true -var 'producer_to_create=true' -var 'connect_to_create=true'

Once the resources are deployed, we can check the sink connector on AWS Console.
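The same check can also be done from the command line. Below is a minimal sketch using the AWS CLI, assuming your credentials and default region point at the deployment.

# list the MSK Connect connectors and custom plugins in the current region
$ aws kafkaconnect list-connectors
$ aws kafkaconnect list-custom-plugins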

Local Development (Optional)

Create Kafka Connect on Docker

We can use a local Kafka cluster deployed on Docker instead of one on Amazon MSK. For this option, we also need a local Kafka Connect server on Docker, and it can be created with the following Docker Compose file. See this post for details about how to set up a Kafka Connect server on Docker.

# compose-extra.yml
version: "3.5"

services:

  ...

  kafka-connect:
    image: bitnami/kafka:2.8.1
    container_name: connect
    command: >
      /opt/bitnami/kafka/bin/connect-distributed.sh
      /opt/bitnami/kafka/config/connect-distributed.properties
    ports:
      - "8083:8083"
    networks:
      - appnet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
    volumes:
      - "./configs/connect-distributed.properties:/opt/bitnami/kafka/config/connect-distributed.properties"
      - "./infra/connectors/camel-aws-ddb-sink-kafka-connector:/opt/connectors/camel-aws-ddb-sink-kafka-connector"

networks:
  appnet:
    external: true
    name: app-network

  ...

We can create a local Kafka cluster and Kafka Connect server as follows.

## set aws credentials environment variables
export AWS_ACCESS_KEY_ID=<aws-access-key-id>
export AWS_SECRET_ACCESS_KEY=<aws-secret-access-key>

# create kafka cluster
docker-compose -f compose-local-kafka.yml up -d
# create kafka connect server
docker-compose -f compose-extra.yml up -d
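Once the containers are up, we can verify that the Connect worker is reachable and that the Camel DynamoDB connector plugin has been picked up from the mounted folder, for example with the REST API exposed on port 8083 above.

# check the connect worker and the installed connector plugins
$ curl http://localhost:8083/
$ curl http://localhost:8083/connector-plugins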

Create DynamoDB Table with CLI

We still need to create a DynamoDB table, and it can be created using the AWS CLI as shown below.

// configs/ddb.json
{
  "TableName": "real-time-streaming-taxi-rides",
  "KeySchema": [{ "AttributeName": "id", "KeyType": "HASH" }],
  "AttributeDefinitions": [{ "AttributeName": "id", "AttributeType": "S" }],
  "ProvisionedThroughput": {
    "ReadCapacityUnits": 1,
    "WriteCapacityUnits": 1
  }
}

$ aws dynamodb create-table --cli-input-json file://configs/ddb.json
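Optionally, we can wait until the table becomes active and double-check its key schema before deploying the connector.

# wait for the table to become ACTIVE, then inspect it
$ aws dynamodb wait table-exists --table-name real-time-streaming-taxi-rides
$ aws dynamodb describe-table --table-name real-time-streaming-taxi-rides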

Deploy Sink Connector Locally

As Kafka Connect provides a REST API that manages connectors, we can create a connector programmatically. The REST endpoint requires a JSON payload that includes connector configurations.

// configs/sink.json
{
  "name": "real-time-streaming-taxi-rides-sink",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.awsddbsink.CamelAwsddbsinkSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "topics": "taxi-rides",

    "camel.kamelet.aws-ddb-sink.table": "real-time-streaming-taxi-rides",
    "camel.kamelet.aws-ddb-sink.region": "ap-southeast-2",
    "camel.kamelet.aws-ddb-sink.operation": "PutItem",
    "camel.kamelet.aws-ddb-sink.writeCapacity": 1,
    "camel.kamelet.aws-ddb-sink.useDefaultCredentialsProvider": true,
    "camel.sink.unmarshal": "jackson"
  }
}

The connector can be created (as well as deleted) using curl as shown below.

# deploy sink connector
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @configs/sink.json

# check status
$ curl http://localhost:8083/connectors/real-time-streaming-taxi-rides-sink/status

# delete sink connector
# $ curl -X DELETE http://localhost:8083/connectors/real-time-streaming-taxi-rides-sink
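If the status shows a failed task (for example, due to the NoTypeConversionAvailableException mentioned earlier), the deployed connector list and the worker logs are the quickest places to look. A couple of handy checks, using the container name from the Compose file:

# list deployed connectors
$ curl http://localhost:8083/connectors
# tail the local connect worker logs (container name from compose-extra.yml)
$ docker logs -f connect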

Application Result

Kafka Topic

We can see that the topic (taxi-rides) is created, and the details of the topic can be found in the Topics menu at localhost:3000. Note that, if the Kafka monitoring app (kpow) is not started, it can be run using compose-ui.yml - see this post for details about the kpow configuration.

Table Records

We can check the ingested records in the DynamoDB table's items view. Below shows a list of scanned records.
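As an alternative to the console, the ingested records can also be scanned with the AWS CLI. A minimal sketch, using the local table name from above (for the MSK Connect deployment, the table name follows the ${local.name}-taxi-rides pattern instead):

# scan a few items from the table to confirm ingestion
$ aws dynamodb scan --table-name real-time-streaming-taxi-rides --max-items 5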

Summary

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. In this lab, we discussed how to create a data pipeline that ingests data from a Kafka topic into a DynamoDB table using the Camel DynamoDB sink connector.