Apache Kafka has five core APIs, and we can develop applications that send and read streams of data to and from topics in a Kafka cluster using the producer and consumer APIs. While the main Kafka project maintains only the Java clients, several open source projects provide Kafka client APIs for Python. In this post, we discuss how to develop and deploy Kafka client applications built with the kafka-python package on Kubernetes.
Kafka Client Apps
We create Kafka producer and consumer apps using the kafka-python package. The source can be found in the GitHub repository of this post.
Producer
The producer app sends fake order data that is generated by the Faker package. The Order class generates one or more order records via its create method, and each record includes an order ID, order timestamp, user ID and order items. Both the key and the value are serialized as JSON.
# clients/producer.py
import os
import datetime
import time
import json
import typing
import logging
import dataclasses

from faker import Faker
from kafka import KafkaProducer

logging.basicConfig(level=logging.INFO)


@dataclasses.dataclass
class OrderItem:
    product_id: int
    quantity: int


@dataclasses.dataclass
class Order:
    order_id: str
    ordered_at: datetime.datetime
    user_id: str
    order_items: typing.List[OrderItem]

    def asdict(self):
        return dataclasses.asdict(self)

    @classmethod
    def auto(cls, fake: Faker = Faker()):
        user_id = str(fake.random_int(1, 100)).zfill(3)
        order_items = [
            OrderItem(fake.random_int(1, 1000), fake.random_int(1, 10))
            for _ in range(fake.random_int(1, 4))
        ]
        return cls(fake.uuid4(), datetime.datetime.utcnow(), user_id, order_items)

    def create(self, num: int):
        return [self.auto() for _ in range(num)]


class Producer:
    def __init__(self, bootstrap_servers: list, topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.producer = self.create()

    def create(self):
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            api_version=(2, 8, 1)
        )

    def send(self, orders: typing.List[Order]):
        for order in orders:
            try:
                self.producer.send(
                    self.topic, key={"order_id": order.order_id}, value=order.asdict()
                )
            except Exception as e:
                raise RuntimeError("fails to send a message") from e
        self.producer.flush()

    def serialize(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj


if __name__ == "__main__":
    producer = Producer(
        bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"].split(","),
        topic=os.getenv("TOPIC_NAME", "orders"),
    )
    max_run = int(os.getenv("MAX_RUN", "-1"))
    logging.info(f"max run - {max_run}")
    current_run = 0
    while True:
        current_run += 1
        logging.info(f"current run - {current_run}")
        if current_run > max_run and max_run >= 0:
            logging.info("exceeds max run, finish")
            producer.producer.close()
            break
        producer.send(Order.auto().create(100))
        time.sleep(1)
A sample order record is shown below.
{
  "order_id": "79c0c393-9eca-4a44-8efd-3965752f3e16",
  "ordered_at": "2023-12-26T18:02:54.510497",
  "user_id": "050",
  "order_items": [
    {
      "product_id": 113,
      "quantity": 9
    },
    {
      "product_id": 58,
      "quantity": 5
    }
  ]
}
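To preview what the producer emits without connecting to a broker, a record can be generated and serialized locally. The snippet below is a minimal sketch, not part of the repository; the file name is hypothetical, it assumes clients/producer.py is importable, and it mirrors the datetime handling of Producer.serialize.

# preview_order.py - hypothetical helper, prints one fake order as JSON
import json
import datetime

from producer import Order  # assumes clients/producer.py is on the import path


def default(obj):
    # mirror Producer.serialize: render datetime values as ISO-8601 strings
    if isinstance(obj, (datetime.datetime, datetime.date)):
        return obj.isoformat()
    return obj


if __name__ == "__main__":
    print(json.dumps(Order.auto().asdict(), default=default, indent=2))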
Consumer
The Consumer class instantiates the KafkaConsumer in its create method. The main consumer configuration values are provided as constructor arguments: the Kafka bootstrap server addresses (bootstrap_servers), topic names (topics) and consumer group ID (group_id). The process method polls messages and logs details of the consumer records.
# clients/consumer.py
import os
import time
import logging

from kafka import KafkaConsumer
from kafka.errors import KafkaError

logging.basicConfig(level=logging.INFO)


class Consumer:
    def __init__(self, bootstrap_servers: list, topics: list, group_id: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.consumer = self.create()

    def create(self):
        return KafkaConsumer(
            *self.topics,
            bootstrap_servers=self.bootstrap_servers,
            auto_offset_reset="earliest",
            enable_auto_commit=True,
            group_id=self.group_id,
            key_deserializer=lambda v: v.decode("utf-8"),
            value_deserializer=lambda v: v.decode("utf-8"),
            api_version=(2, 8, 1)
        )

    def process(self):
        try:
            while True:
                msg = self.consumer.poll(timeout_ms=1000)
                if not msg:
                    # poll() returns an empty dict when no records are available
                    continue
                self.print_info(msg)
                time.sleep(1)
        except KafkaError as error:
            logging.error(error)

    def print_info(self, msg: dict):
        for t, v in msg.items():
            for r in v:
                logging.info(
                    f"key={r.key}, value={r.value}, topic={t.topic}, partition={t.partition}, offset={r.offset}, ts={r.timestamp}"
                )


if __name__ == "__main__":
    consumer = Consumer(
        bootstrap_servers=os.environ["BOOTSTRAP_SERVERS"].split(","),
        topics=os.getenv("TOPIC_NAME", "orders").split(","),
        group_id=os.getenv("GROUP_ID", "orders-group"),
    )
    consumer.process()
Test Client Apps on Host
We assume that a Kafka cluster and management app are deployed on Minikube, as discussed in Part 1. There, the external listener of the Kafka bootstrap server is exposed by a service named demo-cluster-kafka-external-bootstrap, and we can use the minikube service command to obtain the Kubernetes URL for that service.
minikube service demo-cluster-kafka-external-bootstrap --url

# http://127.0.0.1:42289
# ❗ Because you are using a Docker driver on linux, the terminal needs to be open to run it.
We can run the Kafka client apps on the host by setting the bootstrap server address to the URL obtained in the previous step. Note that the apps should run in separate terminals.
# terminal 1
BOOTSTRAP_SERVERS=127.0.0.1:42289 python clients/producer.py
# terminal 2
BOOTSTRAP_SERVERS=127.0.0.1:42289 python clients/consumer.py
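If either app fails to connect, a quick check of the exposed listener from the host can help isolate the issue. The snippet below is a small sketch, not part of the repository; it assumes kafka-python 2.0 or later (which exposes bootstrap_connected()), and the port 42289 is just the example value from the previous step.

# check_listener.py - hypothetical connectivity check for the external bootstrap listener
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="127.0.0.1:42289", api_version=(2, 8, 1))
print(producer.bootstrap_connected())  # True when the bootstrap broker responds
producer.close()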
The access URL of the Kafka management app (kafka-ui) can be obtained using the minikube service command as shown below.
minikube service kafka-ui --url

# http://127.0.0.1:36477
# ❗ Because you are using a Docker driver on linux, the terminal needs to be open to run it.
On the management app, we can check that messages are created in the orders topic. Note that the Kafka cluster is configured to allow automatic creation of topics, and the default number of partitions is set to 3.
Also, we can see that messages are consumed by a single consumer in the consumer group named orders-group.
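The same checks can also be made programmatically with kafka-python. The snippet below is a sketch, not part of the repository; it assumes the external listener address obtained earlier, and it lists the visible topics, the partitions of the orders topic and the registered consumer groups.

# inspect_cluster.py - hypothetical helper to verify the topic and consumer group
from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient

bootstrap = "127.0.0.1:42289"  # example address from `minikube service ... --url`

consumer = KafkaConsumer(bootstrap_servers=bootstrap, api_version=(2, 8, 1))
print(consumer.topics())                        # expected to include 'orders'
print(consumer.partitions_for_topic("orders"))  # e.g. {0, 1, 2} given 3 default partitions
consumer.close()

admin = KafkaAdminClient(bootstrap_servers=bootstrap, api_version=(2, 8, 1))
print(admin.list_consumer_groups())             # expected to include the 'orders-group' group
admin.close()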
Deploy Client Apps
Build Docker Image
We need a custom Docker image to deploy the client apps, and normally images are pulled from an external Docker registry. Instead of relying on an external registry, however, we can reuse the Docker daemon inside the Minikube cluster, which speeds up local development. This is achieved by executing the following command.
# use docker daemon inside minikube cluster
eval $(minikube docker-env)
# Host added: /home/jaehyeon/.ssh/known_hosts ([127.0.0.1]:32772)
The Dockerfile copies the client apps into the /app folder and installs dependent pip packages.
# clients/Dockerfile
FROM bitnami/python:3.8

COPY . /app
RUN pip install -r requirements.txt
After building the image with the name order-clients:0.1.0, we can check that it is available in the Docker daemon inside the Minikube cluster.
docker build -t=order-clients:0.1.0 clients/.

docker images order-clients
# REPOSITORY      TAG     IMAGE ID       CREATED          SIZE
# order-clients   0.1.0   bc48046837b1   26 seconds ago   589MB
Deploy on Minikube
We can deploy the Kafka client apps using Kubernetes Deployments. Each app has a single replica, and the Kafka cluster access details are added as environment variables. Note that, as we use the local Docker image (order-clients:0.1.0), the image pull policy (imagePullPolicy) is set to Never so that the image is not pulled from an external Docker registry.
# manifests/kafka-clients.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: order-producer
    group: client
  name: order-producer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: order-producer
  template:
    metadata:
      labels:
        app: order-producer
        group: client
    spec:
      containers:
        - image: order-clients:0.1.0
          name: producer-container
          args: ["python", "producer.py"]
          env:
            - name: BOOTSTRAP_SERVERS
              value: demo-cluster-kafka-bootstrap:9092
            - name: TOPIC_NAME
              value: orders
            - name: TZ
              value: Australia/Sydney
          resources: {}
          imagePullPolicy: Never # shouldn't be Always
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: order-consumer
    group: client
  name: order-consumer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: order-consumer
  template:
    metadata:
      labels:
        app: order-consumer
        group: client
    spec:
      containers:
        - image: order-clients:0.1.0
          name: consumer-container
          args: ["python", "consumer.py"]
          env:
            - name: BOOTSTRAP_SERVERS
              value: demo-cluster-kafka-bootstrap:9092
            - name: TOPIC_NAME
              value: orders
            - name: GROUP_ID
              value: orders-group
            - name: TZ
              value: Australia/Sydney
          resources: {}
          imagePullPolicy: Never
The client apps can be deployed using the kubectl create command, and we can check that the producer and consumer apps each run in a single pod.
kubectl create -f manifests/kafka-clients.yml

kubectl get all -l group=client
# NAME                                  READY   STATUS    RESTARTS   AGE
# pod/order-consumer-79785749d5-67bxz   1/1     Running   0          12s
# pod/order-producer-759d568fb8-rrl6w   1/1     Running   0          12s

# NAME                             READY   UP-TO-DATE   AVAILABLE   AGE
# deployment.apps/order-consumer   1/1     1            1           12s
# deployment.apps/order-producer   1/1     1            1           12s

# NAME                                        DESIRED   CURRENT   READY   AGE
# replicaset.apps/order-consumer-79785749d5   1         1         1       12s
# replicaset.apps/order-producer-759d568fb8   1         1         1       12s
We can monitor the client apps via their log messages. The last three messages of each app are shown below, and we can see that both run as expected.
kubectl logs deploy/order-producer --tail=3
# INFO:root:current run - 104
# INFO:root:current run - 105
# INFO:root:current run - 106

kubectl logs deploy/order-consumer --tail=3
# INFO:root:key={"order_id": "d9b9e577-7a02-401e-b0e1-2d0cdcda51a3"}, value={"order_id": "d9b9e577-7a02-401e-b0e1-2d0cdcda51a3", "ordered_at": "2023-12-18T18:17:13.065654", "user_id": "061", "order_items": [{"product_id": 866, "quantity": 7}, {"product_id": 970, "quantity": 1}]}, topic=orders, partition=1, offset=7000, ts=1702923433077
# INFO:root:key={"order_id": "dfbd2e1f-1b18-4772-83b9-c689a3da4c03"}, value={"order_id": "dfbd2e1f-1b18-4772-83b9-c689a3da4c03", "ordered_at": "2023-12-18T18:17:13.065754", "user_id": "016", "order_items": [{"product_id": 853, "quantity": 10}]}, topic=orders, partition=1, offset=7001, ts=1702923433077
# INFO:root:key={"order_id": "eaf43f0b-53ed-419a-ba75-1d74bd0525a4"}, value={"order_id": "eaf43f0b-53ed-419a-ba75-1d74bd0525a4", "ordered_at": "2023-12-18T18:17:13.065795", "user_id": "072", "order_items": [{"product_id": 845, "quantity": 1}, {"product_id": 944, "quantity": 7}, {"product_id": 454, "quantity": 10}, {"product_id": 834, "quantity": 9}]}, topic=orders, partition=1, offset=7002, ts=1702923433078
Delete Resources
The Kubernetes resources and Minikube cluster can be removed by the kubectl delete and minikube delete commands respectively.
## delete resources
kubectl delete -f manifests/kafka-clients.yml
kubectl delete -f manifests/kafka-cluster.yaml
kubectl delete -f manifests/kafka-ui.yaml
kubectl delete -f manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml

## delete minikube
minikube delete
Summary
Apache Kafka has five core APIs, and we can develop applications to send/read streams of data to/from topics in a Kafka cluster using the producer and consumer APIs. In this post, we discussed how to develop and deploy Kafka client applications using the kafka-python package on Kubernetes.