In this lab, we will create a Kafka producer application using AWS Lambda, which sends fake taxi ride data into a Kafka topic on Amazon MSK. A configurable number of concurrent invocations of the producer Lambda function is triggered by an Amazon EventBridge schedule rule. In this way, we are able to generate test data concurrently based on the desired volume of messages.
- Introduction
- Lab 1 Produce data to Kafka using Lambda (this post)
- Lab 2 Write data to Kafka from S3 using Flink
- Lab 3 Transform and write data to S3 from Kafka using Flink
- Lab 4 Clean, Aggregate, and Enrich Events with Flink
- Lab 5 Write data to DynamoDB using Kafka Connect
- Lab 6 Consume data from Kafka using Lambda
[Update 2023-11-06] Initially I planned to deploy Pyflink applications on Amazon Managed Service for Apache Flink, but I changed the plan to use a local Flink cluster deployed on Docker. The main reasons are:
- It is not clear how to configure a Pyflink application for the managed service. For example, Apache Flink supports pluggable file systems, and the required dependency (e.g. flink-s3-fs-hadoop-1.15.2.jar) should be placed under the plugins folder. However, the sample Pyflink applications from pyflink-getting-started and amazon-kinesis-data-analytics-blueprints either ignore the S3 jar file for deployment or package it together with the other dependencies - none of them uses the S3 jar file as a plugin (a sketch of the expected plugin layout follows this list). I tried multiple different configurations, but all attempts ended up with an error whose code is CodeError.InvalidApplicationCode. I don't have this issue when I deploy the app on a local Flink cluster, and I haven't found a way to configure the app for the managed service yet.
- The Pyflink app for Lab 4 requires the OpenSearch sink connector, which is only available from Flink 1.16.0. However, the latest Flink version supported by the managed service is still 1.15.2, so the sink connector cannot be used on it. Normally the managed service is about two minor versions behind the official release, but it seems to be taking a little longer to catch up at the moment - version 1.18.0 was released a while ago.
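For reference, placing the jar as a plugin on a local Flink distribution looks roughly like the following - a sketch, assuming Flink 1.15.2 is installed under $FLINK_HOME and the jar is shipped in its opt folder.

# create a dedicated plugin folder and copy the S3 file system jar into it
mkdir -p $FLINK_HOME/plugins/s3-fs-hadoop
cp $FLINK_HOME/opt/flink-s3-fs-hadoop-1.15.2.jar $FLINK_HOME/plugins/s3-fs-hadoop/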
Architecture
Fake taxi ride data is generated by multiple invocations of the Kafka producer Lambda function, which are triggered by an EventBridge schedule rule. The schedule is set to run every minute, and the associated rule has a configurable number (e.g. 5) of targets. Each target points to the same Lambda function. In this way, we are able to generate test data using multiple Lambda invocations based on the desired volume of messages.
Infrastructure
The infrastructure is created using Terraform and the source can be found in the GitHub repository of this post.
VPC and VPN
A VPC with three public and three private subnets is created using the AWS VPC Terraform module (infra/vpc.tf). Also, a SoftEther VPN server is deployed in order to access the resources in the private subnets from the developer machine (infra/vpn.tf). The details of how to configure the VPN server can be found in this post. A sketch of the VPC module configuration is shown below.
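The snippet below is a minimal sketch rather than the actual infra/vpc.tf - the module version, CIDR ranges and availability zones are illustrative.

# infra/vpc.tf (sketch - values are illustrative)
module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "~> 5.0"

  name = "${local.name}-vpc"
  cidr = "10.0.0.0/16"

  azs             = ["ap-southeast-2a", "ap-southeast-2b", "ap-southeast-2c"]
  public_subnets  = ["10.0.101.0/24", "10.0.102.0/24", "10.0.103.0/24"]
  private_subnets = ["10.0.1.0/24", "10.0.2.0/24", "10.0.3.0/24"]

  # NAT gateway so that resources in the private subnets can reach AWS APIs
  enable_nat_gateway = true
  single_nat_gateway = true

  tags = local.tags
}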
MSK Cluster
An MSK cluster with 2 brokers is created. The broker nodes are deployed with the kafka.m5.large instance type in private subnets, and IAM authentication is used for the client authentication method. Finally, additional server configurations are added, such as enabling automatic topic creation/deletion, the default number of partitions, and the default replication factor.
# infra/variables.tf
locals {
  ...
  msk = {
    version                     = "2.8.1"
    instance_size               = "kafka.m5.large"
    ebs_volume_size             = 20
    log_retention_ms            = 604800000 # 7 days
    number_of_broker_nodes      = 2
    num_partitions              = 5
    default_replication_factor  = 2
  }
  ...
}

# infra/msk.tf
resource "aws_msk_cluster" "msk_data_cluster" {
  cluster_name           = "${local.name}-msk-cluster"
  kafka_version          = local.msk.version
  number_of_broker_nodes = local.msk.number_of_broker_nodes
  configuration_info {
    arn      = aws_msk_configuration.msk_config.arn
    revision = aws_msk_configuration.msk_config.latest_revision
  }

  broker_node_group_info {
    instance_type   = local.msk.instance_size
    client_subnets  = slice(module.vpc.private_subnets, 0, local.msk.number_of_broker_nodes)
    security_groups = [aws_security_group.msk.id]
    storage_info {
      ebs_storage_info {
        volume_size = local.msk.ebs_volume_size
      }
    }
  }

  client_authentication {
    sasl {
      iam = true
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.msk_cluster_lg.name
      }
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.default_bucket.id
        prefix  = "logs/msk/cluster/"
      }
    }
  }

  tags = local.tags

  depends_on = [aws_msk_configuration.msk_config]
}

resource "aws_msk_configuration" "msk_config" {
  name = "${local.name}-msk-configuration"

  kafka_versions = [local.msk.version]

  server_properties = <<PROPERTIES
    auto.create.topics.enable = true
    delete.topic.enable = true
    log.retention.ms = ${local.msk.log_retention_ms}
    num.partitions = ${local.msk.num_partitions}
    default.replication.factor = ${local.msk.default_replication_factor}
  PROPERTIES
}
Security Group
The security group of the MSK cluster allows all inbound traffic from itself and all outbound traffic to all IP addresses. These rules are necessary for the Kafka connectors on MSK Connect that we will develop in later posts. Note that both rules are overly generous, and we should limit the protocols and port ranges in production. Also, the security group has additional inbound rules that allow access on port 9098 from the security groups of the VPN server and the Lambda producer function.
# infra/msk.tf
resource "aws_security_group" "msk" {
  name   = "${local.name}-msk-sg"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

resource "aws_security_group_rule" "msk_self_inbound_all" {
  type                     = "ingress"
  description              = "Allow ingress from itself - required for MSK Connect"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "-1"
  from_port                = "0"
  to_port                  = "0"
  source_security_group_id = aws_security_group.msk.id
}

resource "aws_security_group_rule" "msk_self_outbound_all" {
  type              = "egress"
  description       = "Allow outbound all"
  security_group_id = aws_security_group.msk.id
  protocol          = "-1"
  from_port         = "0"
  to_port           = "0"
  cidr_blocks       = ["0.0.0.0/0"]
}

resource "aws_security_group_rule" "msk_vpn_inbound" {
  count                    = local.vpn.to_create ? 1 : 0
  type                     = "ingress"
  description              = "Allow VPN access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9092
  to_port                  = 9098
  source_security_group_id = aws_security_group.vpn[0].id
}

resource "aws_security_group_rule" "msk_kafka_producer_inbound" {
  count                    = local.producer.to_create ? 1 : 0
  type                     = "ingress"
  description              = "lambda kafka producer access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.kafka_producer[0].id
}
Lambda Function
The Kafka producer Lambda function is deployed conditionally by a flag variable called producer_to_create. Once it is set to true, the function is created by the AWS Lambda Terraform module while referring to the associated configuration variables (local.producer.*).
# infra/variables.tf
variable "producer_to_create" {
  description = "Flag to indicate whether to create Lambda Kafka producer"
  type        = bool
  default     = false
}

...

locals {
  ...
  producer = {
    to_create     = var.producer_to_create
    src_path      = "../producer"
    function_name = "kafka_producer"
    handler       = "app.lambda_function"
    concurrency   = 5
    timeout       = 90
    memory_size   = 128
    runtime       = "python3.8"
    schedule_rate = "rate(1 minute)"
    environment = {
      topic_name  = "taxi-rides"
      max_run_sec = 60
    }
  }
  ...
}

# infra/producer.tf
module "kafka_producer" {
  source  = "terraform-aws-modules/lambda/aws"
  version = ">=5.1.0, <6.0.0"

  create = local.producer.to_create

  function_name          = local.producer.function_name
  handler                = local.producer.handler
  runtime                = local.producer.runtime
  timeout                = local.producer.timeout
  memory_size            = local.producer.memory_size
  source_path            = local.producer.src_path
  vpc_subnet_ids         = module.vpc.private_subnets
  vpc_security_group_ids = local.producer.to_create ? [aws_security_group.kafka_producer[0].id] : null
  attach_network_policy  = true
  attach_policies        = true
  policies               = local.producer.to_create ? [aws_iam_policy.kafka_producer[0].arn] : null
  number_of_policies     = 1
  environment_variables = {
    BOOTSTRAP_SERVERS = aws_msk_cluster.msk_data_cluster.bootstrap_brokers_sasl_iam
    TOPIC_NAME        = local.producer.environment.topic_name
    MAX_RUN_SEC       = local.producer.environment.max_run_sec
  }

  depends_on = [
    aws_msk_cluster.msk_data_cluster
  ]

  tags = local.tags
}

resource "aws_lambda_function_event_invoke_config" "kafka_producer" {
  count = local.producer.to_create ? 1 : 0

  function_name          = module.kafka_producer.lambda_function_name
  maximum_retry_attempts = 0
}

resource "aws_lambda_permission" "allow_eventbridge" {
  count         = local.producer.to_create ? 1 : 0
  statement_id  = "InvokeLambdaFunction"
  action        = "lambda:InvokeFunction"
  function_name = local.producer.function_name
  principal     = "events.amazonaws.com"
  source_arn    = module.eventbridge.eventbridge_rule_arns["crons"]

  depends_on = [
    module.eventbridge
  ]
}
IAM Permission
The producer Lambda function needs permission to send messages to the Kafka topic. The following IAM policy is added to the Lambda function as illustrated in this AWS documentation.
# infra/producer.tf
resource "aws_iam_policy" "kafka_producer" {
  count = local.producer.to_create ? 1 : 0
  name  = "${local.producer.function_name}-msk-lambda-permission"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:AlterCluster",
          "kafka-cluster:DescribeCluster"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:cluster/${local.name}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnTopics"
        Action = [
          "kafka-cluster:*Topic*",
          "kafka-cluster:WriteData",
          "kafka-cluster:ReadData"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:topic/${local.name}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnGroups"
        Action = [
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeGroup"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:group/${local.name}-msk-cluster/*"
      }
    ]
  })
}
Lambda Security Group
We also need to add an outbound rule to the Lambda function’s security group so that it can access the MSK cluster.
# infra/producer.tf
resource "aws_security_group" "kafka_producer" {
  count = local.producer.to_create ? 1 : 0

  name   = "${local.name}-lambda-sg"
  vpc_id = module.vpc.vpc_id

  egress {
    from_port   = 9098
    to_port     = 9098
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}
Function Source
The TaxiRide class generates one or more taxi ride records via the create method, where random records are populated using the random module. The Lambda function sends 10 records at a time, followed by sleeping for 1 second. It repeats until the elapsed time exceeds the value of the MAX_RUN_SEC environment variable (e.g. 60). A Kafka message is made up of an ID as the key and a taxi ride record as the value. Both the key and the value are serialised as JSON. Note that the stable version of the kafka-python package does not support the IAM authentication method. Therefore, we need to install the package from a forked repository as discussed in this GitHub issue.
# producer/app.py
import os
import datetime
import random
import json
import re
import time
import typing
import dataclasses

from kafka import KafkaProducer


@dataclasses.dataclass
class TaxiRide:
    id: str
    vendor_id: int
    pickup_date: str
    dropoff_date: str
    passenger_count: int
    pickup_longitude: str
    pickup_latitude: str
    dropoff_longitude: str
    dropoff_latitude: str
    store_and_fwd_flag: str
    gc_distance: int
    trip_duration: int
    google_distance: int
    google_duration: int

    def asdict(self):
        return dataclasses.asdict(self)

    @classmethod
    def auto(cls):
        pickup_lon, pickup_lat = tuple(TaxiRide.get_latlon().split(","))
        dropoff_lon, dropoff_lat = tuple(TaxiRide.get_latlon().split(","))
        distance, duration = random.randint(1, 7), random.randint(8, 10000)
        return cls(
            id=f"id{random.randint(1665586, 8888888)}",
            vendor_id=random.randint(1, 5),
            pickup_date=datetime.datetime.now().isoformat(timespec="milliseconds"),
            dropoff_date=(
                datetime.datetime.now() + datetime.timedelta(minutes=random.randint(30, 100))
            ).isoformat(timespec="milliseconds"),
            passenger_count=random.randint(1, 9),
            pickup_longitude=pickup_lon,
            pickup_latitude=pickup_lat,
            dropoff_longitude=dropoff_lon,
            dropoff_latitude=dropoff_lat,
            store_and_fwd_flag=["Y", "N"][random.randint(0, 1)],
            gc_distance=distance,
            trip_duration=duration,
            google_distance=distance,
            google_duration=duration,
        )

    @staticmethod
    def create(num: int):
        return [TaxiRide.auto() for _ in range(num)]

    # fmt: off
    @staticmethod
    def get_latlon():
        location_list = [
            "-73.98174286,40.71915817", "-73.98508453,40.74716568", "-73.97333527,40.76407242", "-73.99310303,40.75263214",
            "-73.98229218,40.75133133", "-73.96527863,40.80104065", "-73.97010803,40.75979996", "-73.99373627,40.74176025",
            "-73.98544312,40.73571014", "-73.97686005,40.68337631", "-73.9697876,40.75758362", "-73.99397278,40.74086761",
            "-74.00531769,40.72866058", "-73.99013519,40.74885178", "-73.9595108,40.76280975", "-73.99025726,40.73703384",
            "-73.99495697,40.745121", "-73.93579865,40.70730972", "-73.99046326,40.75100708", "-73.9536438,40.77526093",
            "-73.98226166,40.75159073", "-73.98831177,40.72318649", "-73.97222137,40.67683029", "-73.98626709,40.73276901",
            "-73.97852325,40.78910065", "-73.97612,40.74908066", "-73.98240662,40.73148727", "-73.98776245,40.75037384",
            "-73.97187042,40.75840378", "-73.87303925,40.77410507", "-73.9921875,40.73451996", "-73.98435974,40.74898529",
            "-73.98092651,40.74196243", "-74.00701904,40.72573853", "-74.00798798,40.74022675", "-73.99419403,40.74555969",
            "-73.97737885,40.75883865", "-73.97051239,40.79664993", "-73.97693634,40.7599144", "-73.99306488,40.73812866",
            "-74.00775146,40.74528885", "-73.98532867,40.74198914", "-73.99037933,40.76152802", "-73.98442078,40.74978638",
            "-73.99173737,40.75437927", "-73.96742249,40.78820801", "-73.97813416,40.72935867", "-73.97171021,40.75943375",
            "-74.00737,40.7431221", "-73.99498749,40.75517654", "-73.91600037,40.74634933", "-73.99924469,40.72764587",
            "-73.98488617,40.73621368", "-73.98627472,40.74737167",
        ]
        return location_list[random.randint(0, len(location_list) - 1)]


# fmt: on
class Producer:
    def __init__(self, bootstrap_servers: list, topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.producer = self.create()

    def create(self):
        kwargs = {
            "bootstrap_servers": self.bootstrap_servers,
            "value_serializer": lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            "key_serializer": lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            "api_version": (2, 8, 1),
        }
        if re.search("9098$", next(iter(self.bootstrap_servers))):
            kwargs = {
                **kwargs,
                **{
                    "security_protocol": "SASL_SSL",
                    "sasl_mechanism": "AWS_MSK_IAM",
                },
            }
        return KafkaProducer(**kwargs)

    def send(self, items: typing.List[TaxiRide]):
        for item in items:
            self.producer.send(self.topic, key={"id": item.id}, value=item.asdict())
        self.producer.flush()

    def serialize(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj


def lambda_function(event, context):
    producer = Producer(
        bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"].split(","), topic=os.environ["TOPIC_NAME"]
    )
    s = datetime.datetime.now()
    total_records = 0
    while True:
        items = TaxiRide.create(10)
        producer.send(items)
        total_records += len(items)
        print(f"sent {len(items)} messages")
        elapsed_sec = (datetime.datetime.now() - s).seconds
        if elapsed_sec > int(os.environ["MAX_RUN_SEC"]):
            print(f"{total_records} records are sent in {elapsed_sec} seconds ...")
            break
        time.sleep(1)
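The function can also be exercised locally before deployment. Below is a quick sketch (not part of the repository), assuming a plain Kafka broker is listening on localhost:9092 - the producer only adds the IAM/SASL settings when the bootstrap server port is 9098.

# test.py - a hypothetical local smoke test
import os

os.environ["BOOTSTRAP_SERVERS"] = "localhost:9092"  # non-9098 port, so no IAM/SASL settings are added
os.environ["TOPIC_NAME"] = "taxi-rides"
os.environ["MAX_RUN_SEC"] = "10"

from app import lambda_function

# the handler ignores the event and context arguments
lambda_function({}, None)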
A sample taxi ride record is shown below.
{
  "id": "id3464573",
  "vendor_id": 5,
  "pickup_date": "2023-10-13T01:59:05.422",
  "dropoff_date": "2023-10-13T02:52:05.422",
  "passenger_count": 9,
  "pickup_longitude": "-73.97813416",
  "pickup_latitude": "40.72935867",
  "dropoff_longitude": "-73.91600037",
  "dropoff_latitude": "40.74634933",
  "store_and_fwd_flag": "Y",
  "gc_distance": 3,
  "trip_duration": 4731,
  "google_distance": 3,
  "google_duration": 4731
}
EventBridge Rule
The AWS EventBridge Terraform module is used to create the EventBridge schedule rule and targets. Note that the rule named crons has a configurable number (e.g. 5) of targets, and each target points to the same Lambda producer function. Therefore, we are able to generate test data using multiple Lambda invocations based on the desired volume of messages.
# infra/producer.tf
module "eventbridge" {
  source  = "terraform-aws-modules/eventbridge/aws"
  version = ">=2.3.0, <3.0.0"

  create     = local.producer.to_create
  create_bus = false

  rules = {
    crons = {
      description         = "Kafka producer lambda schedule"
      schedule_expression = local.producer.schedule_rate
    }
  }

  targets = {
    crons = [for i in range(local.producer.concurrency) : {
      name = "lambda-target-${i}"
      arn  = module.kafka_producer.lambda_function_arn
    }]
  }

  depends_on = [
    module.kafka_producer
  ]

  tags = local.tags
}
Deployment
The application can be deployed (as well as destroyed) using the Terraform CLI as shown below. As the default value of producer_to_create is false, we need to set it to true in order to create the Lambda producer function.
# initialize
terraform init
# create an execution plan
terraform plan -var 'producer_to_create=true'
# execute the actions proposed in a Terraform plan
terraform apply -auto-approve=true -var 'producer_to_create=true'

# destroy all remote objects
# terraform destroy -auto-approve=true -var 'producer_to_create=true'
Once deployed, we can see on the AWS Console that the schedule rule has 5 targets, all pointing to the same Lambda function. The targets can also be listed with the AWS CLI as shown below.
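A sketch using the AWS CLI - the rule name is an assumption here, as the actual name is generated by the EventBridge module and can be confirmed on the console or via terraform output.

# list the targets attached to the schedule rule (rule name assumed to be "crons")
aws events list-targets-by-rule --rule crons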
Monitor Topic
A Kafka management app can be a good companion for development, as it helps monitor and manage resources on an easy-to-use user interface. We'll use Kpow Community Edition in this post, which allows you to link a single Kafka cluster, Kafka Connect server and schema registry. Note that the community edition is valid for 12 months and the licence can be requested on this page. Once requested, the licence details will be emailed, and they can be added as an environment file (env_file).
The app needs additional configurations in environment variables because the Kafka cluster on Amazon MSK is authenticated by IAM - see this page for details. The bootstrap server address can be found on the AWS Console or by executing the following Terraform command.
$ terraform output -json | jq -r '.msk_bootstrap_brokers_sasl_iam.value'
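The variables referenced in the compose file below can then be exported beforehand - a sketch, assuming temporary AWS credentials are already at hand and the command is run in the infra folder.

# export the variables used by compose-ui.yml
export BOOTSTRAP_SERVERS=$(terraform output -json | jq -r '.msk_bootstrap_brokers_sasl_iam.value')
export AWS_ACCESS_KEY_ID=<access key id>
export AWS_SECRET_ACCESS_KEY=<secret access key>
export AWS_SESSION_TOKEN=<session token>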
Note that we need to specify the compose file name when starting it because the file name (compose-ui.yml) is different from the default file name (docker-compose.yml). We can run it with docker-compose -f compose-ui.yml up -d and access the UI in a browser via localhost:3000.
# compose-ui.yml
version: "3"

services:
  kpow:
    image: factorhouse/kpow-ce:91.5.1
    container_name: kpow
    ports:
      - "3000:3000"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
      BOOTSTRAP: $BOOTSTRAP_SERVERS
      SECURITY_PROTOCOL: SASL_SSL
      SASL_MECHANISM: AWS_MSK_IAM
      SASL_JAAS_CONFIG: software.amazon.msk.auth.iam.IAMLoginModule required;
      SASL_CLIENT_CALLBACK_HANDLER_CLASS: software.amazon.msk.auth.iam.IAMClientCallbackHandler
    env_file: # https://kpow.io/get-started/#individual
      - ./kpow.env

networks:
  kafkanet:
    name: kafka-network
We can see the topic (taxi-rides) is created, and it has 5 partitions, which is the default number of partitions.
Also, we can inspect topic messages in the Data tab as shown below. Alternatively, we can peek at the messages from the developer machine with a quick consumer script - see the sketch that follows.
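A minimal sketch (not part of the repository), assuming the developer machine is connected via the VPN, the forked kafka-python package with IAM support is installed, and BOOTSTRAP_SERVERS points to the IAM bootstrap address on port 9098.

# check.py - a hypothetical helper to print messages from the topic
import os
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "taxi-rides",
    bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"].split(","),
    security_protocol="SASL_SSL",
    sasl_mechanism="AWS_MSK_IAM",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # stop iterating when no new messages arrive
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for message in consumer:
    print(message.value)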
Summary
In this lab, we created a Kafka producer application using AWS Lambda, which sends fake taxi ride data into a Kafka topic on Amazon MSK. It was developed so that the producer Lambda function can be invoked concurrently by a configurable number of Amazon EventBridge schedule rule targets. In this way, we are able to generate test data based on the desired volume of messages.