In this lab, we will create a Kafka producer application using AWS Lambda, which sends fake taxi ride data into a Kafka topic on Amazon MSK. An Amazon EventBridge schedule rule invokes a configurable number of instances of the producer Lambda function. In this way, we can generate test data concurrently, scaled to the desired volume of messages.

[Update 2023-11-06] Initially I planned to deploy Pyflink applications on Amazon Managed Service for Apache Flink, but I changed the plan to use a local Flink cluster deployed on Docker. The main reasons are:

  1. It is not clear how to configure a Pyflink application for the managed service. For example, Apache Flink supports pluggable file systems and the required dependency (e.g. flink-s3-fs-hadoop-1.15.2.jar) should be placed under the plugins folder. However, the sample Pyflink applications from pyflink-getting-started and amazon-kinesis-data-analytics-blueprints either ignore the S3 jar file for deployment or package it together with other dependencies - none of them uses the S3 jar file as a plugin. I tried multiple configurations, but all of them failed with the error code CodeError.InvalidApplicationCode. I don’t have this issue when I deploy the app on a local Flink cluster, and I haven’t yet found a way to configure the app for the managed service.
  2. The Pyflink app for Lab 4 requires the OpenSearch sink connector, which is available on 1.16.0+. However, the latest Flink version of the managed service is still 1.15.2 and the sink connector is not available on it. Normally the latest version of the managed service is two minor versions behind the official release, but it seems to be taking a little longer to catch up at the moment - version 1.18.0 was released a while ago.

Architecture

Fake taxi ride data is generated by multiple Kafka Lambda producer functions that are invoked by an EventBridge schedule rule. The schedule is set to run every minute and the associated rule has a configurable number (e.g. 5) of targets. Each target points to the same Lambda function. In this way, we can generate test data using multiple concurrent invocations of the function, scaled to the desired volume of messages.
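
To get a rough feel for the volume this setup can generate, the send loop of a single invocation can be simulated. The following is a minimal sketch and not part of the lab's source: the function name `estimate_records` and the `send_sec` latency parameter are assumptions made for illustration, while the defaults (a batch of 10 records, a 60 second run window and a 1 second sleep) follow the producer source and configuration listed in this post.

```python
def estimate_records(batch_size: int = 10, max_run_sec: int = 60,
                     sleep_sec: float = 1.0, send_sec: float = 0.0) -> int:
    """Simulate the producer loop and return the records sent by one invocation."""
    elapsed = 0.0
    total = 0
    while True:
        elapsed += send_sec  # assumed time spent sending and flushing one batch
        total += batch_size
        # the producer compares whole elapsed seconds against MAX_RUN_SEC
        if int(elapsed) > max_run_sec:
            break
        elapsed += sleep_sec
    return total


if __name__ == "__main__":
    per_invocation = estimate_records()
    targets = 5  # number of EventBridge targets, each invoked once per minute
    print(per_invocation, per_invocation * targets)
```

With zero send latency this gives 620 records per invocation, or 3,100 records per scheduled run across 5 targets. This is an upper bound, since real send and flush latency reduces the number of iterations that fit into the run window.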

Infrastructure

The infrastructure is created using Terraform and the source can be found in the GitHub repository of this post.

VPC and VPN

A VPC with 3 public and 3 private subnets is created using the AWS VPC Terraform module (infra/vpc.tf). Also, a SoftEther VPN server is deployed in order to access the resources in the private subnets from the developer machine (infra/vpn.tf). The details about how to configure the VPN server can be found in this post.

MSK Cluster

An MSK cluster with 2 brokers is created. The broker nodes are deployed with the kafka.m5.large instance type in private subnets, and IAM authentication is used as the client authentication method. Finally, additional server configurations are added: topic auto-creation and deletion are enabled, and the log retention period, the default number of partitions and the default replication factor are set.

# infra/variables.tf
locals {
  ...
  msk = {
    version                    = "2.8.1"
    instance_size              = "kafka.m5.large"
    ebs_volume_size            = 20
    log_retention_ms           = 604800000 # 7 days
    number_of_broker_nodes     = 2
    num_partitions             = 5
    default_replication_factor = 2
  }
  ...
}

# infra/msk.tf
resource "aws_msk_cluster" "msk_data_cluster" {
  cluster_name           = "${local.name}-msk-cluster"
  kafka_version          = local.msk.version
  number_of_broker_nodes = local.msk.number_of_broker_nodes
  configuration_info {
    arn      = aws_msk_configuration.msk_config.arn
    revision = aws_msk_configuration.msk_config.latest_revision
  }

  broker_node_group_info {
    instance_type   = local.msk.instance_size
    client_subnets  = slice(module.vpc.private_subnets, 0, local.msk.number_of_broker_nodes)
    security_groups = [aws_security_group.msk.id]
    storage_info {
      ebs_storage_info {
        volume_size = local.msk.ebs_volume_size
      }
    }
  }

  client_authentication {
    sasl {
      iam = true
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.msk_cluster_lg.name
      }
      s3 {
        enabled = true
        bucket  = aws_s3_bucket.default_bucket.id
        prefix  = "logs/msk/cluster/"
      }
    }
  }

  tags = local.tags

  depends_on = [aws_msk_configuration.msk_config]
}

resource "aws_msk_configuration" "msk_config" {
  name = "${local.name}-msk-configuration"

  kafka_versions = [local.msk.version]

  server_properties = <<PROPERTIES
    auto.create.topics.enable = true
    delete.topic.enable = true
    log.retention.ms = ${local.msk.log_retention_ms}
    num.partitions = ${local.msk.num_partitions}
    default.replication.factor = ${local.msk.default_replication_factor}
  PROPERTIES
}

Security Group

The security group of the MSK cluster allows all inbound traffic from itself and all outbound traffic to any IP address. These rules are necessary for the Kafka connectors on MSK Connect that we will develop in later posts. Both rules are more permissive than they need to be, and in production we can restrict the protocol and port ranges. The security group also has two additional inbound rules: one allows the VPN server's security group to connect on ports 9092 to 9098, and the other allows the Lambda producer function's security group to connect on port 9098, the port used with IAM authentication.

# infra/msk.tf
resource "aws_security_group" "msk" {
  name   = "${local.name}-msk-sg"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

resource "aws_security_group_rule" "msk_self_inbound_all" {
  type                     = "ingress"
  description              = "Allow ingress from itself - required for MSK Connect"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "-1"
  from_port                = "0"
  to_port                  = "0"
  source_security_group_id = aws_security_group.msk.id
}

resource "aws_security_group_rule" "msk_self_outbound_all" {
  type              = "egress"
  description       = "Allow outbound all"
  security_group_id = aws_security_group.msk.id
  protocol          = "-1"
  from_port         = "0"
  to_port           = "0"
  cidr_blocks       = ["0.0.0.0/0"]
}

resource "aws_security_group_rule" "msk_vpn_inbound" {
  count                    = local.vpn.to_create ? 1 : 0
  type                     = "ingress"
  description              = "Allow VPN access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9092
  to_port                  = 9098
  source_security_group_id = aws_security_group.vpn[0].id
}

resource "aws_security_group_rule" "msk_kafka_producer_inbound" {
  count                    = local.producer.to_create ? 1 : 0
  type                     = "ingress"
  description              = "lambda kafka producer access"
  security_group_id        = aws_security_group.msk.id
  protocol                 = "tcp"
  from_port                = 9098
  to_port                  = 9098
  source_security_group_id = aws_security_group.kafka_producer[0].id
}
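
The effect of the three inbound rules can be illustrated with a small model. This is only a sketch for reasoning about the rules and is not how AWS evaluates security groups internally: the `IngressRule` class, the `is_allowed` helper and the source labels ("msk", "vpn", "lambda") are hypothetical names, and the model assumes all traffic is TCP.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IngressRule:
    source: str     # hypothetical label for the source security group
    protocol: str   # "tcp" or "-1" (all protocols)
    from_port: int
    to_port: int

    def allows(self, source: str, port: int) -> bool:
        if source != self.source:
            return False
        if self.protocol == "-1":  # all traffic, so the port range is ignored
            return True
        return self.from_port <= port <= self.to_port


# Mirrors the inbound rules defined in infra/msk.tf
RULES = [
    IngressRule("msk", "-1", 0, 0),            # self-referencing rule
    IngressRule("vpn", "tcp", 9092, 9098),     # VPN server
    IngressRule("lambda", "tcp", 9098, 9098),  # Lambda producer
]


def is_allowed(source: str, port: int) -> bool:
    """Return True if any inbound rule admits the source on the given port."""
    return any(rule.allows(source, port) for rule in RULES)


if __name__ == "__main__":
    for source, port in [("vpn", 9092), ("lambda", 9092), ("lambda", 9098)]:
        print(source, port, is_allowed(source, port))
```

In this model the VPN server can reach any listener between 9092 and 9098, whereas the Lambda producer is limited to port 9098.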

Lambda Function

The Kafka producer Lambda function is deployed conditionally, controlled by a flag variable called producer_to_create. Once it is set to true, the function is created by the AWS Lambda Terraform module using the associated configuration variables (local.producer.*).

# infra/variables.tf
variable "producer_to_create" {
  description = "Flag to indicate whether to create Lambda Kafka producer"
  type        = bool
  default     = false
}

...

locals {
  ...
  producer = {
    to_create     = var.producer_to_create
    src_path      = "../producer"
    function_name = "kafka_producer"
    handler       = "app.lambda_function"
    concurrency   = 5
    timeout       = 90
    memory_size   = 128
    runtime       = "python3.8"
    schedule_rate = "rate(1 minute)"
    environment = {
      topic_name  = "taxi-rides"
      max_run_sec = 60
    }
  }
  ...
}

# infra/producer.tf
module "kafka_producer" {
  source  = "terraform-aws-modules/lambda/aws"
  version = ">=5.1.0, <6.0.0"

  create = local.producer.to_create

  function_name          = local.producer.function_name
  handler                = local.producer.handler
  runtime                = local.producer.runtime
  timeout                = local.producer.timeout
  memory_size            = local.producer.memory_size
  source_path            = local.producer.src_path
  vpc_subnet_ids         = module.vpc.private_subnets
  vpc_security_group_ids = local.producer.to_create ? [aws_security_group.kafka_producer[0].id] : null
  attach_network_policy  = true
  attach_policies        = true
  policies               = local.producer.to_create ? [aws_iam_policy.kafka_producer[0].arn] : null
  number_of_policies     = 1
  environment_variables = {
    BOOTSTRAP_SERVERS = aws_msk_cluster.msk_data_cluster.bootstrap_brokers_sasl_iam
    TOPIC_NAME        = local.producer.environment.topic_name
    MAX_RUN_SEC       = local.producer.environment.max_run_sec
  }

  depends_on = [
    aws_msk_cluster.msk_data_cluster
  ]

  tags = local.tags
}

resource "aws_lambda_function_event_invoke_config" "kafka_producer" {
  count = local.producer.to_create ? 1 : 0

  function_name          = module.kafka_producer.lambda_function_name
  maximum_retry_attempts = 0
}

resource "aws_lambda_permission" "allow_eventbridge" {
  count         = local.producer.to_create ? 1 : 0
  statement_id  = "InvokeLambdaFunction"
  action        = "lambda:InvokeFunction"
  function_name = local.producer.function_name
  principal     = "events.amazonaws.com"
  source_arn    = module.eventbridge.eventbridge_rule_arns["crons"]

  depends_on = [
    module.eventbridge
  ]
}

IAM Permission

The producer Lambda function needs permission to send messages to the Kafka topic. The following IAM policy is added to the Lambda function as illustrated in this AWS documentation.

# infra/producer.tf
resource "aws_iam_policy" "kafka_producer" {
  count = local.producer.to_create ? 1 : 0
  name  = "${local.producer.function_name}-msk-lambda-permission"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid = "PermissionOnCluster"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:AlterCluster",
          "kafka-cluster:DescribeCluster"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:cluster/${local.name}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnTopics"
        Action = [
          "kafka-cluster:*Topic*",
          "kafka-cluster:WriteData",
          "kafka-cluster:ReadData"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:topic/${local.name}-msk-cluster/*"
      },
      {
        Sid = "PermissionOnGroups"
        Action = [
          "kafka-cluster:AlterGroup",
          "kafka-cluster:DescribeGroup"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:kafka:${local.region}:${data.aws_caller_identity.current.account_id}:group/${local.name}-msk-cluster/*"
      }
    ]
  })
}
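
The Resource values in the policy end with a wildcard, so each statement covers every cluster UUID, topic or group under the named cluster. The sketch below illustrates that matching with shell-style patterns from Python's fnmatch module, which only approximates IAM wildcard evaluation. The helper names, region, account ID, cluster name and UUID are all made-up values for illustration.

```python
from fnmatch import fnmatchcase


def resource_pattern(kind: str, region: str, account_id: str, cluster_name: str) -> str:
    """Build a wildcard resource pattern in the same shape as the policy above."""
    return f"arn:aws:kafka:{region}:{account_id}:{kind}/{cluster_name}/*"


def matches(pattern: str, arn: str) -> bool:
    """Approximate IAM wildcard matching ('*' matches any run of characters)."""
    return fnmatchcase(arn, pattern)


# Hypothetical values, for illustration only
REGION, ACCOUNT, CLUSTER = "ap-southeast-2", "123456789012", "demo-msk-cluster"
CLUSTER_UUID = "11111111-2222-3333-4444-555555555555-1"

topic_arn = f"arn:aws:kafka:{REGION}:{ACCOUNT}:topic/{CLUSTER}/{CLUSTER_UUID}/taxi-rides"
topic_pattern = resource_pattern("topic", REGION, ACCOUNT, CLUSTER)
group_pattern = resource_pattern("group", REGION, ACCOUNT, CLUSTER)

if __name__ == "__main__":
    print(matches(topic_pattern, topic_arn))  # topic statement covers the topic
    print(matches(group_pattern, topic_arn))  # group statement does not
```

Because the wildcard follows the cluster name, the policy applies to any topic on the cluster. A tighter policy would replace the wildcard with the cluster UUID and a specific topic name.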

Lambda Security Group

We also need to add an outbound rule to the Lambda function’s security group so that it can access the MSK cluster.

# infra/producer.tf
resource "aws_security_group" "kafka_producer" {
  count = local.producer.to_create ? 1 : 0

  name   = "${local.name}-lambda-sg"
  vpc_id = module.vpc.vpc_id

  egress {
    from_port   = 9098
    to_port     = 9098
    protocol    = "tcp"
    cidr_blocks = ["0.0.0.0/0"]
  }

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

Function Source

The TaxiRide class generates one or more taxi ride records with its create method, where the field values are populated using the random module. In the listing below, the Lambda function sends 10 records at a time and then sleeps for 1 second. It repeats this until the elapsed time exceeds the value of the MAX_RUN_SEC environment variable (e.g. 60). A Kafka message is made up of an ID as the key and a taxi ride record as the value, and both are serialised as JSON. Note that the stable version of the kafka-python package does not support the IAM authentication method. Therefore, we need to install the package from a forked repository, as discussed in this GitHub issue.

# producer/app.py
import os
import datetime
import random
import json
import re
import time
import typing
import dataclasses

from kafka import KafkaProducer


@dataclasses.dataclass
class TaxiRide:
    id: str
    vendor_id: int
    pickup_date: str
    dropoff_date: str
    passenger_count: int
    pickup_longitude: str
    pickup_latitude: str
    dropoff_longitude: str
    dropoff_latitude: str
    store_and_fwd_flag: str
    gc_distance: int
    trip_duration: int
    google_distance: int
    google_duration: int

    def asdict(self):
        return dataclasses.asdict(self)

    @classmethod
    def auto(cls):
        pickup_lon, pickup_lat = tuple(TaxiRide.get_latlon().split(","))
        dropoff_lon, dropoff_lat = tuple(TaxiRide.get_latlon().split(","))
        distance, duration = random.randint(1, 7), random.randint(8, 10000)
        return cls(
            id=f"id{random.randint(1665586, 8888888)}",
            vendor_id=random.randint(1, 5),
            pickup_date=datetime.datetime.now().isoformat(timespec="milliseconds"),
            dropoff_date=(
                datetime.datetime.now() + datetime.timedelta(minutes=random.randint(30, 100))
            ).isoformat(timespec="milliseconds"),
            passenger_count=random.randint(1, 9),
            pickup_longitude=pickup_lon,
            pickup_latitude=pickup_lat,
            dropoff_longitude=dropoff_lon,
            dropoff_latitude=dropoff_lat,
            store_and_fwd_flag=["Y", "N"][random.randint(0, 1)],
            gc_distance=distance,
            trip_duration=duration,
            google_distance=distance,
            google_duration=duration,
        )

    @staticmethod
    def create(num: int):
        return [TaxiRide.auto() for _ in range(num)]

    # fmt: off
    @staticmethod
    def get_latlon():
        location_list = [
            "-73.98174286,40.71915817", "-73.98508453,40.74716568", "-73.97333527,40.76407242", "-73.99310303,40.75263214",
            "-73.98229218,40.75133133", "-73.96527863,40.80104065", "-73.97010803,40.75979996", "-73.99373627,40.74176025",
            "-73.98544312,40.73571014", "-73.97686005,40.68337631", "-73.9697876,40.75758362", "-73.99397278,40.74086761",
            "-74.00531769,40.72866058", "-73.99013519,40.74885178", "-73.9595108,40.76280975", "-73.99025726,40.73703384",
            "-73.99495697,40.745121", "-73.93579865,40.70730972", "-73.99046326,40.75100708", "-73.9536438,40.77526093",
            "-73.98226166,40.75159073", "-73.98831177,40.72318649", "-73.97222137,40.67683029", "-73.98626709,40.73276901",
            "-73.97852325,40.78910065", "-73.97612,40.74908066", "-73.98240662,40.73148727", "-73.98776245,40.75037384",
            "-73.97187042,40.75840378", "-73.87303925,40.77410507", "-73.9921875,40.73451996", "-73.98435974,40.74898529",
            "-73.98092651,40.74196243", "-74.00701904,40.72573853", "-74.00798798,40.74022675", "-73.99419403,40.74555969",
            "-73.97737885,40.75883865", "-73.97051239,40.79664993", "-73.97693634,40.7599144", "-73.99306488,40.73812866",
            "-74.00775146,40.74528885", "-73.98532867,40.74198914", "-73.99037933,40.76152802", "-73.98442078,40.74978638",
            "-73.99173737,40.75437927", "-73.96742249,40.78820801", "-73.97813416,40.72935867", "-73.97171021,40.75943375",
            "-74.00737,40.7431221", "-73.99498749,40.75517654", "-73.91600037,40.74634933", "-73.99924469,40.72764587",
            "-73.98488617,40.73621368", "-73.98627472,40.74737167",
        ]
        return location_list[random.randint(0, len(location_list) - 1)]


# fmt: on
class Producer:
    def __init__(self, bootstrap_servers: list, topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.producer = self.create()

    def create(self):
        kwargs = {
            "bootstrap_servers": self.bootstrap_servers,
            "value_serializer": lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            "key_serializer": lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            "api_version": (2, 8, 1),
        }
        if re.search("9098$", next(iter(self.bootstrap_servers))):
            kwargs = {
                **kwargs,
                **{
                    "security_protocol": "SASL_SSL",
                    "sasl_mechanism": "AWS_MSK_IAM",
                },
            }
        return KafkaProducer(**kwargs)

    def send(self, items: typing.List[TaxiRide]):
        for item in items:
            self.producer.send(self.topic, key={"id": item.id}, value=item.asdict())
        self.producer.flush()

    def serialize(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj


def lambda_function(event, context):
    producer = Producer(
        bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"].split(","), topic=os.environ["TOPIC_NAME"]
    )
    s = datetime.datetime.now()
    total_records = 0
    while True:
        items = TaxiRide.create(10)
        producer.send(items)
        total_records += len(items)
        print(f"sent {len(items)} messages")
        elapsed_sec = (datetime.datetime.now() - s).seconds
        if elapsed_sec > int(os.environ["MAX_RUN_SEC"]):
            print(f"{total_records} records are sent in {elapsed_sec} seconds ...")
            break
        time.sleep(1)
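
The producer decides whether to enable IAM authentication by checking whether the first bootstrap server address ends with port 9098. The following sketch isolates that decision so it can be tried without a Kafka cluster. The function name `client_kwargs` and the host names are hypothetical, and the AWS_MSK_IAM mechanism is only accepted by the forked kafka-python package mentioned above.

```python
import re


def client_kwargs(bootstrap_servers: list) -> dict:
    """Return connection settings, adding SASL/IAM options for port 9098."""
    kwargs = {"bootstrap_servers": bootstrap_servers}
    # Only the first address is inspected, as in the Producer class
    if re.search("9098$", next(iter(bootstrap_servers))):
        kwargs.update(security_protocol="SASL_SSL", sasl_mechanism="AWS_MSK_IAM")
    return kwargs


if __name__ == "__main__":
    # Hypothetical addresses, for illustration only
    print(client_kwargs(["b-1.example.internal:9098", "b-2.example.internal:9098"]))
    print(client_kwargs(["localhost:9092"]))
```

One consequence of this design is that the same code can run against a local, unauthenticated broker (e.g. on port 9092) and against the MSK cluster without any change other than the BOOTSTRAP_SERVERS value.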

A sample taxi ride record is shown below.

{
  "id": "id3464573",
  "vendor_id": 5,
  "pickup_date": "2023-10-13T01:59:05.422",
  "dropoff_date": "2023-10-13T02:52:05.422",
  "passenger_count": 9,
  "pickup_longitude": "-73.97813416",
  "pickup_latitude": "40.72935867",
  "dropoff_longitude": "-73.91600037",
  "dropoff_latitude": "40.74634933",
  "store_and_fwd_flag": "Y",
  "gc_distance": 3,
  "trip_duration": 4731,
  "google_distance": 3,
  "google_duration": 4731
}
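
To make the message layout concrete, the sketch below shows how a record like the one above would be turned into the key and value bytes of a Kafka message, using the same JSON encoding as the producer's serializers. It uses a shortened record and a hypothetical helper named `to_bytes`, and it does not connect to Kafka.

```python
import json

# A shortened version of the sample record, for illustration only
record = {"id": "id3464573", "vendor_id": 5, "passenger_count": 9}


def to_bytes(obj) -> bytes:
    """Serialise an object as UTF-8 encoded JSON, as the producer does."""
    return json.dumps(obj).encode("utf-8")


key = to_bytes({"id": record["id"]})  # the key carries only the ride ID
value = to_bytes(record)              # the value carries the whole record

if __name__ == "__main__":
    print(key)
    print(value)
```

A consumer can recover the original fields by decoding the bytes as UTF-8 and parsing them with json.loads.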

EventBridge Rule

The AWS EventBridge Terraform module is used to create the EventBridge schedule rule and targets. Note that the rule named crons has a configurable number of targets (e.g. 5) and each target points to the same Lambda producer function. Therefore, we can generate test data using multiple concurrent invocations of the function, scaled to the desired volume of messages.

# infra/producer.tf
module "eventbridge" {
  source  = "terraform-aws-modules/eventbridge/aws"
  version = ">=2.3.0, <3.0.0"

  create     = local.producer.to_create
  create_bus = false

  rules = {
    crons = {
      description         = "Kafka producer lambda schedule"
      schedule_expression = local.producer.schedule_rate
    }
  }

  targets = {
    crons = [for i in range(local.producer.concurrency) : {
      name = "lambda-target-${i}"
      arn  = module.kafka_producer.lambda_function_arn
    }]
  }

  depends_on = [
    module.kafka_producer
  ]

  tags = local.tags
}
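
The for expression in the targets block expands into one target per index, all sharing the same function ARN. As a rough Python equivalent, shown only to illustrate the resulting structure, the sketch below builds the same list. The function name `build_targets` and the ARN are hypothetical.

```python
def build_targets(concurrency: int, function_arn: str) -> list:
    """Mirror the Terraform for expression: one named target per index."""
    return [
        {"name": f"lambda-target-{i}", "arn": function_arn}
        for i in range(concurrency)
    ]


if __name__ == "__main__":
    # Hypothetical ARN, for illustration only
    arn = "arn:aws:lambda:ap-southeast-2:123456789012:function:kafka_producer"
    for target in build_targets(5, arn):
        print(target)
```

With a concurrency of 5, the targets are named lambda-target-0 to lambda-target-4, and each scheduled run invokes the function once per target.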

Deployment

The application can be deployed (as well as destroyed) using the Terraform CLI as shown below. As the default value of producer_to_create is false, we need to set it to true in order to create the Lambda producer function.

# initialize
terraform init
# create an execution plan
terraform plan -var 'producer_to_create=true'
# execute the actions proposed in a Terraform plan
terraform apply -auto-approve=true -var 'producer_to_create=true'

# destroy all remote objects
# terraform destroy -auto-approve=true -var 'producer_to_create=true'

Once deployed, we can see that the schedule rule has 5 targets, all of which point to the same Lambda function.

Monitor Topic

A Kafka management app can be a good companion for development as it helps monitor and manage resources through an easy-to-use user interface. We’ll use Kpow Community Edition in this post, which allows you to link a single Kafka cluster, Kafka Connect server and schema registry. Note that the community edition licence is valid for 12 months and can be requested on this page. Once requested, the licence details will be emailed, and they can be added as an environment file (env_file).

The app needs additional configuration in environment variables because the Kafka cluster on Amazon MSK is authenticated by IAM - see this page for details. The bootstrap server address can be found in the AWS Console or by executing the following Terraform command.

$ terraform output -json | jq -r '.msk_bootstrap_brokers_sasl_iam.value'
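
If jq is not available, the same value can be extracted with a few lines of Python. The sketch below parses the JSON document printed by terraform output -json, in which each output is an object with a value field. The function name `bootstrap_servers` and the broker host names in the sample are hypothetical.

```python
import json


def bootstrap_servers(terraform_output_json: str,
                      key: str = "msk_bootstrap_brokers_sasl_iam") -> list:
    """Return the broker addresses from the JSON printed by terraform output."""
    outputs = json.loads(terraform_output_json)
    return outputs[key]["value"].split(",")


# Hypothetical output, for illustration only
SAMPLE = json.dumps({
    "msk_bootstrap_brokers_sasl_iam": {
        "sensitive": False,
        "type": "string",
        "value": "b-1.example.internal:9098,b-2.example.internal:9098",
    }
})

if __name__ == "__main__":
    print(bootstrap_servers(SAMPLE))
```

The comma-separated string is split into a list because that is the form the producer passes to its Kafka client.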

Note that we need to specify the compose file name when starting the app because the file name (compose-ui.yml) is different from the default file name (docker-compose.yml). We can run it with docker-compose -f compose-ui.yml up -d and then access it in a browser at localhost:3000.

# compose-ui.yml
version: "3"

services:
  kpow:
    image: factorhouse/kpow-ce:91.5.1
    container_name: kpow
    ports:
      - "3000:3000"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
      BOOTSTRAP: $BOOTSTRAP_SERVERS
      SECURITY_PROTOCOL: SASL_SSL
      SASL_MECHANISM: AWS_MSK_IAM
      SASL_JAAS_CONFIG: software.amazon.msk.auth.iam.IAMLoginModule required;
      SASL_CLIENT_CALLBACK_HANDLER_CLASS: software.amazon.msk.auth.iam.IAMClientCallbackHandler
    env_file: # https://kpow.io/get-started/#individual
      - ./kpow.env

networks:
  kafkanet:
    name: kafka-network

We can see the topic (taxi-rides) is created, and it has 5 partitions, which is the default number of partitions.

Also, we can inspect topic messages in the Data tab.

Summary

In this lab, we created a Kafka producer application using AWS Lambda, which sends fake taxi ride data into a Kafka topic on Amazon MSK. It was developed so that an Amazon EventBridge schedule rule invokes a configurable number of instances of the producer Lambda function. In this way, we can generate test data concurrently, scaled to the desired volume of messages.