Apache Kafka has five core APIs, and the producer and consumer APIs let us develop applications that write streams of data to and read them from topics in a Kafka cluster. While the main Kafka project maintains only the Java clients, several open source projects provide Kafka client APIs for Python. In this post, we discuss how to develop Kafka client applications using the kafka-python package and how to deploy them on Kubernetes.

Kafka Client Apps

We create Kafka producer and consumer apps using the kafka-python package. The source can be found in the GitHub repository of this post.

Producer

The producer app sends fake order data that is generated by the Faker package. The Order class generates one or more order records via its create method, where each record includes an order ID, order timestamp, user ID and order items. Both the key and the value are serialized as JSON.

# clients/producer.py
import os
import datetime
import time
import json
import typing
import logging
import dataclasses

from faker import Faker
from kafka import KafkaProducer

logging.basicConfig(level=logging.INFO)


@dataclasses.dataclass
class OrderItem:
    product_id: int
    quantity: int


@dataclasses.dataclass
class Order:
    order_id: str
    ordered_at: datetime.datetime
    user_id: str
    order_items: typing.List[OrderItem]

    def asdict(self):
        return dataclasses.asdict(self)

    @classmethod
    def auto(cls, fake: Faker = Faker()):
        user_id = str(fake.random_int(1, 100)).zfill(3)
        order_items = [
            OrderItem(fake.random_int(1, 1000), fake.random_int(1, 10))
            for _ in range(fake.random_int(1, 4))
        ]
        return cls(fake.uuid4(), datetime.datetime.utcnow(), user_id, order_items)

    def create(self, num: int):
        return [self.auto() for _ in range(num)]


class Producer:
    def __init__(self, bootstrap_servers: list, topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.producer = self.create()

    def create(self):
        # both the key and the value are serialized as JSON strings
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            api_version=(2, 8, 1),
        )

    def send(self, orders: typing.List[Order]):
        for order in orders:
            try:
                self.producer.send(
                    self.topic, key={"order_id": order.order_id}, value=order.asdict()
                )
            except Exception as e:
                raise RuntimeError("failed to send a message") from e
        self.producer.flush()

    def serialize(self, obj):
        # convert datetime values into JSON-serializable strings
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj


if __name__ == "__main__":
    producer = Producer(
        bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"].split(","),
        topic=os.getenv("TOPIC_NAME", "orders"),
    )
    max_run = int(os.getenv("MAX_RUN", "-1"))
    logging.info(f"max run - {max_run}")
    current_run = 0
    # send 100 orders every second until MAX_RUN is reached (run indefinitely if negative)
    while True:
        current_run += 1
        logging.info(f"current run - {current_run}")
        if current_run > max_run and max_run >= 0:
            logging.info("exceeds max run, finish")
            producer.producer.close()
            break
        producer.send(Order.auto().create(100))
        time.sleep(1)

A sample order record is shown below.

{
	"order_id": "79c0c393-9eca-4a44-8efd-3965752f3e16",
	"ordered_at": "2023-12-26T18:02:54.510497",
	"user_id": "050",
	"order_items": [
		{
			"product_id": 113,
			"quantity": 9
		},
		{
			"product_id": 58,
			"quantity": 5
		}
	]
}
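
For reference, a record like the one above can be reproduced outside the producer with a short standalone sketch. It re-implements the same datetime handling that the producer's key/value serializers use; the file name and the import of Order from clients/producer.py are assumptions for illustration only.

# clients/serialize_sketch.py - a standalone sketch, assumed to sit next to producer.py
import json
import datetime

from producer import Order  # assumption: imported from clients/producer.py


def serialize(obj):
    # same datetime handling as Producer.serialize
    if isinstance(obj, datetime.datetime):
        return obj.isoformat()
    if isinstance(obj, datetime.date):
        return str(obj)
    return obj


order = Order.auto()
# the value sent to Kafka - a JSON document like the sample record above
print(json.dumps(order.asdict(), default=serialize, indent=2))
# the key sent to Kafka - a JSON document holding only the order ID
print(json.dumps({"order_id": order.order_id}))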

Consumer

The Consumer class instantiates a KafkaConsumer in its create method. The main consumer configuration values are provided as constructor arguments: the Kafka bootstrap server addresses (bootstrap_servers), topic names (topics) and consumer group ID (group_id). The process method polls messages and logs details of the consumer records.

# clients/consumer.py
import os
import time
import logging

from kafka import KafkaConsumer
from kafka.errors import KafkaError

logging.basicConfig(level=logging.INFO)


class Consumer:
    def __init__(self, bootstrap_servers: list, topics: list, group_id: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.consumer = self.create()

    def create(self):
        return KafkaConsumer(
            *self.topics,
            bootstrap_servers=self.bootstrap_servers,
            auto_offset_reset="earliest",
            enable_auto_commit=True,
            group_id=self.group_id,
            key_deserializer=lambda v: v.decode("utf-8"),
            value_deserializer=lambda v: v.decode("utf-8"),
            api_version=(2, 8, 1),
        )

    def process(self):
        try:
            while True:
                # poll() returns a dict of {TopicPartition: [ConsumerRecord, ...]},
                # which is empty when no records arrive within the timeout
                msg = self.consumer.poll(timeout_ms=1000)
                if not msg:
                    continue
                self.print_info(msg)
                time.sleep(1)
        except KafkaError as error:
            logging.error(error)

    def print_info(self, msg: dict):
        for t, v in msg.items():
            for r in v:
                logging.info(
                    f"key={r.key}, value={r.value}, topic={t.topic}, partition={t.partition}, offset={r.offset}, ts={r.timestamp}"
                )


if __name__ == "__main__":
    consumer = Consumer(
        bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"].split(","),
        topics=os.getenv("TOPIC_NAME", "orders").split(","),
        group_id=os.getenv("GROUP_ID", "orders-group"),
    )
    consumer.process()

Test Client Apps on Host

We assume that a Kafka cluster and management app are deployed on Minikube, as discussed in Part 1. There, the external listener of the Kafka bootstrap server is exposed by a service named demo-cluster-kafka-external-bootstrap, and we can use the minikube service command to obtain the Kubernetes URL of that service.

minikube service demo-cluster-kafka-external-bootstrap --url

# http://127.0.0.1:42289
# ❗  Because you are using a Docker driver on linux, the terminal needs to be open to run it.

We can execute the Kafka client apps by setting the bootstrap server address to the URL obtained in the previous step. Note that the apps should run in separate terminals.

# terminal 1
BOOTSTRAP_SERVERS=127.0.0.1:42289 python clients/producer.py
# terminal 2
BOOTSTRAP_SERVERS=127.0.0.1:42289 python clients/consumer.py
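
If the apps fail to start, connectivity to the exposed listener can be checked quickly with a few lines of kafka-python before digging further. Below is a minimal sketch; the bootstrap address is the example URL obtained above and needs to be replaced with your own.

# a minimal connectivity check (sketch)
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=["127.0.0.1:42289"],  # replace with your minikube service URL
    api_version=(2, 8, 1),
)
print(consumer.topics())  # set of topic names visible to the client
consumer.close()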

The access URL of the Kafka management app (kafka-ui) can be obtained using the minikube service command as shown below.

minikube service kafka-ui --url

# http://127.0.0.1:36477
# ❗  Because you are using a Docker driver on linux, the terminal needs to be open to run it.

On the management app, we can check that messages are created in the orders topic. Note that the Kafka cluster is configured to allow automatic topic creation and the default number of partitions is set to 3.
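
We can also verify the partition count programmatically. Below is a minimal sketch using kafka-python's partitions_for_topic, again assuming the example bootstrap address obtained earlier; it should report three partitions for the orders topic.

# check the number of partitions of the orders topic (sketch)
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=["127.0.0.1:42289"],  # replace with your bootstrap address
    api_version=(2, 8, 1),
)
# returns the set of partition IDs, e.g. {0, 1, 2} when the topic has three partitions
print(consumer.partitions_for_topic("orders"))
consumer.close()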

Also, we can see that messages are consumed by a single consumer in the consumer group named orders-group.
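
The committed offsets of the consumer group can also be inspected from the host. The sketch below uses kafka-python's KafkaAdminClient, again with the example bootstrap address as an assumption; it prints the latest committed offset per partition for the orders-group group.

# inspect committed offsets of the consumer group (sketch)
from kafka.admin import KafkaAdminClient

admin = KafkaAdminClient(
    bootstrap_servers=["127.0.0.1:42289"],  # replace with your bootstrap address
    api_version=(2, 8, 1),
)
# mapping of TopicPartition -> OffsetAndMetadata for the group
for tp, meta in admin.list_consumer_group_offsets("orders-group").items():
    print(tp.topic, tp.partition, meta.offset)
admin.close()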

Deploy Client Apps

Build Docker Image

We need a custom Docker image to deploy the client apps, and normally images are pulled from an external Docker registry. Instead of relying on an external registry, however, we can reuse the Docker daemon inside the Minikube cluster, which speeds up local development. This can be done by executing the following command.

# use docker daemon inside minikube cluster
eval $(minikube docker-env)
# Host added: /home/jaehyeon/.ssh/known_hosts ([127.0.0.1]:32772)

The Dockerfile copies the client apps into the /app folder and installs the required pip packages.

# clients/Dockerfile
FROM bitnami/python:3.8

COPY . /app
RUN pip install -r requirements.txt
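
The requirements.txt file itself is not shown here. Assuming only the two runtime dependencies of the client apps are needed, a minimal version might look like the following; the actual file in the repository may list additional or pinned packages.

# clients/requirements.txt (illustrative)
kafka-python
Faker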

After building the image with the name order-clients:0.1.0, we can check that it is available in the Docker daemon inside the Minikube cluster.

docker build -t=order-clients:0.1.0 clients/.

docker images order-clients
# REPOSITORY      TAG       IMAGE ID       CREATED          SIZE
# order-clients   0.1.0     bc48046837b1   26 seconds ago   589MB

Deploy on Minikube

We can create the Kafka client apps using Kubernetes Deployments. Each app has a single replica, and the Kafka cluster access details are provided as environment variables. Note that, as we use the local Docker image (order-clients:0.1.0), the image pull policy (imagePullPolicy) is set to Never so that the image is not pulled from an external Docker registry.

# manifests/kafka-clients.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: order-producer
    group: client
  name: order-producer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: order-producer
  template:
    metadata:
      labels:
        app: order-producer
        group: client
    spec:
      containers:
        - image: order-clients:0.1.0
          name: producer-container
          args: ["python", "producer.py"]
          env:
            - name: BOOTSTRAP_SERVERS
              value: demo-cluster-kafka-bootstrap:9092
            - name: TOPIC_NAME
              value: orders
            - name: TZ
              value: Australia/Sydney
          resources: {}
          imagePullPolicy: Never # shouldn't be Always
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: order-consumer
    group: client
  name: order-consumer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: order-consumer
  template:
    metadata:
      labels:
        app: order-consumer
        group: client
    spec:
      containers:
        - image: order-clients:0.1.0
          name: consumer-container
          args: ["python", "consumer.py"]
          env:
            - name: BOOTSTRAP_SERVERS
              value: demo-cluster-kafka-bootstrap:9092
            - name: TOPIC_NAME
              value: orders
            - name: GROUP_ID
              value: orders-group
            - name: TZ
              value: Australia/Sydney
          resources: {}
          imagePullPolicy: Never

The client apps can be deployed using the kubectl create command, and we can check that the producer and consumer apps each run in a single pod.

kubectl create -f manifests/kafka-clients.yml

kubectl get all -l group=client
# NAME                                  READY   STATUS    RESTARTS   AGE
# pod/order-consumer-79785749d5-67bxz   1/1     Running   0          12s
# pod/order-producer-759d568fb8-rrl6w   1/1     Running   0          12s

# NAME                             READY   UP-TO-DATE   AVAILABLE   AGE
# deployment.apps/order-consumer   1/1     1            1           12s
# deployment.apps/order-producer   1/1     1            1           12s

# NAME                                        DESIRED   CURRENT   READY   AGE
# replicaset.apps/order-consumer-79785749d5   1         1         1       12s
# replicaset.apps/order-producer-759d568fb8   1         1         1       12s

We can monitor the client apps via their log messages. The last three messages of each app are shown below, and we see that they run as expected.

kubectl logs deploy/order-producer --tail=3
# INFO:root:current run - 104
# INFO:root:current run - 105
# INFO:root:current run - 106

kubectl logs deploy/order-consumer --tail=3
# INFO:root:key={"order_id": "d9b9e577-7a02-401e-b0e1-2d0cdcda51a3"}, value={"order_id": "d9b9e577-7a02-401e-b0e1-2d0cdcda51a3", "ordered_at": "2023-12-18T18:17:13.065654", "user_id": "061", "order_items": [{"product_id": 866, "quantity": 7}, {"product_id": 970, "quantity": 1}]}, topic=orders, partition=1, offset=7000, ts=1702923433077
# INFO:root:key={"order_id": "dfbd2e1f-1b18-4772-83b9-c689a3da4c03"}, value={"order_id": "dfbd2e1f-1b18-4772-83b9-c689a3da4c03", "ordered_at": "2023-12-18T18:17:13.065754", "user_id": "016", "order_items": [{"product_id": 853, "quantity": 10}]}, topic=orders, partition=1, offset=7001, ts=1702923433077
# INFO:root:key={"order_id": "eaf43f0b-53ed-419a-ba75-1d74bd0525a4"}, value={"order_id": "eaf43f0b-53ed-419a-ba75-1d74bd0525a4", "ordered_at": "2023-12-18T18:17:13.065795", "user_id": "072", "order_items": [{"product_id": 845, "quantity": 1}, {"product_id": 944, "quantity": 7}, {"product_id": 454, "quantity": 10}, {"product_id": 834, "quantity": 9}]}, topic=orders, partition=1, offset=7002, ts=1702923433078

Delete Resources

The Kubernetes resources and Minikube cluster can be removed by the kubectl delete and minikube delete commands respectively.

## delete resources
kubectl delete -f manifests/kafka-clients.yml
kubectl delete -f manifests/kafka-cluster.yaml
kubectl delete -f manifests/kafka-ui.yaml
kubectl delete -f manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml

## delete minikube
minikube delete

Summary

Apache Kafka has five core APIs, and the producer and consumer APIs let us develop applications that write streams of data to and read them from topics in a Kafka cluster. In this post, we discussed how to develop Kafka client applications using the kafka-python package and how to deploy them on Kubernetes.