Amazon MSK can be configured as an event source of a Lambda function. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. With this feature, we can develop a Kafka consumer application in a serverless environment where developers can focus on application logic. In this lab, we will discuss how to create a Kafka consumer using a Lambda function.
- Introduction
- Lab 1 Produce data to Kafka using Lambda
- Lab 2 Write data to Kafka from S3 using Flink
- Lab 3 Transform and write data to S3 from Kafka using Flink
- Lab 4 Clean, Aggregate, and Enrich Events with Flink
- Lab 5 Write data to DynamoDB using Kafka Connect
- Lab 6 Consume data from Kafka using Lambda (this post)
Architecture
Fake taxi ride data is sent to a Kafka topic by the Kafka producer application discussed in Lab 1. Messages in the taxi-rides topic are consumed by a Lambda function, with the MSK cluster configured as the function's event source.
Infrastructure
The AWS infrastructure is created using Terraform, and the source can be found in the GitHub repository of this post. See this earlier post for details about how to create the resources. The key resources include a VPC, a VPN server, an MSK cluster, and a Python Lambda producer app.
Lambda Kafka Consumer
The Kafka consumer Lambda function is added for this lab, and it is deployed conditionally by a flag variable called consumer_to_create. Once it is set to true, the function is created by the AWS Lambda Terraform module while referring to the associated configuration variables (local.consumer.*). Note that an event source mapping is created on the Lambda function where the event source is set to the Kafka cluster on Amazon MSK. We can also control which topics to poll by adding one or more topic names to the topics attribute - only a single topic named taxi-rides is specified for this lab.
# infra/variables.tf
variable "consumer_to_create" {
  description = "Flag to indicate whether to create Kafka consumer"
  type        = bool
  default     = false
}

...

locals {
  ...
  consumer = {
    to_create         = var.consumer_to_create
    src_path          = "../consumer"
    function_name     = "kafka_consumer"
    handler           = "app.lambda_function"
    timeout           = 600
    memory_size       = 128
    runtime           = "python3.8"
    topic_name        = "taxi-rides"
    starting_position = "TRIM_HORIZON"
  }
  ...
}

# infra/consumer.tf
module "kafka_consumer" {
  source  = "terraform-aws-modules/lambda/aws"
  version = ">=5.1.0, <6.0.0"

  create = local.consumer.to_create

  function_name          = local.consumer.function_name
  handler                = local.consumer.handler
  runtime                = local.consumer.runtime
  timeout                = local.consumer.timeout
  memory_size            = local.consumer.memory_size
  source_path            = local.consumer.src_path
  vpc_subnet_ids         = module.vpc.private_subnets
  vpc_security_group_ids = local.consumer.to_create ? [aws_security_group.kafka_consumer[0].id] : null
  attach_network_policy  = true
  attach_policies        = true
  policies               = local.consumer.to_create ? [aws_iam_policy.kafka_consumer[0].arn] : null
  number_of_policies     = 1

  depends_on = [
    aws_msk_cluster.msk_data_cluster
  ]

  tags = local.tags
}

resource "aws_lambda_event_source_mapping" "kafka_consumer" {
  count             = local.consumer.to_create ? 1 : 0
  event_source_arn  = aws_msk_cluster.msk_data_cluster.arn
  function_name     = module.kafka_consumer.lambda_function_name
  topics            = [local.consumer.topic_name]
  starting_position = local.consumer.starting_position

  amazon_managed_kafka_event_source_config {
    consumer_group_id = "${local.consumer.topic_name}-group-01"
  }
}

resource "aws_lambda_permission" "kafka_consumer" {
  count         = local.consumer.to_create ? 1 : 0
  statement_id  = "InvokeLambdaFunction"
  action        = "lambda:InvokeFunction"
  function_name = local.consumer.function_name
  principal     = "kafka.amazonaws.com"
  source_arn    = aws_msk_cluster.msk_data_cluster.arn
}
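By default, Lambda reads records from Amazon MSK in batches of up to 100. If the message volume calls for tuning, the mapping also accepts explicit batching controls; the following sketch shows how the resource above could be extended, with illustrative values that are not part of this lab.

# Sketch only - optional batching controls; the values below are illustrative.
resource "aws_lambda_event_source_mapping" "kafka_consumer" {
  count             = local.consumer.to_create ? 1 : 0
  event_source_arn  = aws_msk_cluster.msk_data_cluster.arn
  function_name     = module.kafka_consumer.lambda_function_name
  topics            = [local.consumer.topic_name]
  starting_position = local.consumer.starting_position

  batch_size                         = 500 # default is 100 for Amazon MSK
  maximum_batching_window_in_seconds = 5   # wait up to 5 seconds to fill a batch

  amazon_managed_kafka_event_source_config {
    consumer_group_id = "${local.consumer.topic_name}-group-01"
  }
}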
IAM Permission
The consumer Lambda function needs permission to read messages from the Kafka topic. The following IAM policy is added to the Lambda function, as illustrated in the AWS documentation.
# infra/consumer.tf
resource "aws_iam_policy" "kafka_consumer" {
  count = local.consumer.to_create ? 1 : 0
  name  = "${local.consumer.function_name}-msk-lambda-permission"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnKafkaCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:DescribeGroup",
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeTopic",
          "kafka-cluster:ReadData",
          "kafka-cluster:DescribeClusterDynamicConfiguration"
        ]
        Effect = "Allow"
        Resource = [
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:cluster/${local.name}-msk-cluster/*",
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:topic/${local.name}-msk-cluster/*",
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:group/${local.name}-msk-cluster/*"
        ]
      },
      {
        Sid = "PermissionOnKafka"
        Action = [
          "kafka:DescribeCluster",
          "kafka:GetBootstrapBrokers"
        ]
        Effect = "Allow"
        Resource = "*"
      },
      {
        Sid = "PermissionOnNetwork"
        Action = [
          # The first three actions also exist in the network policy attachment of the Lambda module
          # "ec2:CreateNetworkInterface",
          # "ec2:DescribeNetworkInterfaces",
          # "ec2:DeleteNetworkInterface",
          "ec2:DescribeVpcs",
          "ec2:DescribeSubnets",
          "ec2:DescribeSecurityGroups"
        ]
        Effect = "Allow"
        Resource = "*"
      }
    ]
  })
}
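Note that the kafka-cluster:* actions above take effect because the cluster uses IAM access control for client authentication. For reference, the relevant block of the cluster resource looks roughly as follows - a sketch only, as the full cluster definition lives elsewhere in the repository.

# Sketch only - IAM access control enabled on the MSK cluster.
resource "aws_msk_cluster" "msk_data_cluster" {
  # ... cluster name, Kafka version, broker settings, etc. ...
  client_authentication {
    sasl {
      iam = true
    }
  }
}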
Function Source
The records attribute of the Lambda event payload includes the Kafka consumer records. Each record contains details of the Amazon MSK topic and partition identifier, together with a timestamp and a base64-encoded message key and value. The Lambda function simply prints those records after decoding the message key and value and formatting the timestamp into ISO format.
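For reference, the event that Lambda hands to the function has roughly the following shape, where the keys of the records map are <topic>-<partition> strings and all values are illustrative:

{
  "eventSource": "aws:kafka",
  "eventSourceArn": "arn:aws:kafka:<region>:<account-id>:cluster/...",
  "records": {
    "taxi-rides-0": [
      {
        "topic": "taxi-rides",
        "partition": 0,
        "offset": 123,
        "timestamp": 1678858800000,
        "timestampType": "CREATE_TIME",
        "key": "Ijk1NDgi",
        "value": "eyJpZCI6ICI5NTQ4In0=",
        "headers": []
      }
    ]
  }
}

The function below iterates over this structure, decoding each record's key and value.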
# consumer/app.py
import json
import base64
import datetime


class ConsumerRecord:
    def __init__(self, record: dict):
        self.topic = record["topic"]
        self.partition = record["partition"]
        self.offset = record["offset"]
        self.timestamp = record["timestamp"]
        self.timestamp_type = record["timestampType"]
        self.key = record["key"]
        self.value = record["value"]
        self.headers = record["headers"]

    def parse_record(
        self,
        to_str: bool = True,
        to_json: bool = True,
    ):
        rec = {
            **self.__dict__,
            **{
                "key": json.loads(base64.b64decode(self.key).decode()),
                "value": json.loads(base64.b64decode(self.value).decode()),
                "timestamp": ConsumerRecord.format_timestamp(self.timestamp, to_str),
            },
        }
        return json.dumps(rec, default=ConsumerRecord.serialize) if to_json else rec

    @staticmethod
    def format_timestamp(value, to_str: bool = True):
        # Kafka timestamps are in milliseconds since the epoch
        ts = datetime.datetime.fromtimestamp(value / 1000)
        return ts.isoformat(timespec="milliseconds") if to_str else ts

    @staticmethod
    def serialize(obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj


def lambda_function(event, context):
    # records are grouped by "<topic>-<partition>" keys
    for _, records in event["records"].items():
        for record in records:
            cr = ConsumerRecord(record)
            print(cr.parse_record())
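The parsing logic can be smoke-tested locally without any AWS dependency by feeding the handler a fabricated event. The snippet below is a sketch - the file name and record values are made up for illustration:

# test_app.py - hypothetical local smoke test with a fabricated event
import json
import base64

from app import lambda_function


def encode(obj) -> str:
    # Lambda delivers Kafka message keys/values base64-encoded
    return base64.b64encode(json.dumps(obj).encode()).decode()


event = {
    "records": {
        "taxi-rides-0": [
            {
                "topic": "taxi-rides",
                "partition": 0,
                "offset": 123,
                "timestamp": 1678858800000,
                "timestampType": "CREATE_TIME",
                "key": encode("9548"),
                "value": encode({"id": "9548", "fare_amount": 12.5}),
                "headers": [],
            }
        ]
    }
}

if __name__ == "__main__":
    lambda_function(event, None)  # prints one parsed record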
Deployment
The infrastructure can be deployed (as well as destroyed) using the Terraform CLI. Note that the Lambda consumer function is created only when the consumer_to_create variable is set to true.
# initialize
terraform init
# create an execution plan
terraform plan -var 'producer_to_create=true' -var 'consumer_to_create=true'
# execute the actions proposed in a Terraform plan
terraform apply -auto-approve=true -var 'producer_to_create=true' -var 'consumer_to_create=true'

# destroy all remote objects
# terraform destroy -auto-approve=true -var 'producer_to_create=true' -var 'consumer_to_create=true'
Once the resources are deployed, we can check the Lambda function on the AWS Console. Note that the MSK cluster is configured as the Lambda trigger, as expected.
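The trigger can also be inspected from the command line; for example, the following AWS CLI call lists the event source mapping for the function together with its state:

# show the MSK event source mapping attached to the consumer function
aws lambda list-event-source-mappings --function-name kafka_consumer \
  --query "EventSourceMappings[].{UUID:UUID,State:State,Topics:Topics}"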
Application Result
Kafka Topic
We can see that the topic (taxi-rides) is created, and the details of the topic can be found on the Topics menu on localhost:3000. Note that, if the Kafka monitoring app (kpow) is not started, we can run it using compose-ui.yml as shown below - see this post for details about the kpow configuration.
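For example, assuming the compose file sits in the repository root:

# start the Kafka monitoring app (kpow)
docker-compose -f compose-ui.yml up -d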
Consumer Output
We can check the outputs of the Lambda function on CloudWatch Logs. As expected, the message key and value are decoded properly.
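For a quick look from the terminal, the same logs can be tailed with the AWS CLI, assuming the default log group naming for the function:

# follow the consumer function's CloudWatch logs
aws logs tail /aws/lambda/kafka_consumer --follow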
Summary
Amazon MSK can be configured as an event source of a Lambda function. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. With this feature, we can develop a Kafka consumer application in a serverless environment where developers can focus on application logic. In this lab, we discussed how to create a Kafka consumer using a Lambda function.