Amazon MSK can be configured as an event source of a Lambda function. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. With this feature, we can develop a Kafka consumer application in a serverless environment, which allows developers to focus on application logic. In this lab, we will discuss how to create a Kafka consumer using a Lambda function.

Architecture

Fake taxi ride data is sent to a Kafka topic by the Kafka producer application discussed in Lab 1. The messages of the taxi-rides topic are consumed by a Lambda function, with the MSK cluster configured as an event source of the function.

Infrastructure

The AWS infrastructure is created using Terraform, and the source can be found in the GitHub repository of this post. See this earlier post for details about how to create the resources. The key resources include a VPC, a VPN server, an MSK cluster and a Python Lambda producer app.

Lambda Kafka Consumer

The Kafka consumer Lambda function is added for this lab, and it is deployed conditionally by a flag variable called consumer_to_create. When it is set to true, the function is created by the AWS Lambda Terraform module using the associated configuration variables (local.consumer.*). Note that an event source mapping is created on the Lambda function where the event source is set to the Kafka cluster on Amazon MSK. We can also control which topics to poll by adding one or more topic names to the topics attribute - only a single topic named taxi-rides is specified for this lab.

# infra/variables.tf
variable "consumer_to_create" {
  description = "Flag to indicate whether to create Kafka consumer"
  type        = bool
  default     = false
}

...

locals {
  ...
  consumer = {
    to_create         = var.consumer_to_create
    src_path          = "../consumer"
    function_name     = "kafka_consumer"
    handler           = "app.lambda_function"
    timeout           = 600
    memory_size       = 128
    runtime           = "python3.8"
    topic_name        = "taxi-rides"
    starting_position = "TRIM_HORIZON"
  }
  ...
}

# infra/consumer.tf
module "kafka_consumer" {
  source  = "terraform-aws-modules/lambda/aws"
  version = ">=5.1.0, <6.0.0"

  create = local.consumer.to_create

  function_name          = local.consumer.function_name
  handler                = local.consumer.handler
  runtime                = local.consumer.runtime
  timeout                = local.consumer.timeout
  memory_size            = local.consumer.memory_size
  source_path            = local.consumer.src_path
  vpc_subnet_ids         = module.vpc.private_subnets
  vpc_security_group_ids = local.consumer.to_create ? [aws_security_group.kafka_consumer[0].id] : null
  attach_network_policy  = true
  attach_policies        = true
  policies               = local.consumer.to_create ? [aws_iam_policy.kafka_consumer[0].arn] : null
  number_of_policies     = 1

  depends_on = [
    aws_msk_cluster.msk_data_cluster
  ]

  tags = local.tags
}

resource "aws_lambda_event_source_mapping" "kafka_consumer" {
  count             = local.consumer.to_create ? 1 : 0
  event_source_arn  = aws_msk_cluster.msk_data_cluster.arn
  function_name     = module.kafka_consumer.lambda_function_name
  topics            = [local.consumer.topic_name]
  starting_position = local.consumer.starting_position
  amazon_managed_kafka_event_source_config {
    consumer_group_id = "${local.consumer.topic_name}-group-01"
  }
}

resource "aws_lambda_permission" "kafka_consumer" {
  count         = local.consumer.to_create ? 1 : 0
  statement_id  = "InvokeLambdaFunction"
  action        = "lambda:InvokeFunction"
  function_name = local.consumer.function_name
  principal     = "kafka.amazonaws.com"
  source_arn    = aws_msk_cluster.msk_data_cluster.arn
}

IAM Permission

The consumer Lambda function needs permission to read messages from the Kafka topic. The following IAM policy is added to the Lambda function, as illustrated in this AWS documentation.

# infra/consumer.tf
resource "aws_iam_policy" "kafka_consumer" {
  count = local.consumer.to_create ? 1 : 0
  name  = "${local.consumer.function_name}-msk-lambda-permission"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnKafkaCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:DescribeGroup",
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeTopic",
          "kafka-cluster:ReadData",
          "kafka-cluster:DescribeClusterDynamicConfiguration"
        ]
        Effect = "Allow"
        Resource = [
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:cluster/${local.name}-msk-cluster/*",
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:topic/${local.name}-msk-cluster/*",
          "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:group/${local.name}-msk-cluster/*"
        ]
      },
      {
        Sid = "PermissionOnKafka"
        Action = [
          "kafka:DescribeCluster",
          "kafka:GetBootstrapBrokers"
        ]
        Effect   = "Allow"
        Resource = "*"
      },
      {
        Sid = "PermissionOnNetwork"
        Action = [
          # The first three actions are also covered by the network policy attachment in the Lambda module
          # "ec2:CreateNetworkInterface",
          # "ec2:DescribeNetworkInterfaces",
          # "ec2:DeleteNetworkInterface",
          "ec2:DescribeVpcs",
          "ec2:DescribeSubnets",
          "ec2:DescribeSecurityGroups"
        ]
        Effect   = "Allow"
        Resource = "*"
      }
    ]
  })
}

Function Source

The records attribute of the Lambda event payload includes Kafka consumer records. Each record contains details of the Amazon MSK topic, the partition identifier and the offset, together with a timestamp and a base64-encoded message key and value. The Lambda function simply prints those records after decoding the message key and value and formatting the timestamp into ISO format - a sample payload can be seen in the local test sketch after the function source.

# consumer/app.py
import json
import base64
import datetime


class ConsumerRecord:
    def __init__(self, record: dict):
        self.topic = record["topic"]
        self.partition = record["partition"]
        self.offset = record["offset"]
        self.timestamp = record["timestamp"]
        self.timestamp_type = record["timestampType"]
        self.key = record["key"]
        self.value = record["value"]
        self.headers = record["headers"]

    def parse_record(
        self,
        to_str: bool = True,
        to_json: bool = True,
    ):
        # decode the base64-encoded key and value, and normalise the timestamp
        rec = {
            **self.__dict__,
            **{
                "key": json.loads(base64.b64decode(self.key).decode()),
                "value": json.loads(base64.b64decode(self.value).decode()),
                "timestamp": ConsumerRecord.format_timestamp(self.timestamp, to_str),
            },
        }
        return json.dumps(rec, default=ConsumerRecord.serialize) if to_json else rec

    @staticmethod
    def format_timestamp(value, to_str: bool = True):
        # the record timestamp is epoch milliseconds
        ts = datetime.datetime.fromtimestamp(value / 1000)
        return ts.isoformat(timespec="milliseconds") if to_str else ts

    @staticmethod
    def serialize(obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj


def lambda_function(event, context):
    # records are grouped by topic-partition under the records attribute
    for _, records in event["records"].items():
        for record in records:
            cr = ConsumerRecord(record)
            print(cr.parse_record())
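
To see the payload shape concretely, the handler can be invoked locally with a synthetic event. The following is a minimal sketch, not part of the lab source - the file name (local_test.py), the topic-partition key (taxi-rides-0) and the record fields (id, vendor_id, trip_distance) are illustrative assumptions rather than values copied from a real invocation.

# local_test.py - hypothetical local test, not part of the lab source
import json
import base64

from app import lambda_function


def encode(obj) -> str:
    # Lambda delivers Kafka keys and values as base64-encoded strings
    return base64.b64encode(json.dumps(obj).encode()).decode()


# synthetic event mimicking the shape of the MSK event payload
event = {
    "eventSource": "aws:kafka",
    "records": {
        "taxi-rides-0": [  # records are keyed by <topic>-<partition>
            {
                "topic": "taxi-rides",
                "partition": 0,
                "offset": 0,
                "timestamp": 1700000000000,  # epoch milliseconds
                "timestampType": "CREATE_TIME",
                "key": encode({"id": "1"}),
                "value": encode({"id": "1", "vendor_id": 1, "trip_distance": 1.5}),
                "headers": [],
            }
        ]
    },
}

lambda_function(event, None)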

Deployment

The infrastructure can be deployed (as well as destroyed) using the Terraform CLI. Note that the Lambda consumer function is created only when the consumer_to_create variable is set to true.

# initialize
terraform init
# create an execution plan
terraform plan -var 'producer_to_create=true' -var 'consumer_to_create=true'
# execute the actions proposed in a Terraform plan
terraform apply -auto-approve=true -var 'producer_to_create=true' -var 'consumer_to_create=true'

# destroy all remote objects
# terraform destroy -auto-approve=true -var 'producer_to_create=true' -var 'consumer_to_create=true'

Once the resources are deployed, we can check the Lambda function on the AWS Console. Note that the MSK cluster is configured as the Lambda trigger, as expected. The event source mapping can also be inspected from the command line, as shown below.
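
As a sketch of a console-free check, assuming default AWS credentials and the function name from the configuration above, the AWS CLI can list the mapping; its State field should read Enabled once polling has started.

# list the event source mappings attached to the consumer function
aws lambda list-event-source-mappings --function-name kafka_consumer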

Application Result

Kafka Topic

We can see the topic (taxi-rides) is created, and the details of the topic can be found on the Topics menu on localhost:3000. Note that, if the Kafka monitoring app (kpow) is not started, we can run it using compose-ui.yml, as shown below - see this post for details about the kpow configuration.
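
For reference, a one-liner to bring kpow up, assuming Docker Compose is installed and the command is run from the folder that contains compose-ui.yml:

# start the kpow UI in the background
docker-compose -f compose-ui.yml up -d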

Consumer Output

We can check the output of the Lambda function on CloudWatch Logs. As expected, the message key and value are decoded properly. The logs can also be tailed from the command line, as shown below.
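
As a sketch, assuming AWS CLI v2 and that the function writes to the default log group naming convention (/aws/lambda/<function-name>), the logs can be followed live:

# tail the consumer function's CloudWatch logs
aws logs tail /aws/lambda/kafka_consumer --follow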

Summary

Amazon MSK can be configured as an event source of a Lambda function. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. With this feature, we can develop a Kafka consumer application in a serverless environment, which allows developers to focus on application logic. In this lab, we discussed how to create a Kafka consumer using a Lambda function.