In the previous post, we discussed how to stream data to/from a Kafka cluster using Kafka Connect. Kafka also includes the Producer/Consumer APIs, which allow client applications to send/read streams of data to/from topics in a Kafka cluster. While the main Kafka project maintains only the Java clients, there are several open source projects that provide the Kafka client APIs in Python. In this post, I’ll demonstrate how to develop producer and consumer applications using the kafka-python package.
- Part 1 Cluster Setup
- Part 2 Management App
- Part 3 Kafka Connect
- Part 4 Producer and Consumer (this post)
- Part 5 Glue Schema Registry
- Part 6 Kafka Connect with Glue Schema Registry
- Part 7 Producer and Consumer with Glue Schema Registry
- Part 8 SSL Encryption
- Part 9 SSL Authentication
- Part 10 SASL Authentication
- Part 11 Kafka Authorization
Producer
The same Kafka producer app that was introduced in Part 2 is used again. It sends fake order data generated by the Faker package. The Order class creates one or more fake order records via the create method, and a record includes an order ID, an order timestamp, a user ID and order items. Both the key and the value are serialized as JSON. Note that, as the producer runs outside the Docker network, the host name of the external listener (localhost:29092) is used as the bootstrap server address. The app can be run simply by python producer.py, and the source can be found in the GitHub repository of this post.
# kafka-pocs/kafka-dev-with-docker/part-04/producer.py
import os
import datetime
import time
import json
import typing
import logging
import dataclasses

from faker import Faker
from kafka import KafkaProducer

logging.basicConfig(level=logging.INFO)


@dataclasses.dataclass
class OrderItem:
    product_id: int
    quantity: int


@dataclasses.dataclass
class Order:
    order_id: str
    ordered_at: datetime.datetime
    user_id: str
    order_items: typing.List[OrderItem]

    def asdict(self):
        return dataclasses.asdict(self)

    @classmethod
    def auto(cls, fake: Faker = Faker()):
        user_id = str(fake.random_int(1, 100)).zfill(3)
        order_items = [
            OrderItem(fake.random_int(1, 1000), fake.random_int(1, 10))
            for _ in range(fake.random_int(1, 4))
        ]
        return cls(fake.uuid4(), datetime.datetime.utcnow(), user_id, order_items)

    def create(self, num: int):
        return [self.auto() for _ in range(num)]


class Producer:
    def __init__(self, bootstrap_servers: list, topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.producer = self.create()

    def create(self):
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
        )

    def send(self, orders: typing.List[Order]):
        for order in orders:
            try:
                self.producer.send(
                    self.topic, key={"order_id": order.order_id}, value=order.asdict()
                )
            except Exception as e:
                raise RuntimeError("fails to send a message") from e
        self.producer.flush()

    def serialize(self, obj):
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        if isinstance(obj, datetime.date):
            return str(obj)
        return obj


if __name__ == "__main__":
    producer = Producer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topic=os.getenv("TOPIC_NAME", "orders"),
    )
    max_run = int(os.getenv("MAX_RUN", "-1"))
    logging.info(f"max run - {max_run}")
    current_run = 0
    while True:
        current_run += 1
        logging.info(f"current run - {current_run}")
        if current_run > max_run and max_run >= 0:
            logging.info("exceeds max run, finish")
            producer.producer.close()
            break
        producer.send(Order.auto().create(100))
        time.sleep(1)
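Note that KafkaProducer.send() is asynchronous; the Producer class above relies on flush() to block until all buffered records are delivered. If per-record delivery metadata is needed, the future returned by send() can be inspected instead. Below is a minimal sketch of a synchronous variant of the send method (my own illustration, not part of the post's source):

# hypothetical variant of Producer.send() that waits for delivery of each record
def send_sync(self, orders: typing.List[Order]):
    for order in orders:
        future = self.producer.send(
            self.topic, key={"order_id": order.order_id}, value=order.asdict()
        )
        # block until the record is acknowledged, or raise on failure
        metadata = future.get(timeout=10)
        logging.info(
            f"delivered to {metadata.topic}-{metadata.partition} at offset {metadata.offset}"
        )

This trades throughput for immediate feedback, so it is better suited to debugging than to bulk production.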
A sample order record is shown below.
{
  "order_id": "79c0c393-9eca-4a44-8efd-3965752f3e16",
  "ordered_at": "2023-05-13T18:02:54.510497",
  "user_id": "050",
  "order_items": [
    {
      "product_id": 113,
      "quantity": 9
    },
    {
      "product_id": 58,
      "quantity": 5
    }
  ]
}
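For reference, a consumed value in this shape can be turned back into the dataclasses with a small helper along the following lines. This is a hypothetical snippet for illustration, not part of the post's source:

# hypothetical helper that rebuilds the Order/OrderItem dataclasses
# from a consumed JSON string
import json
import datetime

from producer import Order, OrderItem


def parse_order(value: str) -> Order:
    d = json.loads(value)
    # reverse the serialization applied by Producer.serialize
    d["ordered_at"] = datetime.datetime.fromisoformat(d["ordered_at"])
    d["order_items"] = [OrderItem(**item) for item in d["order_items"]]
    return Order(**d)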
Consumer
The Consumer class instantiates the KafkaConsumer in the create method. The main consumer configuration values are provided by the constructor arguments: Kafka bootstrap server addresses (bootstrap_servers), topic names (topics) and consumer group ID (group_id). The process method of the class polls messages and logs the details of the consumer records.
# kafka-pocs/kafka-dev-with-docker/part-04/consumer.py
import os
import time
import logging

from kafka import KafkaConsumer
from kafka.errors import KafkaError

logging.basicConfig(level=logging.INFO)


class Consumer:
    def __init__(self, bootstrap_servers: list, topics: list, group_id: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.consumer = self.create()

    def create(self):
        return KafkaConsumer(
            *self.topics,
            bootstrap_servers=self.bootstrap_servers,
            auto_offset_reset="earliest",
            enable_auto_commit=True,
            group_id=self.group_id,
            key_deserializer=lambda v: v.decode("utf-8"),
            value_deserializer=lambda v: v.decode("utf-8"),
        )

    def process(self):
        try:
            while True:
                # poll() returns an empty dict when no records arrive within the timeout
                msg = self.consumer.poll(timeout_ms=1000)
                if not msg:
                    continue
                self.print_info(msg)
                time.sleep(1)
        except KafkaError as error:
            logging.error(error)

    def print_info(self, msg: dict):
        for t, v in msg.items():
            for r in v:
                logging.info(
                    f"key={r.key}, value={r.value}, topic={t.topic}, partition={t.partition}, offset={r.offset}, ts={r.timestamp}"
                )


if __name__ == "__main__":
    consumer = Consumer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topics=os.getenv("TOPIC_NAME", "orders").split(","),
        group_id=os.getenv("GROUP_ID", "orders-group"),
    )
    consumer.process()
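As a side note, KafkaConsumer is also iterable, so the explicit polling loop above can be replaced by a simple for loop when per-batch handling is not required. A minimal sketch, assuming the same topic and cluster:

# minimal sketch - iterating the consumer instead of calling poll() explicitly
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "orders",
    bootstrap_servers=["localhost:29092"],
    group_id="orders-group",
    auto_offset_reset="earliest",
)
for record in consumer:  # blocks indefinitely, yielding ConsumerRecord objects
    print(record.topic, record.partition, record.offset, record.key, record.value)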
Consumer Services
As the default number of partitions is set to 3 in compose-kafka.yml, a Docker Compose file is created for the consumer so that 3 instances of the app can be deployed with the scale option. As the service uses the same Docker network (kafkanet), we can use the brokers' service names (e.g. kafka-0) on port 9092 as the bootstrap server addresses. Once started, the container installs the required packages and starts the consumer.
# kafka-pocs/kafka-dev-with-docker/part-04/compose-consumer.yml
version: "3.5"

services:
  app:
    image: bitnami/python:3.9
    command: "sh -c 'pip install -r requirements.txt && python consumer.py'"
    networks:
      - kafkanet
    environment:
      BOOTSTRAP_SERVERS: kafka-0:9092,kafka-1:9092,kafka-2:9092
      TOPIC_NAME: orders
      GROUP_ID: orders-group
      TZ: Australia/Sydney
    volumes:
      - .:/app

networks:
  kafkanet:
    external: true
    name: kafka-network
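The requirements.txt file that the container installs is not shown in the post; at a minimum it would include the two packages used by the apps in this part (assumed contents shown below):

# kafka-pocs/kafka-dev-with-docker/part-04/requirements.txt (assumed contents)
kafka-python
faker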
Start Applications
We can run the Kafka cluster by docker-compose -f compose-kafka.yml up -d and the producer by python producer.py. As mentioned earlier, we’ll deploy 3 instances of the consumer, and they can be deployed with the scale option as shown below. We can check the running consumer instances using the ps command of Docker Compose.
$ docker-compose -f compose-consumer.yml up -d --scale app=3
$ docker-compose -f compose-consumer.yml ps
    Name                    Command               State    Ports
-----------------------------------------------------------------
part-04_app_1   sh -c pip install -r requi ...    Up      8000/tcp
part-04_app_2   sh -c pip install -r requi ...    Up      8000/tcp
part-04_app_3   sh -c pip install -r requi ...    Up      8000/tcp
Each instance of the consumer subscribes to its own topic partition, and we can verify that from the container logs. Below are the last 10 log entries of one of the instances, which show that it polls messages from partition 0 only.
$ docker logs -f --tail 10 part-04_app_1

INFO:root:key={"order_id": "79c0c393-9eca-4a44-8efd-3965752f3e16"}, value={"order_id": "79c0c393-9eca-4a44-8efd-3965752f3e16", "ordered_at": "2023-05-13T18:02:54.510497", "user_id": "050", "order_items": [{"product_id": 113, "quantity": 9}, {"product_id": 58, "quantity": 5}]}, topic=orders, partition=0, offset=11407, ts=1684000974514
INFO:root:key={"order_id": "d57427fc-5325-49eb-9fb7-e4fac1eca9b4"}, value={"order_id": "d57427fc-5325-49eb-9fb7-e4fac1eca9b4", "ordered_at": "2023-05-13T18:02:54.510548", "user_id": "078", "order_items": [{"product_id": 111, "quantity": 4}]}, topic=orders, partition=0, offset=11408, ts=1684000974514
INFO:root:key={"order_id": "66c4ca6f-30e2-4f94-a971-ec23c9952430"}, value={"order_id": "66c4ca6f-30e2-4f94-a971-ec23c9952430", "ordered_at": "2023-05-13T18:02:54.510565", "user_id": "004", "order_items": [{"product_id": 647, "quantity": 2}, {"product_id": 894, "quantity": 1}]}, topic=orders, partition=0, offset=11409, ts=1684000974514
INFO:root:key={"order_id": "518a6812-4357-4ec1-9e5c-aad7853646ee"}, value={"order_id": "518a6812-4357-4ec1-9e5c-aad7853646ee", "ordered_at": "2023-05-13T18:02:54.510609", "user_id": "043", "order_items": [{"product_id": 882, "quantity": 5}]}, topic=orders, partition=0, offset=11410, ts=1684000974514
INFO:root:key={"order_id": "b22922e8-8ad0-48c3-b970-a486d4576d5c"}, value={"order_id": "b22922e8-8ad0-48c3-b970-a486d4576d5c", "ordered_at": "2023-05-13T18:02:54.510625", "user_id": "002", "order_items": [{"product_id": 206, "quantity": 6}, {"product_id": 810, "quantity": 9}]}, topic=orders, partition=0, offset=11411, ts=1684000974514
INFO:root:key={"order_id": "1ef36da0-6a0b-4ec2-9ecd-10a020acfbfd"}, value={"order_id": "1ef36da0-6a0b-4ec2-9ecd-10a020acfbfd", "ordered_at": "2023-05-13T18:02:54.510660", "user_id": "085", "order_items": [{"product_id": 18, "quantity": 3}]}, topic=orders, partition=0, offset=11412, ts=1684000974515
INFO:root:key={"order_id": "e9efdfd8-dc55-47b9-9cb0-c2e87e864435"}, value={"order_id": "e9efdfd8-dc55-47b9-9cb0-c2e87e864435", "ordered_at": "2023-05-13T18:02:54.510692", "user_id": "051", "order_items": [{"product_id": 951, "quantity": 6}]}, topic=orders, partition=0, offset=11413, ts=1684000974515
INFO:root:key={"order_id": "b24ed1c0-150a-41b3-b1cb-a27fb5581b2b"}, value={"order_id": "b24ed1c0-150a-41b3-b1cb-a27fb5581b2b", "ordered_at": "2023-05-13T18:02:54.510737", "user_id": "096", "order_items": [{"product_id": 734, "quantity": 3}]}, topic=orders, partition=0, offset=11414, ts=1684000974515
INFO:root:key={"order_id": "74b06957-2c6c-4e46-be49-d2915cc80b74"}, value={"order_id": "74b06957-2c6c-4e46-be49-d2915cc80b74", "ordered_at": "2023-05-13T18:02:54.510774", "user_id": "072", "order_items": [{"product_id": 968, "quantity": 2}, {"product_id": 602, "quantity": 3}, {"product_id": 316, "quantity": 9}, {"product_id": 971, "quantity": 8}]}, topic=orders, partition=0, offset=11415, ts=1684000974515
INFO:root:key={"order_id": "fce38c6b-4806-4579-b11e-8eac24b5166b"}, value={"order_id": "fce38c6b-4806-4579-b11e-8eac24b5166b", "ordered_at": "2023-05-13T18:02:54.510863", "user_id": "071", "order_items": [{"product_id": 751, "quantity": 8}]}, topic=orders, partition=0, offset=11416, ts=1684000974515
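The group's partition assignment can also be checked from the command line with the kafka-consumer-groups.sh tool that ships with Kafka, for example by executing it in one of the broker containers. The container name and script path below assume the Bitnami Kafka image used in Part 1:

$ docker exec -it kafka-0 \
    /opt/bitnami/kafka/bin/kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 --describe --group orders-group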
We can also check the consumers with the management apps, which can be started by docker-compose -f compose-ui.yml up -d. For example, the 3 running consumers can be seen in the Consumers menu of kafka-ui, and, as expected, each consumer subscribes to its own topic partition.
Summary
Kafka includes the Producer/Consumer APIs that allow client applications to send/read streams of data to/from topics in a Kafka cluster. While the main Kafka project maintains only the Java clients, there are several open source projects that provide the Kafka client APIs in Python. In this post, I demonstrated how to develop producer/consumer applications using the kafka-python package.