In the previous post, we used Kafka Connect to stream data to/from a Kafka cluster. Kafka also provides the Producer and Consumer APIs, which allow client applications to send and read streams of data to and from topics in a Kafka cluster. While the main Kafka project maintains only the Java clients, several open source projects provide Kafka client APIs for Python. In this post, I'll demonstrate how to develop producer and consumer applications using the kafka-python package.

Producer

The same Kafka producer app that was introduced in Part 2 is used again. It sends fake order data generated by the Faker package. The Order class generates one or more fake order records via its create method, and each record includes an order ID, order timestamp, user ID and order items. Both the key and the value are serialized as JSON. Note that, as the producer runs outside the Docker network, the address of the external listener (localhost:29092) is used as the bootstrap server address. The app can be run simply by python producer.py, and the source can be found in the GitHub repository of this post.

# kafka-pocs/kafka-dev-with-docker/part-04/producer.py
import os
import datetime
import time
import json
import typing
import logging
import dataclasses

from faker import Faker
from kafka import KafkaProducer

logging.basicConfig(level=logging.INFO)


@dataclasses.dataclass
class OrderItem:
    product_id: int
    quantity: int


@dataclasses.dataclass
class Order:
    order_id: str
    ordered_at: datetime.datetime
    user_id: str
    order_items: typing.List[OrderItem]

    def asdict(self):
        return dataclasses.asdict(self)

    @classmethod
    def auto(cls, fake: Faker = Faker()):
        user_id = str(fake.random_int(1, 100)).zfill(3)
        order_items = [
            OrderItem(fake.random_int(1, 1000), fake.random_int(1, 10))
            for _ in range(fake.random_int(1, 4))
        ]
        return cls(fake.uuid4(), datetime.datetime.utcnow(), user_id, order_items)

    def create(self, num: int):
        return [self.auto() for _ in range(num)]


class Producer:
    def __init__(self, bootstrap_servers: list, topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.producer = self.create()

    def create(self):
        # both the key and the value are serialized as JSON
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
        )

    def send(self, orders: typing.List[Order]):
        for order in orders:
            try:
                self.producer.send(
                    self.topic, key={"order_id": order.order_id}, value=order.asdict()
                )
            except Exception as e:
                raise RuntimeError("failed to send a message") from e
        self.producer.flush()

    def serialize(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj


if __name__ == "__main__":
    producer = Producer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topic=os.getenv("TOPIC_NAME", "orders"),
    )
    max_run = int(os.getenv("MAX_RUN", "-1"))
    logging.info(f"max run - {max_run}")
    current_run = 0
    while True:
        current_run += 1
        logging.info(f"current run - {current_run}")
        if current_run > max_run and max_run >= 0:
            logging.info("max run exceeded, stopping")
            producer.producer.close()
            break
        producer.send(Order.auto().create(100))
        time.sleep(1)

A sample order record is shown below.

{
  "order_id": "79c0c393-9eca-4a44-8efd-3965752f3e16",
  "ordered_at": "2023-05-13T18:02:54.510497",
  "user_id": "050",
  "order_items": [
    {
      "product_id": 113,
      "quantity": 9
    },
    {
      "product_id": 58,
      "quantity": 5
    }
  ]
}
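
Note that KafkaProducer.send() is asynchronous: it returns a future, and the flush() call at the end of the Producer's send method blocks until all buffered records have been delivered. If per-record delivery confirmation is needed, the returned future can be resolved into record metadata, as in the minimal sketch below (not part of the repository code; the topic name and timeout value are illustrative).

# a minimal sketch (not in the repository) - resolve the future returned by send()
producer = Producer(bootstrap_servers=["localhost:29092"], topic="orders")
future = producer.producer.send("orders", key={"order_id": "test"}, value={"order_id": "test"})
metadata = future.get(timeout=10)  # blocks until the broker acknowledges the record
print(metadata.topic, metadata.partition, metadata.offset)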

Consumer

The Consumer class instantiates the KafkaConsumer in its create method. The main consumer configuration values are provided by the constructor arguments: Kafka bootstrap server addresses (bootstrap_servers), topic names (topics) and consumer group ID (group_id). The process() method continuously polls messages and logs the details of the consumer records.

# kafka-pocs/kafka-dev-with-docker/part-04/consumer.py
import os
import time
import logging

from kafka import KafkaConsumer
from kafka.errors import KafkaError

logging.basicConfig(level=logging.INFO)


class Consumer:
    def __init__(self, bootstrap_servers: list, topics: list, group_id: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.consumer = self.create()

    def create(self):
        return KafkaConsumer(
            *self.topics,
            bootstrap_servers=self.bootstrap_servers,
            auto_offset_reset="earliest",
            enable_auto_commit=True,
            group_id=self.group_id,
            key_deserializer=lambda v: v.decode("utf-8"),
            value_deserializer=lambda v: v.decode("utf-8"),
        )

    def process(self):
        try:
            while True:
                # poll() returns a dict mapping topic partitions to lists of consumer records
                # (an empty dict when no records arrive within the timeout)
                msg = self.consumer.poll(timeout_ms=1000)
                if not msg:
                    continue
                self.print_info(msg)
                time.sleep(1)
        except KafkaError as error:
            logging.error(error)

    def print_info(self, msg: dict):
        for t, v in msg.items():
            for r in v:
                logging.info(
                    f"key={r.key}, value={r.value}, topic={t.topic}, partition={t.partition}, offset={r.offset}, ts={r.timestamp}"
                )


if __name__ == "__main__":
    consumer = Consumer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topics=os.getenv("TOPIC_NAME", "orders").split(","),
        group_id=os.getenv("GROUP_ID", "orders-group"),
    )
    consumer.process()
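
The consumer group coordinator assigns topic partitions to the consumer instances, which we'll confirm later from the container logs and kafka-ui. If you want to inspect the assignment programmatically, kafka-python also exposes KafkaConsumer.assignment(); a minimal sketch (not part of the repository code) is shown below.

# a minimal sketch (not in the repository) - log the partitions assigned to this consumer instance
consumer = Consumer(["localhost:29092"], ["orders"], "orders-group")
consumer.consumer.poll(timeout_ms=1000)  # an initial poll triggers the group join and partition assignment
for tp in sorted(consumer.consumer.assignment()):
    logging.info(f"assigned - topic={tp.topic}, partition={tp.partition}")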

Consumer Services

As the default number of partitions is set to 3 in compose-kafka.yml, a separate Docker Compose file is created for the consumer so that 3 instances of the app can be deployed using the scale option. As the service joins the same Docker network (kafkanet), the brokers can be reached by their service names (e.g. kafka-0) on port 9092. Once started, the container installs the required packages and runs the consumer.

# kafka-pocs/kafka-dev-with-docker/part-04/compose-consumer.yml
version: "3.5"

services:
  app:
    image: bitnami/python:3.9
    command: "sh -c 'pip install -r requirements.txt && python consumer.py'"
    networks:
      - kafkanet
    environment:
      BOOTSTRAP_SERVERS: kafka-0:9092,kafka-1:9092,kafka-2:9092
      TOPIC_NAME: orders
      GROUP_ID: orders-group
      TZ: Australia/Sydney
    volumes:
      - .:/app

networks:
  kafkanet:
    external: true
    name: kafka-network
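
The container installs its dependencies from requirements.txt in the mounted project folder before starting the app. The exact contents and pinned versions live in the repository, but at a minimum the file needs the two packages used by the producer and consumer scripts, along the lines of:

# kafka-pocs/kafka-dev-with-docker/part-04/requirements.txt (indicative only - check the repository for pinned versions)
kafka-python
faker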

Start Applications

We can run the Kafka cluster with docker-compose -f compose-kafka.yml up -d and the producer with python producer.py. As mentioned earlier, 3 instances of the consumer are deployed using the scale option as shown below, and the running instances can be checked with the ps command of Docker Compose.

$ docker-compose -f compose-consumer.yml up -d --scale app=3
$ docker-compose -f compose-consumer.yml ps
    Name                   Command               State    Ports  
-----------------------------------------------------------------
part-04_app_1   sh -c pip install -r requi ...   Up      8000/tcp
part-04_app_2   sh -c pip install -r requi ...   Up      8000/tcp
part-04_app_3   sh -c pip install -r requi ...   Up      8000/tcp

Each consumer instance is assigned its own topic partition, and we can verify that from the container logs. The last 10 log entries of one of the instances are shown below, and they indicate that it polls messages from partition 0 only.

$ docker logs -f --tail 10 part-04_app_1

INFO:root:key={"order_id": "79c0c393-9eca-4a44-8efd-3965752f3e16"}, value={"order_id": "79c0c393-9eca-4a44-8efd-3965752f3e16", "ordered_at": "2023-05-13T18:02:54.510497", "user_id": "050", "order_items": [{"product_id": 113, "quantity": 9}, {"product_id": 58, "quantity": 5}]}, topic=orders, partition=0, offset=11407, ts=1684000974514
INFO:root:key={"order_id": "d57427fc-5325-49eb-9fb7-e4fac1eca9b4"}, value={"order_id": "d57427fc-5325-49eb-9fb7-e4fac1eca9b4", "ordered_at": "2023-05-13T18:02:54.510548", "user_id": "078", "order_items": [{"product_id": 111, "quantity": 4}]}, topic=orders, partition=0, offset=11408, ts=1684000974514
INFO:root:key={"order_id": "66c4ca6f-30e2-4f94-a971-ec23c9952430"}, value={"order_id": "66c4ca6f-30e2-4f94-a971-ec23c9952430", "ordered_at": "2023-05-13T18:02:54.510565", "user_id": "004", "order_items": [{"product_id": 647, "quantity": 2}, {"product_id": 894, "quantity": 1}]}, topic=orders, partition=0, offset=11409, ts=1684000974514
INFO:root:key={"order_id": "518a6812-4357-4ec1-9e5c-aad7853646ee"}, value={"order_id": "518a6812-4357-4ec1-9e5c-aad7853646ee", "ordered_at": "2023-05-13T18:02:54.510609", "user_id": "043", "order_items": [{"product_id": 882, "quantity": 5}]}, topic=orders, partition=0, offset=11410, ts=1684000974514
INFO:root:key={"order_id": "b22922e8-8ad0-48c3-b970-a486d4576d5c"}, value={"order_id": "b22922e8-8ad0-48c3-b970-a486d4576d5c", "ordered_at": "2023-05-13T18:02:54.510625", "user_id": "002", "order_items": [{"product_id": 206, "quantity": 6}, {"product_id": 810, "quantity": 9}]}, topic=orders, partition=0, offset=11411, ts=1684000974514
INFO:root:key={"order_id": "1ef36da0-6a0b-4ec2-9ecd-10a020acfbfd"}, value={"order_id": "1ef36da0-6a0b-4ec2-9ecd-10a020acfbfd", "ordered_at": "2023-05-13T18:02:54.510660", "user_id": "085", "order_items": [{"product_id": 18, "quantity": 3}]}, topic=orders, partition=0, offset=11412, ts=1684000974515
INFO:root:key={"order_id": "e9efdfd8-dc55-47b9-9cb0-c2e87e864435"}, value={"order_id": "e9efdfd8-dc55-47b9-9cb0-c2e87e864435", "ordered_at": "2023-05-13T18:02:54.510692", "user_id": "051", "order_items": [{"product_id": 951, "quantity": 6}]}, topic=orders, partition=0, offset=11413, ts=1684000974515
INFO:root:key={"order_id": "b24ed1c0-150a-41b3-b1cb-a27fb5581b2b"}, value={"order_id": "b24ed1c0-150a-41b3-b1cb-a27fb5581b2b", "ordered_at": "2023-05-13T18:02:54.510737", "user_id": "096", "order_items": [{"product_id": 734, "quantity": 3}]}, topic=orders, partition=0, offset=11414, ts=1684000974515
INFO:root:key={"order_id": "74b06957-2c6c-4e46-be49-d2915cc80b74"}, value={"order_id": "74b06957-2c6c-4e46-be49-d2915cc80b74", "ordered_at": "2023-05-13T18:02:54.510774", "user_id": "072", "order_items": [{"product_id": 968, "quantity": 2}, {"product_id": 602, "quantity": 3}, {"product_id": 316, "quantity": 9}, {"product_id": 971, "quantity": 8}]}, topic=orders, partition=0, offset=11415, ts=1684000974515
INFO:root:key={"order_id": "fce38c6b-4806-4579-b11e-8eac24b5166b"}, value={"order_id": "fce38c6b-4806-4579-b11e-8eac24b5166b", "ordered_at": "2023-05-13T18:02:54.510863", "user_id": "071", "order_items": [{"product_id": 751, "quantity": 8}]}, topic=orders, partition=0, offset=11416, ts=1684000974515

We can also check the consumers with the management apps, which can be started by docker-compose -f compose-ui.yml up -d. For example, the 3 running consumers can be seen in the Consumers menu of kafka-ui, and, as expected, each consumer is assigned its own topic partition.
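
Besides the UI, the partition assignment can also be checked from the command line with the kafka-consumer-groups.sh tool that ships with Kafka. Assuming the broker container is reachable as kafka-0 and uses the Bitnami image (the actual container name may differ depending on the Compose project name), something like the following describes the orders-group group.

$ docker exec -it kafka-0 /opt/bitnami/kafka/bin/kafka-consumer-groups.sh \
    --bootstrap-server kafka-0:9092 --group orders-group --describe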

Summary

Kafka provides the Producer and Consumer APIs, which allow client applications to send and read streams of data to and from topics in a Kafka cluster. While the main Kafka project maintains only the Java clients, several open source projects provide Kafka client APIs for Python. In this post, I demonstrated how to develop producer and consumer applications using the kafka-python package.