By default, Apache Kafka communicates in PLAINTEXT, which means that all data is sent unencrypted. To secure communication, we can configure Kafka clients and other components to use Transport Layer Security (TLS) encryption. Note that TLS is also referred to as Secure Sockets Layer (SSL) or TLS/SSL. SSL is the predecessor of TLS and has been deprecated since June 2015, but the term persists in configuration and code for historical reasons. In this post, SSL, TLS and TLS/SSL will be used interchangeably. SSL encryption is a one-way verification process where a server certificate is verified by a client via SSL Handshake, and we can improve security further by adding client authentication. For example, we can enforce two-way verification so that a client certificate is verified by Kafka brokers as well (SSL Authentication). Alternatively, we can choose a separate authentication mechanism, typically Simple Authentication and Security Layer (SASL) (SASL Authentication). In this post, we will discuss how to configure SSL encryption with Java and Python client examples, while SSL and SASL client authentication will be covered in later posts.
- Part 1 Cluster Setup
- Part 2 Management App
- Part 3 Kafka Connect
- Part 4 Producer and Consumer
- Part 5 Glue Schema Registry
- Part 6 Kafka Connect with Glue Schema Registry
- Part 7 Producer and Consumer with Glue Schema Registry
- Part 8 SSL Encryption (this post)
- Part 9 SSL Authentication
- Part 10 SASL Authentication
- Part 11 Kafka Authorization
Certificate Setup
The diagram below shows an overview of the certificate setup and SSL Handshake. It is taken from Apache Kafka Series - Kafka Security | SSL SASL Kerberos ACL by Stephane Maarek and Gerd Koenig (LINK).
As mentioned, SSL encryption is a one-way verification process where a server certificate is verified by a client via SSL Handshake. The following components are required for setting up certificates.
- Certificate Authority (CA) - The CA is responsible for signing certificates. We'll be using our own CA rather than relying on an external trusted CA. Two files will be created for the CA - a private key (ca-key) and a certificate (ca-cert).
- Keystore - A Keystore stores the identity of each machine (Kafka broker or logical client), and the certificate of a machine is signed by the CA. As the CA's certificate is imported into the Truststore of a Kafka client, the machine's certificate is also trusted and verified during SSL Handshake. Note that each machine requires its own Keystore. As we have 3 Kafka brokers, 3 Java Keystore files will be created, and each file name begins with the host name, e.g. kafka-0.server.keystore.jks.
- Truststore - A Truststore stores one or more certificates that a Kafka client should trust. Note that importing a CA's certificate means the client trusts all other certificates that are signed by it, which is called the chain of trust. We'll have a single Java Keystore file for the Truststore named kafka.truststore.jks, and it will be shared by all Kafka brokers and clients.
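Once the generation script below has run, we can confirm this layout by listing the store entries with keytool. A quick sketch, assuming the default password (supersecret) used by the script:

## each broker keystore holds its own key pair (localhost) plus the imported CA (caroot)
$ keytool -list -keystore keystore/kafka-0.server.keystore.jks -storepass supersecret
# localhost (PrivateKeyEntry) and caroot (trustedCertEntry) should be listed
## the truststore holds only the CA certificate
$ keytool -list -keystore truststore/kafka.truststore.jks -storepass supersecret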
The following script generates the components mentioned above. It begins with creating the files for the CA, followed by generating the Keystore of each Kafka broker and the Truststore of Kafka clients. Note that the host names of all Kafka brokers should be added to the Kafka host file (kafka-hosts.txt) so that a Keystore file is generated for each of them in a loop. Note also that it ends up exporting the CA certificate in the PEM (Privacy Enhanced Mail) format (ca-root.pem), as that is required by non-Java clients. The PEM file will be used by the Python clients below. The source can be found in the GitHub repository of this post.
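For reference, the Kafka host file simply lists one broker host name per line. For our three-broker cluster, it would contain the following - assumed contents, inferred from the Keystore file names generated later:

kafka-0
kafka-1
kafka-2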
#!/usr/bin/env bash
# kafka-dev-with-docker/part-08/generate.sh

set -eu

CN="${CN:-kafka-admin}"
PASSWORD="${PASSWORD:-supersecret}"
TO_GENERATE_PEM="${TO_GENERATE_PEM:-yes}"

VALIDITY_IN_DAYS=3650
CA_WORKING_DIRECTORY="certificate-authority"
TRUSTSTORE_WORKING_DIRECTORY="truststore"
KEYSTORE_WORKING_DIRECTORY="keystore"
PEM_WORKING_DIRECTORY="pem"
CA_KEY_FILE="ca-key"
CA_CERT_FILE="ca-cert"
DEFAULT_TRUSTSTORE_FILE="kafka.truststore.jks"
KEYSTORE_SIGN_REQUEST="cert-file"
KEYSTORE_SIGN_REQUEST_SRL="ca-cert.srl"
KEYSTORE_SIGNED_CERT="cert-signed"
KAFKA_HOSTS_FILE="kafka-hosts.txt"

if [ ! -f "$KAFKA_HOSTS_FILE" ]; then
  echo "'$KAFKA_HOSTS_FILE' does not exist. Create this file first."
  exit 1
fi

echo "Welcome to the Kafka SSL certificate authority, key store and trust store generator script."

echo
echo "First we will create our own certificate authority"
echo "  Two files will be created if not existing:"
echo "    - $CA_WORKING_DIRECTORY/$CA_KEY_FILE -- the private key used later to sign certificates"
echo "    - $CA_WORKING_DIRECTORY/$CA_CERT_FILE -- the certificate that will be stored in the trust store"
echo "      and serve as the certificate authority (CA)."
if [ -f "$CA_WORKING_DIRECTORY/$CA_KEY_FILE" ] && [ -f "$CA_WORKING_DIRECTORY/$CA_CERT_FILE" ]; then
  echo "Use existing $CA_WORKING_DIRECTORY/$CA_KEY_FILE and $CA_WORKING_DIRECTORY/$CA_CERT_FILE ..."
else
  rm -rf $CA_WORKING_DIRECTORY && mkdir $CA_WORKING_DIRECTORY
  echo
  echo "Generate $CA_WORKING_DIRECTORY/$CA_KEY_FILE and $CA_WORKING_DIRECTORY/$CA_CERT_FILE ..."
  echo
  openssl req -new -newkey rsa:4096 -days $VALIDITY_IN_DAYS -x509 -subj "/CN=$CN" \
    -keyout $CA_WORKING_DIRECTORY/$CA_KEY_FILE -out $CA_WORKING_DIRECTORY/$CA_CERT_FILE -nodes
fi

echo
echo "A keystore will be generated for each host in $KAFKA_HOSTS_FILE as each broker and logical client needs its own keystore"
echo
echo " NOTE: currently in Kafka, the Common Name (CN) does not need to be the FQDN of"
echo "       this host. However, at some point, this may change. As such, make the CN"
echo "       the FQDN. Some operating systems call the CN prompt 'first / last name'"
echo " To learn more about CNs and FQDNs, read:"
echo " https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509ExtendedTrustManager.html"
rm -rf $KEYSTORE_WORKING_DIRECTORY && mkdir $KEYSTORE_WORKING_DIRECTORY
while read -r KAFKA_HOST || [ -n "$KAFKA_HOST" ]; do
  KEY_STORE_FILE_NAME="$KAFKA_HOST.server.keystore.jks"
  echo
  echo "'$KEYSTORE_WORKING_DIRECTORY/$KEY_STORE_FILE_NAME' will contain a key pair and a self-signed certificate."
  keytool -genkey -keystore $KEYSTORE_WORKING_DIRECTORY/"$KEY_STORE_FILE_NAME" \
    -alias localhost -validity $VALIDITY_IN_DAYS -keyalg RSA \
    -noprompt -dname "CN=$KAFKA_HOST" -keypass $PASSWORD -storepass $PASSWORD

  echo
  echo "Now a certificate signing request will be made to the keystore."
  keytool -certreq -keystore $KEYSTORE_WORKING_DIRECTORY/"$KEY_STORE_FILE_NAME" \
    -alias localhost -file $KEYSTORE_SIGN_REQUEST -keypass $PASSWORD -storepass $PASSWORD

  echo
  echo "Now the private key of the certificate authority (CA) will sign the keystore's certificate."
  openssl x509 -req -CA $CA_WORKING_DIRECTORY/$CA_CERT_FILE \
    -CAkey $CA_WORKING_DIRECTORY/$CA_KEY_FILE \
    -in $KEYSTORE_SIGN_REQUEST -out $KEYSTORE_SIGNED_CERT \
    -days $VALIDITY_IN_DAYS -CAcreateserial
  # creates $CA_WORKING_DIRECTORY/$KEYSTORE_SIGN_REQUEST_SRL which is never used or needed.

  echo
  echo "Now the CA will be imported into the keystore."
  keytool -keystore $KEYSTORE_WORKING_DIRECTORY/"$KEY_STORE_FILE_NAME" -alias CARoot \
    -import -file $CA_WORKING_DIRECTORY/$CA_CERT_FILE -keypass $PASSWORD -storepass $PASSWORD -noprompt

  echo
  echo "Now the keystore's signed certificate will be imported back into the keystore."
  keytool -keystore $KEYSTORE_WORKING_DIRECTORY/"$KEY_STORE_FILE_NAME" -alias localhost \
    -import -file $KEYSTORE_SIGNED_CERT -keypass $PASSWORD -storepass $PASSWORD

  echo
  echo "Complete keystore generation!"
  echo
  echo "Deleting intermediate files. They are:"
  echo " - '$CA_WORKING_DIRECTORY/$KEYSTORE_SIGN_REQUEST_SRL': CA serial number"
  echo " - '$KEYSTORE_SIGN_REQUEST': the keystore's certificate signing request"
  echo " - '$KEYSTORE_SIGNED_CERT': the keystore's certificate, signed by the CA, and stored back"
  echo "   into the keystore"
  rm -f $CA_WORKING_DIRECTORY/$KEYSTORE_SIGN_REQUEST_SRL $KEYSTORE_SIGN_REQUEST $KEYSTORE_SIGNED_CERT
done < "$KAFKA_HOSTS_FILE"

echo
echo "Now the trust store will be generated from the certificate."
rm -rf $TRUSTSTORE_WORKING_DIRECTORY && mkdir $TRUSTSTORE_WORKING_DIRECTORY
keytool -keystore $TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILE \
  -alias CARoot -import -file $CA_WORKING_DIRECTORY/$CA_CERT_FILE \
  -noprompt -dname "CN=$CN" -keypass $PASSWORD -storepass $PASSWORD

if [ "$TO_GENERATE_PEM" = "yes" ]; then
  echo
  echo "The following file for SSL configuration will be created for a non-Java client"
  echo "  $PEM_WORKING_DIRECTORY/ca-root.pem: CA file to use in certificate verification"
  rm -rf $PEM_WORKING_DIRECTORY && mkdir $PEM_WORKING_DIRECTORY

  keytool -exportcert -alias CARoot -keystore $TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILE \
    -rfc -file $PEM_WORKING_DIRECTORY/ca-root.pem -storepass $PASSWORD
fi
The script generates the files listed below.
$ tree certificate-authority keystore truststore pem
certificate-authority
├── ca-cert
└── ca-key
keystore
├── kafka-0.server.keystore.jks
├── kafka-1.server.keystore.jks
└── kafka-2.server.keystore.jks
truststore
└── kafka.truststore.jks
pem
└── ca-root.pem
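As a quick sanity check, the exported PEM certificate can be inspected with openssl; the subject should show the CN used by the script (kafka-admin by default) and the dates should span the 3650-day validity period:

$ openssl x509 -in pem/ca-root.pem -noout -subject -dates
# subject=CN = kafka-admin
# notBefore=... notAfter=...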
Kafka Broker Update
We should add the SSL listener to the broker configuration, and port 9093 is reserved for it. Both the Keystore and Truststore files are specified in the broker configuration. The former is used to send the broker certificate to clients, while the latter is necessary because a Kafka broker can be a client of other brokers. The changes made to the first Kafka broker are shown below, and the same updates are made to the other brokers. The cluster can be started by docker-compose -f compose-kafka.yml up -d.
# kafka-dev-with-docker/part-08/compose-kafka.yml
version: "3.5"

services:
...

  kafka-0:
    image: bitnami/kafka:2.8.1
    container_name: kafka-0
    expose:
      - 9092
      - 9093
    ports:
      - "29092:29092"
    networks:
      - kafkanet
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,SSL:SSL,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,SSL://:9093,EXTERNAL://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-0:9092,SSL://kafka-0:9093,EXTERNAL://localhost:29092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SSL
      - KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
      - KAFKA_CFG_SSL_KEYSTORE_PASSWORD=supersecret
      - KAFKA_CFG_SSL_KEY_PASSWORD=supersecret
      - KAFKA_CFG_SSL_TRUSTSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
      - KAFKA_CFG_SSL_TRUSTSTORE_PASSWORD=supersecret
    volumes:
      - kafka_0_data:/bitnami/kafka
      - ./keystore/kafka-0.server.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
      - ./truststore/kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
      - ./client.properties:/opt/bitnami/kafka/config/client.properties:ro
    depends_on:
      - zookeeper

...

networks:
  kafkanet:
    name: kafka-network

...
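Before moving on to the clients, we can optionally probe the SSL listener to confirm that the broker presents a certificate signed by our CA. This is a sketch rather than part of the original setup - it assumes an image that ships the openssl binary (bitnami/python:3.9 is reused here) and runs a one-off container on the Kafka network:

## probe the SSL listener from a throwaway container on the same network
$ docker run --rm --network kafka-network -v "$PWD/pem:/pem" bitnami/python:3.9 \
    sh -c "openssl s_client -connect kafka-0:9093 -CAfile /pem/ca-root.pem </dev/null"
# expect 'subject=CN = kafka-0' and 'Verify return code: 0 (ok)' in the output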
Examples
Java and non-Java clients need different configurations. The former can use the Java Keystore file of the Truststore directly, while the latter needs the corresponding CA certificate in a PEM file. The Kafka CLI and Kafka-UI will be taken as Java client examples, while the Python producer/consumer will be used to illustrate non-Java clients.
Kafka CLI
The following configuration is necessary to use the SSL listener. It includes the security protocol, the location of the Truststore file and the password to access it.
# kafka-dev-with-docker/part-08/client.properties
security.protocol=SSL
ssl.truststore.location=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
ssl.truststore.password=supersecret
Below shows a producer example. It creates a topic named inventory and produces messages using the corresponding scripts. Note that the client configuration file (client.properties) is specified in both commands, and it is made available via volume-mapping.
## producer example
$ docker exec -it kafka-1 bash
I have no name!@07d1ca934530:/$ cd /opt/bitnami/kafka/bin/

## create a topic
I have no name!@07d1ca934530:/opt/bitnami/kafka/bin$ ./kafka-topics.sh --bootstrap-server kafka-0:9093 \
  --create --topic inventory --partitions 3 --replication-factor 3 \
  --command-config /opt/bitnami/kafka/config/client.properties
# Created topic inventory.

## produce messages
I have no name!@07d1ca934530:/opt/bitnami/kafka/bin$ ./kafka-console-producer.sh --bootstrap-server kafka-0:9093 \
  --topic inventory --producer.config /opt/bitnami/kafka/config/client.properties
>product: apples, quantity: 5
>product: lemons, quantity: 7
Once messages are produced, we can check them with a consumer, executed in a separate console.
## consumer example
$ docker exec -it kafka-1 bash
I have no name!@07d1ca934530:/$ cd /opt/bitnami/kafka/bin/

## consume messages
I have no name!@07d1ca934530:/opt/bitnami/kafka/bin$ ./kafka-console-consumer.sh --bootstrap-server kafka-0:9093 \
  --topic inventory --consumer.config /opt/bitnami/kafka/config/client.properties --from-beginning
product: apples, quantity: 5
product: lemons, quantity: 7
Python Client
We will run the Python producer and consumer apps using docker-compose. At startup, each of them installs the required packages and executes its corresponding app script. As they share the same network as the Kafka cluster, we can take the service names (e.g. kafka-0) on port 9093 as Kafka bootstrap servers. As shown below, we will need the certificate of the CA (ca-root.pem), and it will be made available via volume-mapping. The apps can be started by docker-compose -f compose-apps.yml up -d.
# kafka-dev-with-docker/part-08/compose-apps.yml
version: "3.5"

services:
  producer:
    image: bitnami/python:3.9
    container_name: producer
    command: "sh -c 'pip install -r requirements.txt && python producer.py'"
    networks:
      - kafkanet
    environment:
      BOOTSTRAP_SERVERS: kafka-0:9093,kafka-1:9093,kafka-2:9093
      TOPIC_NAME: orders
      TZ: Australia/Sydney
    volumes:
      - .:/app
  consumer:
    image: bitnami/python:3.9
    container_name: consumer
    command: "sh -c 'pip install -r requirements.txt && python consumer.py'"
    networks:
      - kafkanet
    environment:
      BOOTSTRAP_SERVERS: kafka-0:9093,kafka-1:9093,kafka-2:9093
      TOPIC_NAME: orders
      GROUP_ID: orders-group
      TZ: Australia/Sydney
    volumes:
      - .:/app

networks:
  kafkanet:
    external: true
    name: kafka-network
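The requirements.txt file is mounted together with the app scripts. Its exact contents are not shown in this post, but at a minimum it is assumed to include the kafka-python package that both scripts depend on:

# kafka-dev-with-docker/part-08/requirements.txt - assumed minimal contents
kafka-python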
Producer
The same producer app discussed in Part 4 is used here. The following arguments are added to access the SSL listener.
- security_protocol - Protocol used to communicate with brokers.
- ssl_check_hostname - Flag to configure whether SSL handshake should verify that the certificate matches the broker’s hostname.
- ssl_cafile - Optional filename of CA (certificate) file to use in certificate verification.
# kafka-dev-with-docker/part-08/producer.py
...

class Producer:
    def __init__(self, bootstrap_servers: list, topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.producer = self.create()

    def create(self):
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            security_protocol="SSL",
            ssl_check_hostname=True,
            ssl_cafile="pem/ca-root.pem",
            value_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
        )

    def send(self, orders: typing.List[Order]):
        for order in orders:
            try:
                self.producer.send(
                    self.topic, key={"order_id": order.order_id}, value=order.asdict()
                )
            except Exception as e:
                raise RuntimeError("fails to send a message") from e
        self.producer.flush()

...

if __name__ == "__main__":
    producer = Producer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topic=os.getenv("TOPIC_NAME", "orders"),
    )
    max_run = int(os.getenv("MAX_RUN", "-1"))
    logging.info(f"max run - {max_run}")
    current_run = 0
    while True:
        current_run += 1
        logging.info(f"current run - {current_run}")
        if current_run > max_run and max_run >= 0:
            logging.info(f"exceeds max run, finish")
            producer.producer.close()
            break
        producer.send(Order.auto().create(100))
        time.sleep(1)
In the container log, we can check that the SSL Handshake is performed successfully after loading the CA certificate file.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-1 host=kafka-0:9093 <connecting> [IPv4 ('172.20.0.3', 9093)]>: connecting to kafka-0:9093 [('172.20.0.3', 9093) IPv4]
INFO:kafka.conn:Probing node bootstrap-1 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-1 host=kafka-0:9093 <handshake> [IPv4 ('172.20.0.3', 9093)]>: Loading SSL CA from pem/ca-root.pem
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-1 host=kafka-0:9093 <handshake> [IPv4 ('172.20.0.3', 9093)]>: Connection complete.
INFO:root:max run - -1
INFO:root:current run - 1
...
INFO:root:current run - 2
Consumer
The same consumer app discussed in Part 4 is used here as well. As with the producer app, the following arguments are added - security_protocol, ssl_check_hostname and ssl_cafile.
# kafka-dev-with-docker/part-08/consumer.py
...

class Consumer:
    def __init__(self, bootstrap_servers: list, topics: list, group_id: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.consumer = self.create()

    def create(self):
        return KafkaConsumer(
            *self.topics,
            bootstrap_servers=self.bootstrap_servers,
            security_protocol="SSL",
            ssl_check_hostname=True,
            ssl_cafile="pem/ca-root.pem",
            auto_offset_reset="earliest",
            enable_auto_commit=True,
            group_id=self.group_id,
            key_deserializer=lambda v: v.decode("utf-8"),
            value_deserializer=lambda v: v.decode("utf-8"),
        )

    def process(self):
        try:
            while True:
                msg = self.consumer.poll(timeout_ms=1000)
                if msg is None:
                    continue
                self.print_info(msg)
                time.sleep(1)
        except KafkaError as error:
            logging.error(error)

    def print_info(self, msg: dict):
        for t, v in msg.items():
            for r in v:
                logging.info(
                    f"key={r.key}, value={r.value}, topic={t.topic}, partition={t.partition}, offset={r.offset}, ts={r.timestamp}"
                )


if __name__ == "__main__":
    consumer = Consumer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topics=os.getenv("TOPIC_NAME", "orders").split(","),
        group_id=os.getenv("GROUP_ID", "orders-group"),
    )
    consumer.process()
We can also check in the container log that messages are consumed after the SSL Handshake succeeds.
...
INFO:kafka.conn:<BrokerConnection node_id=0 host=kafka-0:9093 <connecting> [IPv4 ('172.20.0.3', 9093)]>: connecting to kafka-0:9093 [('172.20.0.3', 9093) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=0 host=kafka-0:9093 <handshake> [IPv4 ('172.20.0.3', 9093)]>: Loading SSL CA from pem/ca-root.pem
INFO:kafka.conn:<BrokerConnection node_id=0 host=kafka-0:9093 <handshake> [IPv4 ('172.20.0.3', 9093)]>: Connection complete.
INFO:kafka.cluster:Group coordinator for orders-group is BrokerMetadata(nodeId='coordinator-0', host='kafka-0', port=9093, rack=None)
INFO:kafka.coordinator:Discovered coordinator coordinator-0 for group orders-group
WARNING:kafka.coordinator:Marking the coordinator dead (node coordinator-0) for group orders-group: Node Disconnected.
INFO:kafka.conn:<BrokerConnection node_id=coordinator-0 host=kafka-0:9093 <connecting> [IPv4 ('172.20.0.3', 9093)]>: connecting to kafka-0:9093 [('172.20.0.3', 9093) IPv4]
INFO:kafka.cluster:Group coordinator for orders-group is BrokerMetadata(nodeId='coordinator-0', host='kafka-0', port=9093, rack=None)
INFO:kafka.coordinator:Discovered coordinator coordinator-0 for group orders-group
INFO:kafka.conn:<BrokerConnection node_id=coordinator-0 host=kafka-0:9093 <handshake> [IPv4 ('172.20.0.3', 9093)]>: Connection complete.
INFO:kafka.coordinator:(Re-)joining group orders-group
INFO:kafka.coordinator:Elected group leader -- performing partition assignments using range
INFO:kafka.coordinator:Successfully joined group orders-group with generation 3
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='orders', partition=0)]
INFO:kafka.coordinator.consumer:Setting newly assigned partitions {TopicPartition(topic='orders', partition=0)} for group orders-group
...
INFO:root:key={"order_id": "6f642267-0497-4e63-8989-45e29e768351"}, value={"order_id": "6f642267-0497-4e63-8989-45e29e768351", "ordered_at": "2023-06-20T20:26:45.635986", "user_id": "003", "order_items": [{"product_id": 1000, "quantity": 2}, {"product_id": 541, "quantity": 10}, {"product_id": 431, "quantity": 10}, {"product_id": 770, "quantity": 7}]}, topic=orders, partition=0, offset=10700, ts=1687292805638
INFO:root:key={"order_id": "1d5a92bc-75e0-46e9-a334-43e03e408ea0"}, value={"order_id": "1d5a92bc-75e0-46e9-a334-43e03e408ea0", "ordered_at": "2023-06-20T20:26:45.636034", "user_id": "032", "order_items": [{"product_id": 404, "quantity": 7}, {"product_id": 932, "quantity": 8}]}, topic=orders, partition=0, offset=10701, ts=1687292805638
Kafka-UI
Kafka-UI is also a Java client, and it accepts the Java Keystore file of the Kafka Truststore (kafka.truststore.jks). We can specify the file location and the password to access it as environment variables. The app can be started by docker-compose -f compose-ui.yml up -d.
# kafka-dev-with-docker/part-08/compose-ui.yml
version: "3.5"

services:
  kafka-ui:
    image: provectuslabs/kafka-ui:master
    container_name: kafka-ui
    ports:
      - "8080:8080"
    networks:
      - kafkanet
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SSL
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9093,kafka-1:9093,kafka-2:9093
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_SSL_TRUSTSTORELOCATION: /kafka.truststore.jks
      KAFKA_CLUSTERS_0_SSL_TRUSTSTOREPASSWORD: supersecret
    volumes:
      - ./truststore/kafka.truststore.jks:/kafka.truststore.jks:ro

networks:
  kafkanet:
    external: true
    name: kafka-network
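Before opening the browser, we can optionally confirm the container is serving requests - a hedged check, assuming the Spring Boot actuator health endpoint that the Kafka-UI image exposes by default:

$ curl -s http://localhost:8080/actuator/health
# {"status":"UP"}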
Once started, we can browse the messages of the orders topic successfully.
Summary
By default, Apache Kafka communicates in PLAINTEXT, and we can configure Kafka clients and other components to use TLS (SSL or TLS/SSL) encryption to secure communication. In this post, we discussed how to configure SSL encryption with Java and Python client examples.