In Part 4, we developed Kafka producer and consumer applications using the kafka-python package. The Kafka messages were serialized as JSON, but they were not associated with a schema because no schema registry was integrated. Later, in Part 5, we discussed how producers and consumers of Kafka topics can use schemas to ensure data consistency and compatibility as schemas evolve. In this post, I’ll demonstrate how to enhance the existing applications by integrating AWS Glue Schema Registry.

Producer

Fake order data is generated using the Faker package, and the dataclasses_avroschema package is used to generate the Avro schema automatically from the class attributes. A mixin class called InjectCompatMixin is added to the Order class; it injects a schema compatibility mode into the generated schema. The auto() class method instantiates the class to generate an order record.
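
For a quick sanity check, the classes defined in producer.py below can be exercised on their own. This is only an illustrative sketch that assumes producer.py is importable from the current directory; the printed values will differ on each run.

# illustrative only - requires the Order and Compatibility classes from producer.py below
from producer import Compatibility, Order

order = Order.auto()
print(order.asdict())
# e.g. {'order_id': '...', 'ordered_at': datetime.datetime(...), 'user_id': '042', 'order_items': [{'product_id': 213, 'quantity': 5}]}

# the mixin injects the compatibility mode into the generated Avro schema
print(Order.updated_avro_schema(Compatibility.BACKWARD))
# e.g. '{"doc": "Online fake order item", ..., "compatibility": "BACKWARD", ...}'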

The aws-glue-schema-registry package is used to serialize order records. It provides the KafkaSerializer class, which validates, registers and serializes the relevant records. It supports JSON and Avro schemas, and we can pass it to the value_serializer argument of the KafkaProducer class. By default, the schemas are named <topic>-key and <topic>-value, and this can be changed by updating the schema_naming_strategy argument. Note that, when sending a message, the value should be a tuple of data and schema, as sketched below.
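
The following is a condensed sketch of how these pieces fit together, using the topic (orders) and registry (online-order) names from this post and assuming AWS credentials and the region are configured; the full producer.py follows.

# condensed sketch of what the full producer.py below does
import boto3
from kafka import KafkaProducer
from aws_schema_registry import SchemaRegistryClient
from aws_schema_registry.avro import AvroSchema
from aws_schema_registry.adapter.kafka import KafkaSerializer

from producer import Compatibility, Order  # classes shown below

client = SchemaRegistryClient(boto3.client("glue"), registry_name="online-order")
producer = KafkaProducer(
    bootstrap_servers="localhost:29092",
    value_serializer=KafkaSerializer(client),  # validates, registers and serializes
)

schema = AvroSchema(Order.updated_avro_schema(Compatibility.BACKWARD))
order = Order.auto()
# note that the value is a (data, schema) tuple, not the data alone
producer.send("orders", value=(order.asdict(), schema))
producer.flush()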

The producer application can be run simply with python producer.py. Note that, as the producer runs outside the Docker network, the external listener address (localhost:29092) should be used as the bootstrap server address. The source can be found in the GitHub repository of this post.

# kafka-pocs/kafka-dev-with-docker/part-07/producer.py
import os
import datetime
import time
import json
import typing
import logging
import dataclasses
import enum

from faker import Faker
from dataclasses_avroschema import AvroModel
import boto3
import botocore.exceptions
from kafka import KafkaProducer
from aws_schema_registry import SchemaRegistryClient
from aws_schema_registry.avro import AvroSchema
from aws_schema_registry.adapter.kafka import KafkaSerializer

logging.basicConfig(level=logging.INFO)


class Compatibility(enum.Enum):
    NONE = "NONE"
    DISABLED = "DISABLED"
    BACKWARD = "BACKWARD"
    BACKWARD_ALL = "BACKWARD_ALL"
    FORWARD = "FORWARD"
    FORWARD_ALL = "FORWARD_ALL"
    FULL = "FULL"
    FULL_ALL = "FULL_ALL"


class InjectCompatMixin:
    @classmethod
    def updated_avro_schema_to_python(cls, compat: Compatibility = Compatibility.BACKWARD):
        schema = cls.avro_schema_to_python()
        schema["compatibility"] = compat.value
        return schema

    @classmethod
    def updated_avro_schema(cls, compat: Compatibility = Compatibility.BACKWARD):
        schema = cls.updated_avro_schema_to_python(compat)
        return json.dumps(schema)


@dataclasses.dataclass
class OrderItem(AvroModel):
    product_id: int
    quantity: int


@dataclasses.dataclass
class Order(AvroModel, InjectCompatMixin):
    "Online fake order item"
    order_id: str
    ordered_at: datetime.datetime
    user_id: str
    order_items: typing.List[OrderItem]

    class Meta:
        namespace = "Order V1"

    def asdict(self):
        return dataclasses.asdict(self)

    @classmethod
    def auto(cls, fake: Faker = Faker()):
        user_id = str(fake.random_int(1, 100)).zfill(3)
        order_items = [
            OrderItem(fake.random_int(1, 1000), fake.random_int(1, 10))
            for _ in range(fake.random_int(1, 4))
        ]
        return cls(fake.uuid4(), datetime.datetime.utcnow(), user_id, order_items)

    def create(self, num: int):
        return [self.auto() for _ in range(num)]


class Producer:
    def __init__(self, bootstrap_servers: list, topic: str, registry: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.registry = registry
        self.glue_client = boto3.client(
            "glue", region_name=os.getenv("AWS_DEFAULT_REGION", "ap-southeast-2")
        )
        self.producer = self.create()

    @property
    def serializer(self):
        client = SchemaRegistryClient(self.glue_client, registry_name=self.registry)
        return KafkaSerializer(client)

    def create(self):
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            value_serializer=self.serializer,
        )

    def send(self, orders: typing.List[Order], schema: AvroSchema):
        if not self.check_registry():
            print(f"registry not found, create {self.registry}")
            self.create_registry()

        for order in orders:
            try:
                self.producer.send(
                    self.topic, key={"order_id": order.order_id}, value=(order.asdict(), schema)
                )
            except Exception as e:
                raise RuntimeError("fails to send a message") from e
        self.producer.flush()

    def serialize(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj

    def check_registry(self):
        try:
            self.glue_client.get_registry(RegistryId={"RegistryName": self.registry})
            return True
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "EntityNotFoundException":
                return False
            else:
                raise e

    def create_registry(self):
        try:
            self.glue_client.create_registry(RegistryName=self.registry)
            return True
        except botocore.exceptions.ClientError as e:
            if e.response["Error"]["Code"] == "AlreadyExistsException":
                return True
            else:
                raise e


if __name__ == "__main__":
    producer = Producer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topic=os.getenv("TOPIC_NAME", "orders"),
        registry=os.getenv("REGISTRY_NAME", "online-order"),
    )
    max_run = int(os.getenv("MAX_RUN", "-1"))
    logging.info(f"max run - {max_run}")
    current_run = 0
    while True:
        current_run += 1
        logging.info(f"current run - {current_run}")
        if current_run > max_run and max_run >= 0:
            logging.info("exceeds max run, finish")
            producer.producer.close()
            break
        orders = Order.auto().create(100)
        schema = AvroSchema(Order.updated_avro_schema(Compatibility.BACKWARD))
        producer.send(orders, schema)
        time.sleep(1)

The generated schema of the Order class can be found below.

{
  "doc": "Online fake order item",
  "namespace": "Order V1",
  "name": "Order",
  "compatibility": "BACKWARD",
  "type": "record",
  "fields": [
    {
      "name": "order_id",
      "type": "string"
    },
    {
      "name": "ordered_at",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "user_id",
      "type": "string"
    },
    {
      "name": "order_items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {
              "name": "product_id",
              "type": "long"
            },
            {
              "name": "quantity",
              "type": "long"
            }
          ]
        },
        "name": "order_item"
      }
    }
  ]
}

An example order record is shown below.

{
  "order_id": "00328584-7db3-4cfb-a5b7-de2c7eed7f43",
  "ordered_at": "2023-05-23T08:30:52.461000",
  "user_id": "010",
  "order_items": [
    {
      "product_id": 213,
      "quantity": 5
    },
    {
      "product_id": 486,
      "quantity": 3
    }
  ]
}

Consumer

The Consumer class instantiates the KafkaConsumer class in its create method. The main consumer configuration values are provided by the constructor arguments: the Kafka bootstrap server addresses (bootstrap_servers), the topic names (topics) and the consumer group ID (group_id). Note that the aws-glue-schema-registry package provides the KafkaDeserializer class, which deserializes messages according to the corresponding schema version, and we should use it as the value_deserializer. The process() method polls messages and logs the details of the consumer records.

# kafka-pocs/kafka-dev-with-docker/part-07/consumer.py
import os
import time
import logging

from kafka import KafkaConsumer
from kafka.errors import KafkaError
import boto3
from aws_schema_registry import SchemaRegistryClient, DataAndSchema
from aws_schema_registry.adapter.kafka import KafkaDeserializer

logging.basicConfig(level=logging.INFO)


class Consumer:
    def __init__(self, bootstrap_servers: list, topics: list, group_id: str, registry: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.registry = registry
        self.glue_client = boto3.client(
            "glue", region_name=os.getenv("AWS_DEFAULT_REGION", "ap-southeast-2")
        )
        self.consumer = self.create()

    @property
    def deserializer(self):
        client = SchemaRegistryClient(self.glue_client, registry_name=self.registry)
        return KafkaDeserializer(client)

    def create(self):
        return KafkaConsumer(
            *self.topics,
            bootstrap_servers=self.bootstrap_servers,
            auto_offset_reset="earliest",
            enable_auto_commit=True,
            group_id=self.group_id,
            key_deserializer=lambda v: v.decode("utf-8"),
            value_deserializer=self.deserializer,
        )

    def process(self):
        try:
            while True:
                msg = self.consumer.poll(timeout_ms=1000)
                if not msg:  # poll() returns an empty dict when no records are fetched
                    continue
                self.print_info(msg)
                time.sleep(1)
        except KafkaError as error:
            logging.error(error)

    def print_info(self, msg: dict):
        for t, v in msg.items():
            for r in v:
                value: DataAndSchema = r.value
                logging.info(
                    f"key={r.key}, value={value.data}, topic={t.topic}, partition={t.partition}, offset={r.offset}, ts={r.timestamp}"
                )


if __name__ == "__main__":
    consumer = Consumer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topics=os.getenv("TOPIC_NAME", "orders").split(","),
        group_id=os.getenv("GROUP_ID", "orders-group"),
        registry=os.getenv("REGISTRY_NAME", "online-order"),
    )
    consumer.process()
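
As a side note, the record value returned by the deserializer is a DataAndSchema named tuple, so the schema each message was written with is available next to the data. The print_info method above only logs the data part; below is a small sketch of logging both, assuming DataAndSchema exposes data and schema attributes as its name suggests.

# sketch: an alternative to print_info in consumer.py that also logs the writer schema
import logging

def print_info_with_schema(msg: dict):
    for topic_partition, records in msg.items():
        for record in records:
            data, schema = record.value.data, record.value.schema
            logging.info(
                f"partition={topic_partition.partition}, value={data}, schema={schema}"
            )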

Consumer Services

The default number of partitions is set to 3 in compose-kafka.yml. A separate Docker Compose file is created for the consumer so that multiple instances of the app can be deployed using the scale option. As the service joins the same Docker network (kafkanet), we can use the brokers' service names (e.g. kafka-0) with the internal listener port 9092 as the bootstrap server addresses. Once started, the container installs the required packages and runs the consumer.

# kafka-pocs/kafka-dev-with-docker/part-07/compose-consumer.yml
version: "3.5"

services:
  app:
    image: bitnami/python:3.9
    command: "sh -c 'pip install -r requirements.txt && python consumer.py'"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
      BOOTSTRAP_SERVERS: kafka-0:9092,kafka-1:9092,kafka-2:9092
      TOPIC_NAME: orders
      GROUP_ID: orders-group
      REGISTRY_NAME: online-order
      TZ: Australia/Sydney
    volumes:
      - ./consumer.py:/app/consumer.py
      - ./requirements.txt:/app/requirements.txt

networks:
  kafkanet:
    external: true
    name: kafka-network

Kafka Management App

We should configure additional environment variables in order to integrate Glue Schema Registry. While both apps provide serializers/deserializers, kpow also supports managing schemas to some extent.

For kafka-ui, we can add one or more serialization plugins. I added the Glue registry serializer as a plugin and named it online-order. It requires the plugin binary file path, the class name, the registry name and the AWS region. Other key configuration values are the key and value schema name templates, which are used to find schema names. Only the value schema template is updated here, as it differs from the default value. Note that the template values only matter when producing messages on the UI, so we can leave them commented out if we don’t plan to produce messages there. Finally, the Glue registry serializer binary should be downloaded beforehand, as it is volume-mapped in the compose file. It can be downloaded from the project repository - see download.sh.

The configuration of kpow is simpler, as it only requires the registry ARN and the AWS region. Note that the app fails to start if the registry doesn’t exist, so I created the online-order registry before starting it.
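
For reference, the registry can be created (and its ARN looked up for the SCHEMA_REGISTRY_ARN variable) with a few lines of boto3. This is just a convenience sketch, assuming AWS credentials and the region are configured.

import boto3

glue = boto3.client("glue", region_name="ap-southeast-2")
# create the registry that kpow (and the producer) expects to exist;
# this raises AlreadyExistsException if it was created earlier
glue.create_registry(RegistryName="online-order")
# print the ARN to export as SCHEMA_REGISTRY_ARN
print(glue.get_registry(RegistryId={"RegistryName": "online-order"})["RegistryArn"])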

# /kafka-dev-with-docker/part-07/compose-ui.yml
version: "3.5"

services:
  kafka-ui:
    image: provectuslabs/kafka-ui:master
    container_name: kafka-ui
    ports:
      - "8080:8080"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
      # kafka cluster
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9092,kafka-1:9092,kafka-2:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      # glue schema registry serde
      KAFKA_CLUSTERS_0_SERDE_0_NAME: online-order
      KAFKA_CLUSTERS_0_SERDE_0_FILEPATH: /glue-serde/kafkaui-glue-serde-v1.0.3-jar-with-dependencies.jar
      KAFKA_CLUSTERS_0_SERDE_0_CLASSNAME: com.provectus.kafka.ui.serdes.glue.GlueSerde
      KAFKA_CLUSTERS_0_SERDE_0_PROPERTIES_REGION: $AWS_DEFAULT_REGION #required
      KAFKA_CLUSTERS_0_SERDE_0_PROPERTIES_REGISTRY: online-order #required, name of Glue Schema Registry
      # template that will be used to find schema name for topic key. Optional, default is null (not set).
      # KAFKA_CLUSTERS_0_SERDE_0_PROPERTIES_KEYSCHEMANAMETEMPLATE: "%s-key"
      # template that will be used to find schema name for topic value. Optional, default is '%s'
      KAFKA_CLUSTERS_0_SERDE_0_PROPERTIES_VALUESCHEMANAMETEMPLATE: "%s-value"
    volumes:
      - ./kafkaui-glue-serde-v1.0.3-jar-with-dependencies.jar:/glue-serde/kafkaui-glue-serde-v1.0.3-jar-with-dependencies.jar
  kpow:
    image: factorhouse/kpow-ce:91.2.1
    container_name: kpow
    ports:
      - "3000:3000"
    networks:
      - kafkanet
    environment:
      AWS_ACCESS_KEY_ID: $AWS_ACCESS_KEY_ID
      AWS_SECRET_ACCESS_KEY: $AWS_SECRET_ACCESS_KEY
      AWS_SESSION_TOKEN: $AWS_SESSION_TOKEN
      # kafka cluster
      BOOTSTRAP: kafka-0:9092,kafka-1:9092,kafka-2:9092
      # glue schema registry
      SCHEMA_REGISTRY_ARN: $SCHEMA_REGISTRY_ARN
      SCHEMA_REGISTRY_REGION: $AWS_DEFAULT_REGION

networks:
  kafkanet:
    external: true
    name: kafka-network

Start Applications

We can run the Kafka cluster with docker-compose -f compose-kafka.yml up -d and the producer with python producer.py. As mentioned earlier, we’ll deploy 3 instances of the consumer using the scale option, as shown below. We can check the running consumer instances with the ps command of Docker Compose.

$ docker-compose -f compose-consumer.yml up -d --scale app=3
$ docker-compose -f compose-consumer.yml ps
    Name                   Command               State    Ports  
-----------------------------------------------------------------
part-07_app_1   sh -c pip install -r requi ...   Up      8000/tcp
part-07_app_2   sh -c pip install -r requi ...   Up      8000/tcp
part-07_app_3   sh -c pip install -r requi ...   Up      8000/tcp

Each consumer instance subscribes to its own topic partition, and we can verify that in the container logs. The last 10 log entries of one of the instances are shown below; it polls messages from partition 0 only.

$ docker logs -f --tail 10 part-07_app_1

INFO:root:key={"order_id": "91db5fac-8e6b-46d5-a8e1-3df911fa9c60"}, value={'order_id': '91db5fac-8e6b-46d5-a8e1-3df911fa9c60', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 459000, tzinfo=datetime.timezone.utc), 'user_id': '066', 'order_items': [{'product_id': 110, 'quantity': 7}, {'product_id': 599, 'quantity': 4}, {'product_id': 142, 'quantity': 3}, {'product_id': 923, 'quantity': 7}]}, topic=orders, partition=0, offset=4886, ts=1684794652579
INFO:root:key={"order_id": "260c0ccf-29d1-4cef-88a9-7fc00618616e"}, value={'order_id': '260c0ccf-29d1-4cef-88a9-7fc00618616e', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 459000, tzinfo=datetime.timezone.utc), 'user_id': '070', 'order_items': [{'product_id': 709, 'quantity': 6}, {'product_id': 523, 'quantity': 4}, {'product_id': 895, 'quantity': 4}, {'product_id': 944, 'quantity': 2}]}, topic=orders, partition=0, offset=4887, ts=1684794652583
INFO:root:key={"order_id": "5ec8c9a1-5ad6-40f1-b0a5-09e7a060adb7"}, value={'order_id': '5ec8c9a1-5ad6-40f1-b0a5-09e7a060adb7', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 459000, tzinfo=datetime.timezone.utc), 'user_id': '027', 'order_items': [{'product_id': 401, 'quantity': 7}]}, topic=orders, partition=0, offset=4888, ts=1684794652589
INFO:root:key={"order_id": "1d58572f-18d3-4181-9c35-5d179e1fc322"}, value={'order_id': '1d58572f-18d3-4181-9c35-5d179e1fc322', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 460000, tzinfo=datetime.timezone.utc), 'user_id': '076', 'order_items': [{'product_id': 30, 'quantity': 4}, {'product_id': 230, 'quantity': 3}, {'product_id': 351, 'quantity': 7}]}, topic=orders, partition=0, offset=4889, ts=1684794652609
INFO:root:key={"order_id": "13fbd9c3-87e8-4d25-aec6-00c0a897e2f2"}, value={'order_id': '13fbd9c3-87e8-4d25-aec6-00c0a897e2f2', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 461000, tzinfo=datetime.timezone.utc), 'user_id': '078', 'order_items': [{'product_id': 617, 'quantity': 6}]}, topic=orders, partition=0, offset=4890, ts=1684794652612
INFO:root:key={"order_id": "00328584-7db3-4cfb-a5b7-de2c7eed7f43"}, value={'order_id': '00328584-7db3-4cfb-a5b7-de2c7eed7f43', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 461000, tzinfo=datetime.timezone.utc), 'user_id': '010', 'order_items': [{'product_id': 213, 'quantity': 5}, {'product_id': 486, 'quantity': 3}]}, topic=orders, partition=0, offset=4891, ts=1684794652612
INFO:root:key={"order_id": "2b45ef3c-4061-4e24-ace9-a897878eb5a4"}, value={'order_id': '2b45ef3c-4061-4e24-ace9-a897878eb5a4', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 461000, tzinfo=datetime.timezone.utc), 'user_id': '061', 'order_items': [{'product_id': 240, 'quantity': 5}, {'product_id': 585, 'quantity': 5}, {'product_id': 356, 'quantity': 9}, {'product_id': 408, 'quantity': 2}]}, topic=orders, partition=0, offset=4892, ts=1684794652613
INFO:root:key={"order_id": "293f6008-b3c5-41b0-a37b-df90c04f8e0c"}, value={'order_id': '293f6008-b3c5-41b0-a37b-df90c04f8e0c', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 461000, tzinfo=datetime.timezone.utc), 'user_id': '066', 'order_items': [{'product_id': 96, 'quantity': 5}, {'product_id': 359, 'quantity': 2}, {'product_id': 682, 'quantity': 9}]}, topic=orders, partition=0, offset=4893, ts=1684794652614
INFO:root:key={"order_id": "b09d03bf-9500-460c-a3cc-028aa4812b46"}, value={'order_id': 'b09d03bf-9500-460c-a3cc-028aa4812b46', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 463000, tzinfo=datetime.timezone.utc), 'user_id': '071', 'order_items': [{'product_id': 369, 'quantity': 6}, {'product_id': 602, 'quantity': 6}, {'product_id': 252, 'quantity': 10}, {'product_id': 910, 'quantity': 7}]}, topic=orders, partition=0, offset=4894, ts=1684794652629
INFO:root:key={"order_id": "265c64a0-a520-494f-84d5-ebaf4496fe1c"}, value={'order_id': '265c64a0-a520-494f-84d5-ebaf4496fe1c', 'ordered_at': datetime.datetime(2023, 5, 22, 12, 30, 52, 463000, tzinfo=datetime.timezone.utc), 'user_id': '009', 'order_items': [{'product_id': 663, 'quantity': 5}, {'product_id': 351, 'quantity': 4}, {'product_id': 373, 'quantity': 5}]}, topic=orders, partition=0, offset=4895, ts=1684794652630

The management apps can be started with docker-compose -f compose-ui.yml up -d, and we can check the consumers there as well. For example, the 3 running consumers can be seen in the Consumers menu of kafka-ui. As expected, each consumer subscribes to its own topic partition.

Schemas

On the AWS Console, we can check that the value schema has been created.
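
The same check can be done programmatically. Below is a small boto3 sketch that lists the schemas in the online-order registry; the value schema is expected to be named orders-value, following the naming convention mentioned earlier.

import boto3

glue = boto3.client("glue", region_name="ap-southeast-2")
# list schemas registered in the online-order registry
resp = glue.list_schemas(RegistryId={"RegistryName": "online-order"})
for schema in resp["Schemas"]:
    print(schema["SchemaName"], schema["SchemaStatus"])
# expected output includes: orders-value AVAILABLE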

We can also see it on kpow. The community edition only supports a single schema registry, and its name is shown as glue1.

Kafka Topics

The orders topic can be found in the Topics menu of kafka-ui.

We can browse individual messages in the Messages tab. Note that we should select the Glue serializer plugin name (online-order) in the Value Serde drop-down list; otherwise, records won’t be deserialized correctly.

We can check the topic messages on kpow as well. If we select AVRO in the Value Deserializer drop-down list, we are required to select the associated schema registry, and we can choose the pre-set name glue1. Upon hitting the Search button, the messages show up, deserialized properly.

Summary

In Part 4, we developed Kafka producer and consumer applications using the kafka-python package without integrating a schema registry. Later, in Part 5, we discussed the benefits of a schema registry when developing Kafka applications. In this post, I demonstrated how to enhance the existing applications by integrating AWS Glue Schema Registry.