In Part 4, we developed Kafka producer and consumer applications using the kafka-python package. The Kafka messages were serialized as JSON, but they were not associated with a schema because a schema registry was not integrated. In Part 5, we discussed how producers and consumers of Kafka topics can use schemas to ensure data consistency and compatibility as schemas evolve. In this post, I'll demonstrate how to enhance the existing applications by integrating AWS Glue Schema Registry.
- Part 1 Cluster Setup
- Part 2 Management App
- Part 3 Kafka Connect
- Part 4 Producer and Consumer
- Part 5 Glue Schema Registry
- Part 6 Kafka Connect with Glue Schema Registry
- Part 7 Producer and Consumer with Glue Schema Registry (this post)
- Part 8 SSL Encryption
- Part 9 SSL Authentication
- Part 10 SASL Authentication
- Part 11 Kafka Authorization
Producer
Fake order data is generated using the Faker package, and the dataclasses_avroschema package is used to automatically generate the Avro schema from the attributes of the Order class. A mixin class called InjectCompatMixin is added to the Order class, which injects a schema compatibility mode into the generated schema. The auto() class method generates an order record by instantiating the class.
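To illustrate the idea in isolation, below is a minimal sketch (separate from producer.py) of how dataclasses_avroschema generates an Avro schema and how a compatibility mode can be injected into it, which is what InjectCompatMixin does for the Order class. The OrderItem class here simply mirrors the one used in the producer.

# minimal sketch: generate an Avro schema and inject a compatibility mode
import dataclasses
import json

from dataclasses_avroschema import AvroModel


@dataclasses.dataclass
class OrderItem(AvroModel):
    product_id: int
    quantity: int


schema = OrderItem.avro_schema_to_python()  # the generated schema as a Python dict
schema["compatibility"] = "BACKWARD"        # inject the compatibility mode
print(json.dumps(schema, indent=2))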
The aws-glue-schema-registry package is used to serialize order records. It provides the KafkaSerializer class, which validates, registers and serializes the relevant records. It supports JSON and Avro schemas, and we can pass it to the value_serializer argument of the KafkaProducer class. By default, the schemas are named <topic>-key and <topic>-value, and the naming can be changed by updating the schema_naming_strategy argument. Note that, when sending a message, the value should be a tuple of data and schema.
The producer application can be run simply by python producer.py. Note that, as the producer runs outside the Docker network, the host name of the external listener (localhost:29092) should be used as the bootstrap server address. The source can be found in the GitHub repository of this post.
# kafka-pocs/kafka-dev-with-docker/part-07/producer.py
import os
import datetime
import time
import json
import typing
import logging
import dataclasses
import enum

from faker import Faker
from dataclasses_avroschema import AvroModel
import boto3
import botocore.exceptions
from kafka import KafkaProducer
from aws_schema_registry import SchemaRegistryClient
from aws_schema_registry.avro import AvroSchema
from aws_schema_registry.adapter.kafka import KafkaSerializer

logging.basicConfig(level=logging.INFO)


class Compatibility(enum.Enum):
    NONE = "NONE"
    DISABLED = "DISABLED"
    BACKWARD = "BACKWARD"
    BACKWARD_ALL = "BACKWARD_ALL"
    FORWARD = "FORWARD"
    FORWARD_ALL = "FORWARD_ALL"
    FULL = "FULL"
    FULL_ALL = "FULL_ALL"


class InjectCompatMixin:
    @classmethod
    def updated_avro_schema_to_python(cls, compat: Compatibility = Compatibility.BACKWARD):
        schema = cls.avro_schema_to_python()
        schema["compatibility"] = compat.value
        return schema

    @classmethod
    def updated_avro_schema(cls, compat: Compatibility = Compatibility.BACKWARD):
        schema = cls.updated_avro_schema_to_python(compat)
        return json.dumps(schema)


@dataclasses.dataclass
class OrderItem(AvroModel):
    product_id: int
    quantity: int


@dataclasses.dataclass
class Order(AvroModel, InjectCompatMixin):
    "Online fake order item"
    order_id: str
    ordered_at: datetime.datetime
    user_id: str
    order_items: typing.List[OrderItem]

    class Meta:
        namespace = "Order V1"

    def asdict(self):
        return dataclasses.asdict(self)

    @classmethod
    def auto(cls, fake: Faker = Faker()):
        user_id = str(fake.random_int(1, 100)).zfill(3)
        order_items = [
            OrderItem(fake.random_int(1, 1000), fake.random_int(1, 10))
            for _ in range(fake.random_int(1, 4))
        ]
        return cls(fake.uuid4(), datetime.datetime.utcnow(), user_id, order_items)

    def create(self, num: int):
        return [self.auto() for _ in range(num)]


class Producer:
    def __init__(self, bootstrap_servers: list, topic: str, registry: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.registry = registry
        self.glue_client = boto3.client(
            "glue", region_name=os.getenv("AWS_DEFAULT_REGION", "ap-southeast-2")
        )
        self.producer = self.create()

    @property
    def serializer(self):
        client = SchemaRegistryClient(self.glue_client, registry_name=self.registry)
        return KafkaSerializer(client)

    def create(self):
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            value_serializer=self.serializer,
        )

    def send(self, orders: typing.List[Order], schema: AvroSchema):
        if not self.check_registry():
            print(f"registry not found, create {self.registry}")
            self.create_registry()

        for order in orders:
            try:
                self.producer.send(
                    self.topic, key={"order_id": order.order_id}, value=(order.asdict(), schema)
                )
            except Exception as e:
                raise RuntimeError("fails to send a message") from e
        self.producer.flush()

    def serialize(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj

    def check_registry(self):
        try:
            self.glue_client.get_registry(RegistryId={"RegistryName": self.registry})
            return True
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "EntityNotFoundException":
                return False
            else:
                raise e

    def create_registry(self):
        try:
            self.glue_client.create_registry(RegistryName=self.registry)
            return True
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "AlreadyExistsException":
                return True
            else:
                raise e


if __name__ == "__main__":
    producer = Producer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topic=os.getenv("TOPIC_NAME", "orders"),
        registry=os.getenv("REGISTRY_NAME", "online-order"),
    )
    max_run = int(os.getenv("MAX_RUN", "-1"))
    logging.info(f"max run - {max_run}")
    current_run = 0
    while True:
        current_run += 1
        logging.info(f"current run - {current_run}")
        if current_run > max_run and max_run >= 0:
            logging.info("exceeds max run, finish")
            producer.producer.close()
            break
        orders = Order.auto().create(100)
        schema = AvroSchema(Order.updated_avro_schema(Compatibility.BACKWARD))
        producer.send(orders, schema)
        time.sleep(1)
The generated schema of the Order class can be found below.
{
  "doc": "Online fake order item",
  "namespace": "Order V1",
  "name": "Order",
  "compatibility": "BACKWARD",
  "type": "record",
  "fields": [
    {
      "name": "order_id",
      "type": "string"
    },
    {
      "name": "ordered_at",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "user_id",
      "type": "string"
    },
    {
      "name": "order_items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {
              "name": "product_id",
              "type": "long"
            },
            {
              "name": "quantity",
              "type": "long"
            }
          ]
        },
        "name": "order_item"
      }
    }
  ]
}
An example order record is shown below.
{
  "order_id": "00328584-7db3-4cfb-a5b7-de2c7eed7f43",
  "ordered_at": "2023-05-23T08:30:52.461000",
  "user_id": "010",
  "order_items": [
    {
      "product_id": 213,
      "quantity": 5
    },
    {
      "product_id": 486,
      "quantity": 3
    }
  ]
}
Consumer
The Consumer class instantiates the KafkaConsumer class in the create method. The main consumer configuration values are provided by the constructor arguments: Kafka bootstrap server addresses (bootstrap_servers), topic names (topics) and consumer group ID (group_id). Note that the aws-glue-schema-registry package provides the KafkaDeserializer class, which deserializes messages according to the corresponding schema version, and we should use it as the value_deserializer. The process() method of the class polls messages and logs details of consumer records.
# kafka-pocs/kafka-dev-with-docker/part-07/consumer.py
import os
import time
import logging

from kafka import KafkaConsumer
from kafka.errors import KafkaError
import boto3
from aws_schema_registry import SchemaRegistryClient, DataAndSchema
from aws_schema_registry.adapter.kafka import KafkaDeserializer

logging.basicConfig(level=logging.INFO)


class Consumer:
    def __init__(self, bootstrap_servers: list, topics: list, group_id: str, registry: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.registry = registry
        self.glue_client = boto3.client(
            "glue", region_name=os.getenv("AWS_DEFAULT_REGION", "ap-southeast-2")
        )
        self.consumer = self.create()

    @property
    def deserializer(self):
        client = SchemaRegistryClient(self.glue_client, registry_name=self.registry)
        return KafkaDeserializer(client)

    def create(self):
        return KafkaConsumer(
            *self.topics,
            bootstrap_servers=self.bootstrap_servers,
            auto_offset_reset="earliest",
            enable_auto_commit=True,
            group_id=self.group_id,
            key_deserializer=lambda v: v.decode("utf-8"),
            value_deserializer=self.deserializer,
        )

    def process(self):
        try:
            while True:
                msg = self.consumer.poll(timeout_ms=1000)
                if not msg:
                    continue
                self.print_info(msg)
                time.sleep(1)
        except KafkaError as error:
            logging.error(error)

    def print_info(self, msg: dict):
        for t, v in msg.items():
            for r in v:
                value: DataAndSchema = r.value
                logging.info(
                    f"key={r.key}, value={value.data}, topic={t.topic}, partition={t.partition}, offset={r.offset}, ts={r.timestamp}"
                )


if __name__ == "__main__":
    consumer = Consumer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topics=os.getenv("TOPIC_NAME", "orders").split(","),
        group_id=os.getenv("GROUP_ID", "orders-group"),
        registry=os.getenv("REGISTRY_NAME", "online-order"),
    )
    consumer.process()
Consumer Services
The default number of partitions is set to 3 in compose-kafka.yml. A docker compose file is created for the consumer in order to deploy multiple instances of the app using the scale option. As the service uses the same docker network (kafkanet), we can use the service names of the brokers (e.g. kafka-0) on port 9092 as the bootstrap servers. Once started, each instance installs the required packages and starts the consumer.
# kafka-pocs/kafka-dev-with-docker/part-07/compose-consumer.yml
version: "3.5"

services:
  app:
    image: bitnami/python:3.9
    command: "sh -c 'pip install -r requirements.txt && python consumer.py'"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
      BOOTSTRAP_SERVERS: kafka-0:9092,kafka-1:9092,kafka-2:9092
      TOPIC_NAME: orders
      GROUP_ID: orders-group
      REGISTRY_NAME: online-order
      TZ: Australia/Sydney
    volumes:
      - ./consumer.py:/app/consumer.py
      - ./requirements.txt:/app/requirements.txt

networks:
  kafkanet:
    external: true
    name: kafka-network
Kafka Management App
We should configure additional details in environment variables in order to integrate Glue Schema Registry. While both apps provide serializers/deserializers, kpow also supports managing schemas to some extent.
For kafka-ui, we can add one or more serialization plugins. I added the Glue registry serializer as a plugin and named it online-order. It requires the plugin binary file path, class name, registry name and AWS region name. Other key configuration values are the key and value schema name templates, which are used for finding schema names. Only the value schema template is updated, as it differs from the default value. Note that the template values only apply when producing messages on the UI, so we can leave them commented out if we don't want to produce messages there. Finally, the Glue registry serializer binary should be downloaded beforehand, as it is volume-mapped in the compose file. It can be downloaded from the project repository - see download.sh.
The configuration of kpow is simpler as it only requires the registry ARN and AWS region. Note that the app fails to start if the registry doesn't exist. I created the registry named online-order before starting it.
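As a reference, the registry can be created ahead of time with a short boto3 call, mirroring the create_registry method of the producer. A minimal sketch is shown below; the region name is just an example and should match your environment.

# create the Glue Schema Registry before starting kpow
import boto3

glue = boto3.client("glue", region_name="ap-southeast-2")
try:
    glue.create_registry(RegistryName="online-order")
except glue.exceptions.AlreadyExistsException:
    pass  # the registry already exists, nothing to do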
# /kafka-dev-with-docker/part-07/compose-ui.yml
version: "3.5"

services:
  kafka-ui:
    image: provectuslabs/kafka-ui:master
    container_name: kafka-ui
    ports:
      - "8080:8080"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
      # kafka cluster
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9092,kafka-1:9092,kafka-2:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      # glue schema registry serde
      KAFKA_CLUSTERS_0_SERDE_0_NAME: online-order
      KAFKA_CLUSTERS_0_SERDE_0_FILEPATH: /glue-serde/kafkaui-glue-serde-v1.0.3-jar-with-dependencies.jar
      KAFKA_CLUSTERS_0_SERDE_0_CLASSNAME: com.provectus.kafka.ui.serdes.glue.GlueSerde
      KAFKA_CLUSTERS_0_SERDE_0_PROPERTIES_REGION: $AWS_DEFAULT_REGION #required
      KAFKA_CLUSTERS_0_SERDE_0_PROPERTIES_REGISTRY: online-order #required, name of Glue Schema Registry
      # template that will be used to find schema name for topic key. Optional, default is null (not set).
      # KAFKA_CLUSTERS_0_SERDE_0_PROPERTIES_KEYSCHEMANAMETEMPLATE: "%s-key"
      # template that will be used to find schema name for topic value. Optional, default is '%s'
      KAFKA_CLUSTERS_0_SERDE_0_PROPERTIES_VALUESCHEMANAMETEMPLATE: "%s-value"
    volumes:
      - ./kafkaui-glue-serde-v1.0.3-jar-with-dependencies.jar:/glue-serde/kafkaui-glue-serde-v1.0.3-jar-with-dependencies.jar
  kpow:
    image: factorhouse/kpow-ce:91.2.1
    container_name: kpow
    ports:
      - "3000:3000"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
      # kafka cluster
      BOOTSTRAP: kafka-0:9092,kafka-1:9092,kafka-2:9092
      # glue schema registry
      SCHEMA_REGISTRY_ARN: $SCHEMA_REGISTRY_ARN
      SCHEMA_REGISTRY_REGION: $AWS_DEFAULT_REGION

networks:
  kafkanet:
    external: true
    name: kafka-network
Start Applications
We can run the Kafka cluster by docker-compose -f compose-kafka.yml up -d and the producer by python producer.py. As mentioned earlier, we'll deploy 3 instances of the consumer, and they can be deployed with the scale option as shown below. We can check the running consumer instances using the ps command of Docker Compose.
$ docker-compose -f compose-consumer.yml up -d --scale app=3
$ docker-compose -f compose-consumer.yml ps
     Name                   Command              State    Ports
-----------------------------------------------------------------
part-07_app_1   sh -c pip install -r requi ...   Up      8000/tcp
part-07_app_2   sh -c pip install -r requi ...   Up      8000/tcp
part-07_app_3   sh -c pip install -r requi ...   Up      8000/tcp
Each consumer instance subscribes to its own topic partition, and we can check that in the container logs. The last 10 log entries of one of the instances are shown below; it polls messages from partition 0 only.
$ docker logs -f --tail 10 part-07_app_1

INFO:root:key={"order_id": "91db5fac-8e6b-46d5-a8e1-3df911fa9c60"}, value={'order_id': '91db5fac-8e6b-46d5-a8e1-3df911fa9c60', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 459000, tzinfo=datetime.timezone.utc), 'user_id': '066', 'order_items': [{'product_id': 110, 'quantity': 7}, {'product_id': 599, 'quantity': 4}, {'product_id': 142, 'quantity': 3}, {'product_id': 923, 'quantity': 7}]}, topic=orders, partition=0, offset=4886, ts=1684794652579
INFO:root:key={"order_id": "260c0ccf-29d1-4cef-88a9-7fc00618616e"}, value={'order_id': '260c0ccf-29d1-4cef-88a9-7fc00618616e', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 459000, tzinfo=datetime.timezone.utc), 'user_id': '070', 'order_items': [{'product_id': 709, 'quantity': 6}, {'product_id': 523, 'quantity': 4}, {'product_id': 895, 'quantity': 4}, {'product_id': 944, 'quantity': 2}]}, topic=orders, partition=0, offset=4887, ts=1684794652583
INFO:root:key={"order_id": "5ec8c9a1-5ad6-40f1-b0a5-09e7a060adb7"}, value={'order_id': '5ec8c9a1-5ad6-40f1-b0a5-09e7a060adb7', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 459000, tzinfo=datetime.timezone.utc), 'user_id': '027', 'order_items': [{'product_id': 401, 'quantity': 7}]}, topic=orders, partition=0, offset=4888, ts=1684794652589
INFO:root:key={"order_id": "1d58572f-18d3-4181-9c35-5d179e1fc322"}, value={'order_id': '1d58572f-18d3-4181-9c35-5d179e1fc322', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 460000, tzinfo=datetime.timezone.utc), 'user_id': '076', 'order_items': [{'product_id': 30, 'quantity': 4}, {'product_id': 230, 'quantity': 3}, {'product_id': 351, 'quantity': 7}]}, topic=orders, partition=0, offset=4889, ts=1684794652609
INFO:root:key={"order_id": "13fbd9c3-87e8-4d25-aec6-00c0a897e2f2"}, value={'order_id': '13fbd9c3-87e8-4d25-aec6-00c0a897e2f2', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 461000, tzinfo=datetime.timezone.utc), 'user_id': '078', 'order_items': [{'product_id': 617, 'quantity': 6}]}, topic=orders, partition=0, offset=4890, ts=1684794652612
INFO:root:key={"order_id": "00328584-7db3-4cfb-a5b7-de2c7eed7f43"}, value={'order_id': '00328584-7db3-4cfb-a5b7-de2c7eed7f43', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 461000, tzinfo=datetime.timezone.utc), 'user_id': '010', 'order_items': [{'product_id': 213, 'quantity': 5}, {'product_id': 486, 'quantity': 3}]}, topic=orders, partition=0, offset=4891, ts=1684794652612
INFO:root:key={"order_id": "2b45ef3c-4061-4e24-ace9-a897878eb5a4"}, value={'order_id': '2b45ef3c-4061-4e24-ace9-a897878eb5a4', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 461000, tzinfo=datetime.timezone.utc), 'user_id': '061', 'order_items': [{'product_id': 240, 'quantity': 5}, {'product_id': 585, 'quantity': 5}, {'product_id': 356, 'quantity': 9}, {'product_id': 408, 'quantity': 2}]}, topic=orders, partition=0, offset=4892, ts=1684794652613
INFO:root:key={"order_id": "293f6008-b3c5-41b0-a37b-df90c04f8e0c"}, value={'order_id': '293f6008-b3c5-41b0-a37b-df90c04f8e0c', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 461000, tzinfo=datetime.timezone.utc), 'user_id': '066', 'order_items': [{'product_id': 96, 'quantity': 5}, {'product_id': 359, 'quantity': 2}, {'product_id': 682, 'quantity': 9}]}, topic=orders, partition=0, offset=4893, ts=1684794652614
INFO:root:key={"order_id": "b09d03bf-9500-460c-a3cc-028aa4812b46"}, value={'order_id': 'b09d03bf-9500-460c-a3cc-028aa4812b46', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 463000, tzinfo=datetime.timezone.utc), 'user_id': '071', 'order_items': [{'product_id': 369, 'quantity': 6}, {'product_id': 602, 'quantity': 6}, {'product_id': 252, 'quantity': 10}, {'product_id': 910, 'quantity': 7}]}, topic=orders, partition=0, offset=4894, ts=1684794652629
INFO:root:key={"order_id": "265c64a0-a520-494f-84d5-ebaf4496fe1c"}, value={'order_id': '265c64a0-a520-494f-84d5-ebaf4496fe1c', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 463000, tzinfo=datetime.timezone.utc), 'user_id': '009', 'order_items': [{'product_id': 663, 'quantity': 5}, {'product_id': 351, 'quantity': 4}, {'product_id': 373, 'quantity': 5}]}, topic=orders, partition=0, offset=4895, ts=1684794652630
We can also check the consumers with the management apps, which can be started by docker-compose -f compose-ui.yml up -d. For example, the 3 running consumers can be seen in the Consumers menu of kafka-ui. As expected, each consumer subscribes to its own topic partition.
Schemas
On the AWS Console, we can check that the schema of the value is created.
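If we prefer not to use the console, a quick way to confirm the registration is to list the schemas of the registry with boto3. A minimal sketch is shown below; the region name is an example and should match your environment.

# list the schemas registered in the 'online-order' registry
import boto3

glue = boto3.client("glue", region_name="ap-southeast-2")
resp = glue.list_schemas(RegistryId={"RegistryName": "online-order"})
for schema in resp["Schemas"]:
    print(schema["SchemaName"], schema["SchemaStatus"])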
We are also able to see the schema on kpow. The community edition only supports a single schema registry, and its name is shown as glue1.
Kafka Topics
The orders topic can be found in the Topics menu of kafka-ui.
We can browse individual messages in the Messages tab. Note that we should select the Glue serializer plugin name (online-order) in the Value Serde drop-down list. Otherwise, records won't be deserialized correctly.
We can check the topic messages on kpow as well. If we select AVRO in the Value Deserializer drop-down list, we are required to select the associated schema registry. We can select the pre-set schema registry name, glue1. Upon hitting the Search button, messages show up, deserialized properly.
Summary
In Part 4, we developed Kafka producer and consumer applications using the kafka-python package without integrating a schema registry. In Part 5, we discussed the benefits of a schema registry when developing Kafka applications. In this post, I demonstrated how to enhance the existing applications by integrating AWS Glue Schema Registry.