Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. In this lab, we will discuss how to create a data pipeline that ingests data from a Kafka topic into a DynamoDB table using the Camel DynamoDB sink connector.
- Introduction
- Lab 1 Produce data to Kafka using Lambda
- Lab 2 Write data to Kafka from S3 using Flink
- Lab 3 Transform and write data to S3 from Kafka using Flink
- Lab 4 Clean, Aggregate, and Enrich Events with Flink
- Lab 5 Write data to DynamoDB using Kafka Connect (this post)
- Lab 6 Consume data from Kafka using Lambda
Architecture
Fake taxi ride data is sent to a Kafka topic by the Kafka producer application discussed in Lab 1. The topic messages are then written into a DynamoDB table by a Kafka sink connector that is deployed on Amazon MSK Connect.
Infrastructure
The AWS infrastructure is created using Terraform, and the source can be found in the GitHub repository of this post. See this earlier post for details about how to create the resources. The key resources are a VPC, a VPN server, an MSK cluster and a Python Lambda producer app.
MSK Connect
For this lab, a Kafka sink connector and a DynamoDB table are created in addition, and their details are described below.
Download Connector Source
Before we deploy the sink connector, its source should be downloaded into the infra/connectors folder. From there, the source is uploaded to an S3 bucket and then used to create a custom plugin. The connector source includes multiple JAR files, and they should be compressed into a single ZIP archive. The archive file can be created by executing the download.sh script.
#!/usr/bin/env bash
# download.sh
SCRIPT_DIR="$(cd $(dirname "$0"); pwd)"

SRC_PATH=${SCRIPT_DIR}/infra/connectors
rm -rf ${SRC_PATH} && mkdir -p ${SRC_PATH}

## Download camel dynamodb sink connector
echo "download camel dynamodb sink connector..."
DOWNLOAD_URL=https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-ddb-sink-kafka-connector/3.20.3/camel-aws-ddb-sink-kafka-connector-3.20.3-package.tar.gz

# decompress and zip contents to create custom plugin of msk connect later
curl -o ${SRC_PATH}/camel-aws-ddb-sink-kafka-connector.tar.gz ${DOWNLOAD_URL} \
  && tar -xvzf ${SRC_PATH}/camel-aws-ddb-sink-kafka-connector.tar.gz -C ${SRC_PATH} \
  && cd ${SRC_PATH}/camel-aws-ddb-sink-kafka-connector \
  && zip -r camel-aws-ddb-sink-kafka-connector.zip . \
  && mv camel-aws-ddb-sink-kafka-connector.zip ${SRC_PATH} \
  && rm ${SRC_PATH}/camel-aws-ddb-sink-kafka-connector.tar.gz
The sink connector source is shown below. As mentioned, the ZIP file will be used to create a custom plugin. Note that the unarchived connector source is kept because it can be mounted into a local Kafka Connect server deployed on Docker for local development.
$ tree infra/connectors -P 'camel-aws-ddb-sink-kafka-connector*' -I 'docs'
infra/connectors
├── camel-aws-ddb-sink-kafka-connector
│   └── camel-aws-ddb-sink-kafka-connector-3.20.3.jar
└── camel-aws-ddb-sink-kafka-connector.zip
DynamoDB Sink Connector
The connector is configured to write messages from the taxi-rides topic into a DynamoDB table. It requires specifying the table name, AWS region, operation, write capacity and whether to use the default credentials provider - see the documentation for details. Note that, if you don't use the default credentials provider, you have to specify the access key ID and secret access key. Note also that the camel.sink.unmarshal option converts data from the internal java.util.HashMap into the required java.io.InputStream. Without this configuration, the connector fails with an org.apache.camel.NoTypeConversionAvailableException error.
# infra/msk-connect.tf
resource "aws_mskconnect_connector" "taxi_rides_sink" {
  count = local.connect.to_create ? 1 : 0

  name = "${local.name}-taxi-rides-sink"

  kafkaconnect_version = "2.7.1"

  capacity {
    provisioned_capacity {
      mcu_count    = 1
      worker_count = 1
    }
  }

  connector_configuration = {
    # connector configuration
    "connector.class"                = "org.apache.camel.kafkaconnector.awsddbsink.CamelAwsddbsinkSinkConnector",
    "tasks.max"                      = "1",
    "key.converter"                  = "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable"   = false,
    "value.converter"                = "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable" = false,
    # camel ddb sink configuration
    "topics"                                                   = "taxi-rides",
    "camel.kamelet.aws-ddb-sink.table"                         = aws_dynamodb_table.taxi_rides.id,
    "camel.kamelet.aws-ddb-sink.region"                        = local.region,
    "camel.kamelet.aws-ddb-sink.operation"                     = "PutItem",
    "camel.kamelet.aws-ddb-sink.writeCapacity"                 = 1,
    "camel.kamelet.aws-ddb-sink.useDefaultCredentialsProvider" = true,
    "camel.sink.unmarshal"                                     = "jackson"
  }

  kafka_cluster {
    apache_kafka_cluster {
      bootstrap_servers = aws_msk_cluster.msk_data_cluster.bootstrap_brokers_sasl_iam

      vpc {
        security_groups = [aws_security_group.msk.id]
        subnets         = module.vpc.private_subnets
      }
    }
  }

  kafka_cluster_client_authentication {
    authentication_type = "IAM"
  }

  kafka_cluster_encryption_in_transit {
    encryption_type = "TLS"
  }

  plugin {
    custom_plugin {
      arn      = aws_mskconnect_custom_plugin.camel_ddb_sink[0].arn
      revision = aws_mskconnect_custom_plugin.camel_ddb_sink[0].latest_revision
    }
  }

  log_delivery {
    worker_log_delivery {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.camel_ddb_sink[0].name
      }
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.default_bucket.id
        prefix  = "logs/msk/connect/taxi-rides-sink"
      }
    }
  }

  service_execution_role_arn = aws_iam_role.kafka_connector_role[0].arn
}

resource "aws_mskconnect_custom_plugin" "camel_ddb_sink" {
  count = local.connect.to_create ? 1 : 0

  name         = "${local.name}-camel-ddb-sink"
  content_type = "ZIP"

  location {
    s3 {
      bucket_arn = aws_s3_bucket.default_bucket.arn
      file_key   = aws_s3_object.camel_ddb_sink[0].key
    }
  }
}

resource "aws_s3_object" "camel_ddb_sink" {
  count = local.connect.to_create ? 1 : 0

  bucket = aws_s3_bucket.default_bucket.id
  key    = "plugins/camel-aws-ddb-sink-kafka-connector.zip"
  source = "connectors/camel-aws-ddb-sink-kafka-connector.zip"

  etag = filemd5("connectors/camel-aws-ddb-sink-kafka-connector.zip")
}

resource "aws_cloudwatch_log_group" "camel_ddb_sink" {
  count = local.connect.to_create ? 1 : 0

  name = "/msk/connect/camel-ddb-sink"

  retention_in_days = 1

  tags = local.tags
}
Connector IAM Role
The managed policy of the connector role grants permissions on the MSK cluster resources (cluster, topic and group). It also grants permissions on the S3 bucket and CloudWatch Logs for logging. Finally, as the connector should be able to put records into a DynamoDB table, the DynamoDB full access policy (AmazonDynamoDBFullAccess) is attached to the role. Note that this policy is overly permissive, and it is recommended to limit its scope in a production environment.
# infra/msk-connect.tf
resource "aws_iam_role" "kafka_connector_role" {
  count = local.connect.to_create ? 1 : 0

  name = "${local.name}-connector-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Sid    = ""
        Principal = {
          Service = "kafkaconnect.amazonaws.com"
        }
      },
    ]
  })
  managed_policy_arns = [
    "arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess",
    aws_iam_policy.kafka_connector_policy[0].arn
  ]
}

resource "aws_iam_policy" "kafka_connector_policy" {
  count = local.connect.to_create ? 1 : 0

  name = "${local.name}-connector-policy"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:AlterCluster",
          "kafka-cluster:DescribeCluster"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:cluster/${local.name}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnTopics"
        Action = [
          "kafka-cluster:*Topic*",
          "kafka-cluster:WriteData",
          "kafka-cluster:ReadData"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:topic/${local.name}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnGroups"
        Action = [
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeGroup"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:group/${local.name}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnDataBucket"
        Action = [
          "s3:ListBucket",
          "s3:*Object"
        ]
        Effect = "Allow"
        Resource = [
          "${aws_s3_bucket.default_bucket.arn}",
          "${aws_s3_bucket.default_bucket.arn}/*"
        ]
      },
      {
        Sid = "LoggingPermission"
        Action = [
          "logs:CreateLogStream",
          "logs:CreateLogGroup",
          "logs:PutLogEvents"
        ]
        Effect   = "Allow"
        Resource = "*"
      },
    ]
  })
}
DynamoDB Table
A DynamoDB table is defined, and the topic messages will be exported into it. The table uses the id attribute as the partition key.
# infra/msk-connect.tf
resource "aws_dynamodb_table" "taxi_rides" {
  name           = "${local.name}-taxi-rides"
  billing_mode   = "PROVISIONED"
  read_capacity  = 1
  write_capacity = 1
  hash_key       = "id"

  attribute {
    name = "id"
    type = "S"
  }

  tags = local.tags
}
The infrastructure can be deployed (as well as destroyed) using Terraform CLI. Note that the sink connector and DynamoDB table are created only when the connect_to_create variable is set to true.
# initialize
terraform init
# create an execution plan
terraform plan -var 'producer_to_create=true' -var 'connect_to_create=true'
# execute the actions proposed in a Terraform plan
terraform apply -auto-approve=true -var 'producer_to_create=true' -var 'connect_to_create=true'

# destroy all remote objects
# terraform destroy -auto-approve=true -var 'producer_to_create=true' -var 'connect_to_create=true'
Once the resources are deployed, we can check the sink connector on the AWS Console.
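The connector state can also be checked from the command line. Below is a minimal sketch using the AWS CLI; the connector name prefix is an assumption based on the "${local.name}-taxi-rides-sink" naming pattern above (e.g. a local.name of real-time-streaming), so adjust it to your setup.

# list MSK Connect connectors and show their state
# (the name prefix is an assumption - adjust it to your local.name value)
aws kafkaconnect list-connectors \
  --connector-name-prefix real-time-streaming-taxi-rides-sink \
  --query "connectors[].{name:connectorName,state:connectorState}"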
Local Development (Optional)
Create Kafka Connect on Docker
As an alternative to the cluster on Amazon MSK, we can use a local Kafka cluster deployed on Docker. For this option, we need to run a local Kafka Connect server on Docker as well, and it can be created with the following Docker Compose file. See this post for details about how to set up a Kafka Connect server on Docker.
# compose-extra.yml
version: "3.5"

services:

  ...

  kafka-connect:
    image: bitnami/kafka:2.8.1
    container_name: connect
    command: >
      /opt/bitnami/kafka/bin/connect-distributed.sh
      /opt/bitnami/kafka/config/connect-distributed.properties
    ports:
      - "8083:8083"
    networks:
      - appnet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
    volumes:
      - "./configs/connect-distributed.properties:/opt/bitnami/kafka/config/connect-distributed.properties"
      - "./infra/connectors/camel-aws-ddb-sink-kafka-connector:/opt/connectors/camel-aws-ddb-sink-kafka-connector"

networks:
  appnet:
    external: true
    name: app-network

  ...
We can create a local Kafka cluster and Kafka Connect server as follows.
## set aws credentials environment variables
export AWS_ACCESS_KEY_ID=<aws-access-key-id>
export AWS_SECRET_ACCESS_KEY=<aws-secret-access-key>

# create kafka cluster
docker-compose -f compose-local-kafka.yml up -d
# create kafka connect server
docker-compose -f compose-extra.yml up -d
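Once the Connect server is up, we can optionally confirm that the Camel DynamoDB sink plugin has been picked up via the Kafka Connect REST API - a quick check, assuming the server listens on localhost:8083 as configured above.

# confirm the camel ddb sink connector plugin is loaded
curl -s http://localhost:8083/connector-plugins | grep -i awsddbsink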
Create DynamoDB Table with CLI
We still need to create a DynamoDB table, and it can be created using the AWS CLI as shown below.
// configs/ddb.json
{
  "TableName": "real-time-streaming-taxi-rides",
  "KeySchema": [{ "AttributeName": "id", "KeyType": "HASH" }],
  "AttributeDefinitions": [{ "AttributeName": "id", "AttributeType": "S" }],
  "ProvisionedThroughput": {
    "ReadCapacityUnits": 1,
    "WriteCapacityUnits": 1
  }
}
$ aws dynamodb create-table --cli-input-json file://configs/ddb.json
Deploy Sink Connector Locally
As Kafka Connect provides a REST API that manages connectors, we can create a connector programmatically. The REST endpoint requires a JSON payload that includes connector configurations.
// configs/sink.json
{
  "name": "real-time-streaming-taxi-rides-sink",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.awsddbsink.CamelAwsddbsinkSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "topics": "taxi-rides",

    "camel.kamelet.aws-ddb-sink.table": "real-time-streaming-taxi-rides",
    "camel.kamelet.aws-ddb-sink.region": "ap-southeast-2",
    "camel.kamelet.aws-ddb-sink.operation": "PutItem",
    "camel.kamelet.aws-ddb-sink.writeCapacity": 1,
    "camel.kamelet.aws-ddb-sink.useDefaultCredentialsProvider": true,
    "camel.sink.unmarshal": "jackson"
  }
}
The connector can be created (as well as deleted) using Curl as shown below.
# deploy sink connector
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @configs/sink.json

# check status
$ curl http://localhost:8083/connectors/real-time-streaming-taxi-rides-sink/status

# delete sink connector
# $ curl -X DELETE http://localhost:8083/connectors/real-time-streaming-taxi-rides-sink
Application Result
Kafka Topic
We can see that the topic (taxi-rides) is created, and its details can be found on the Topics menu on localhost:3000. Note that, if the Kafka monitoring app (kpow) is not running, we can start it using compose-ui.yml - see this post for details about the kpow configuration.
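For a quick check without the UI, the topic can also be inspected from inside the Connect container, which ships with the Kafka CLI tools. This is a sketch that assumes the local cluster from compose-local-kafka.yml exposes a broker listener at kafka-0:9092 on the shared Docker network; adjust the bootstrap server to your setup.

# describe the taxi-rides topic from the connect container
# (kafka-0:9092 is an assumed bootstrap address - adjust to your local cluster)
docker exec -it connect /opt/bitnami/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-0:9092 --describe --topic taxi-rides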
Table Records
We can check the ingested records in the items view of the DynamoDB table, which shows a list of scanned records.
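A few records can also be retrieved with the AWS CLI. The sketch below assumes the table name used in the local setup (real-time-streaming-taxi-rides); replace it with the table created by Terraform when checking the MSK deployment.

# scan a handful of ingested records
# (the table name is the one from the local setup - replace as needed)
aws dynamodb scan --table-name real-time-streaming-taxi-rides --limit 3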
Summary
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. In this lab, we discussed how to create a data pipeline that ingests data from a Kafka topic into a DynamoDB table using the Camel DynamoDB sink connector.