In the previous post, we discussed how to configure TLS (SSL or TLS/SSL) encryption with Java and Python client examples. SSL encryption is a one-way verification process where a server certificate is verified by a client via the SSL Handshake. To improve security, we can add client authentication either by enforcing two-way verification where a client certificate is verified by Kafka brokers (SSL authentication) or by adopting a separate authentication mechanism, typically Simple Authentication and Security Layer (SASL). In this post, we will discuss how to implement SSL authentication with Java and Python client examples, while SASL authentication is covered in the next post.

Certificate Setup

Below is an overview of the certificate setup and SSL authentication. Compared to SSL encryption, we need an additional Keystore for the client, and the client certificate is verified by the Kafka brokers. The diagram is from Apache Kafka Series - Kafka Security | SSL SASL Kerberos ACL by Stephane Maarek and Gerd Koenig (LINK).

SSL authentication is a two-way verification process where both the server and the client verify the certificate of their counterpart via the SSL Handshake. The following components are required for setting up the certificates.

  • Certificate Authority (CA) - The CA is responsible for signing certificates. We'll be using our own CA rather than relying upon an external trusted CA. Two files will be created for the CA - a private key (ca-key) and a certificate (ca-cert).
  • Keystore - A Keystore stores the identity of each machine (Kafka broker or logical client), and the certificate of a machine is signed by the CA. As the CA's certificate is imported into the Truststore of a Kafka client, the machine's certificate is also trusted and verified during the SSL Handshake. Note that each machine must have its own Keystore. As we have 3 Kafka brokers, 3 Java Keystore files will be created, and each file name begins with the host name, e.g. kafka-0.server.keystore.jks. Also, we will keep a single Keystore for all Kafka clients for simplicity - kafka.client.keystore.jks.
  • Truststore - A Truststore stores one or more certificates that a Kafka client should trust. Note that importing a CA's certificate means the client trusts all other certificates that are signed by that certificate, which is called the chain of trust (see the sketch after this list). We'll have a single Java Keystore file for the Truststore named kafka.truststore.jks, and it will be shared by all Kafka brokers and clients.
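
To make the chain of trust concrete, below is a minimal sketch that signs a throwaway certificate with a self-made CA and then verifies it - the same relationship the SSL Handshake relies on. The file names (demo-ca-cert, demo-cert, etc.) are hypothetical and separate from the project scripts.

## create a CA - a private key and a self-signed certificate
openssl req -new -newkey rsa:4096 -x509 -subj "/CN=demo-ca" \
  -keyout demo-ca-key -out demo-ca-cert -nodes
## create a key pair and a certificate signing request (CSR) for a machine
openssl req -new -newkey rsa:4096 -subj "/CN=demo-host" \
  -keyout demo-key -out demo-csr -nodes
## the CA signs the CSR, producing the machine's certificate
openssl x509 -req -CA demo-ca-cert -CAkey demo-ca-key \
  -in demo-csr -out demo-cert -days 1 -CAcreateserial
## anyone who trusts demo-ca-cert can now verify demo-cert
openssl verify -CAfile demo-ca-cert demo-cert
# demo-cert: OK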

The following script generates the components described above. It begins with creating the files for the CA. Then it generates the Keystore of each Kafka broker and client, followed by the Truststore for the Kafka clients. Note that the host names of all Kafka brokers and clients should be added to the Kafka host file (kafka-hosts.txt) so that their Keystore files are generated in a loop. Note also that non-Java clients require PEM (Privacy Enhanced Mail) files rather than Java Keystore files. Therefore, the following files are created as well, and they will be used by the Python clients below.

  • ca-root.pem - CA file to use in certificate verification
  • client-certificate.pem - File that contains the client certificate, as well as any CA certificates needed to establish the certificate's authenticity
  • client-private-key.pem - File that contains the client private key

The source can be found in the GitHub repository of this post.

#!/usr/bin/env bash
# kafka-dev-with-docker/part-09/generate.sh

set -eu

CN="${CN:-kafka-admin}"
PASSWORD="${PASSWORD:-supersecret}"
TO_GENERATE_PEM="${TO_GENERATE_PEM:-yes}"

VALIDITY_IN_DAYS=3650
CA_WORKING_DIRECTORY="certificate-authority"
TRUSTSTORE_WORKING_DIRECTORY="truststore"
KEYSTORE_WORKING_DIRECTORY="keystore"
PEM_WORKING_DIRECTORY="pem"
CA_KEY_FILE="ca-key"
CA_CERT_FILE="ca-cert"
DEFAULT_TRUSTSTORE_FILE="kafka.truststore.jks"
KEYSTORE_SIGN_REQUEST="cert-file"
KEYSTORE_SIGN_REQUEST_SRL="ca-cert.srl"
KEYSTORE_SIGNED_CERT="cert-signed"
KAFKA_HOSTS_FILE="kafka-hosts.txt"

if [ ! -f "$KAFKA_HOSTS_FILE" ]; then
  echo "'$KAFKA_HOSTS_FILE' does not exist. Create this file."
  exit 1
fi

echo "Welcome to the Kafka SSL certificate authority, key store and trust store generator script."

echo
echo "First we will create our own certificate authority"
echo "  Two files will be created if not existing:"
echo "    - $CA_WORKING_DIRECTORY/$CA_KEY_FILE -- the private key used later to sign certificates"
echo "    - $CA_WORKING_DIRECTORY/$CA_CERT_FILE -- the certificate that will be stored in the trust store"
echo "                                                        and serve as the certificate authority (CA)."
if [ -f "$CA_WORKING_DIRECTORY/$CA_KEY_FILE" ] && [ -f "$CA_WORKING_DIRECTORY/$CA_CERT_FILE" ]; then
  echo "Use existing $CA_WORKING_DIRECTORY/$CA_KEY_FILE and $CA_WORKING_DIRECTORY/$CA_CERT_FILE ..."
else
  rm -rf $CA_WORKING_DIRECTORY && mkdir $CA_WORKING_DIRECTORY
  echo
  echo "Generate $CA_WORKING_DIRECTORY/$CA_KEY_FILE and $CA_WORKING_DIRECTORY/$CA_CERT_FILE ..."
  echo
  openssl req -new -newkey rsa:4096 -days $VALIDITY_IN_DAYS -x509 -subj "/CN=$CN" \
    -keyout $CA_WORKING_DIRECTORY/$CA_KEY_FILE -out $CA_WORKING_DIRECTORY/$CA_CERT_FILE -nodes
fi

echo
echo "A keystore will be generated for each host in $KAFKA_HOSTS_FILE as each broker and logical client needs its own keystore"
echo
echo " NOTE: currently in Kafka, the Common Name (CN) does not need to be the FQDN of"
echo " this host. However, at some point, this may change. As such, make the CN"
echo " the FQDN. Some operating systems call the CN prompt 'first / last name'"
echo " To learn more about CNs and FQDNs, read:"
echo " https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509ExtendedTrustManager.html"
rm -rf $KEYSTORE_WORKING_DIRECTORY && mkdir $KEYSTORE_WORKING_DIRECTORY
while read -r KAFKA_HOST || [ -n "$KAFKA_HOST" ]; do
  if [[ $KAFKA_HOST =~ ^kafka-[0-9]+$ ]]; then
      SUFFIX="server"
      DNAME="CN=$KAFKA_HOST"
  else
      SUFFIX="client"
      DNAME="CN=client"
  fi
  KEY_STORE_FILE_NAME="$KAFKA_HOST.$SUFFIX.keystore.jks"
  echo
  echo "'$KEYSTORE_WORKING_DIRECTORY/$KEY_STORE_FILE_NAME' will contain a key pair and a self-signed certificate."
  keytool -genkey -keystore $KEYSTORE_WORKING_DIRECTORY/"$KEY_STORE_FILE_NAME" \
    -alias localhost -validity $VALIDITY_IN_DAYS -keyalg RSA \
    -noprompt -dname $DNAME -keypass $PASSWORD -storepass $PASSWORD

  echo
  echo "Now a certificate signing request will be made to the keystore."
  keytool -certreq -keystore $KEYSTORE_WORKING_DIRECTORY/"$KEY_STORE_FILE_NAME" \
    -alias localhost -file $KEYSTORE_SIGN_REQUEST -keypass $PASSWORD -storepass $PASSWORD

  echo
  echo "Now the private key of the certificate authority (CA) will sign the keystore's certificate."
  openssl x509 -req -CA $CA_WORKING_DIRECTORY/$CA_CERT_FILE \
    -CAkey $CA_WORKING_DIRECTORY/$CA_KEY_FILE \
    -in $KEYSTORE_SIGN_REQUEST -out $KEYSTORE_SIGNED_CERT \
    -days $VALIDITY_IN_DAYS -CAcreateserial
  # creates $CA_WORKING_DIRECTORY/$KEYSTORE_SIGN_REQUEST_SRL which is never used or needed.

  echo
  echo "Now the CA will be imported into the keystore."
  keytool -keystore $KEYSTORE_WORKING_DIRECTORY/"$KEY_STORE_FILE_NAME" -alias CARoot \
    -import -file $CA_WORKING_DIRECTORY/$CA_CERT_FILE -keypass $PASSWORD -storepass $PASSWORD -noprompt

  echo
  echo "Now the keystore's signed certificate will be imported back into the keystore."
  keytool -keystore $KEYSTORE_WORKING_DIRECTORY/"$KEY_STORE_FILE_NAME" -alias localhost \
    -import -file $KEYSTORE_SIGNED_CERT -keypass $PASSWORD -storepass $PASSWORD

  echo
  echo "Complete keystore generation!"
  echo
  echo "Deleting intermediate files. They are:"
  echo " - '$CA_WORKING_DIRECTORY/$KEYSTORE_SIGN_REQUEST_SRL': CA serial number"
  echo " - '$KEYSTORE_SIGN_REQUEST': the keystore's certificate signing request"
  echo " - '$KEYSTORE_SIGNED_CERT': the keystore's certificate, signed by the CA, and stored back"
  echo "   into the keystore"
  rm -f $CA_WORKING_DIRECTORY/$KEYSTORE_SIGN_REQUEST_SRL $KEYSTORE_SIGN_REQUEST $KEYSTORE_SIGNED_CERT
done < "$KAFKA_HOSTS_FILE"

echo
echo "Now the trust store will be generated from the certificate."
rm -rf $TRUSTSTORE_WORKING_DIRECTORY && mkdir $TRUSTSTORE_WORKING_DIRECTORY
keytool -keystore $TRUSTSTORE_WORKING_DIRECTORY/$DEFAULT_TRUSTSTORE_FILE \
  -alias CARoot -import -file $CA_WORKING_DIRECTORY/$CA_CERT_FILE \
  -noprompt -dname "CN=$CN" -keypass $PASSWORD -storepass $PASSWORD

if [ $TO_GENERATE_PEM == "yes" ]; then
  echo
  echo "The following files for SSL configuration will be created for a non-java client"
  echo "  $PEM_WORKING_DIRECTORY/ca-root.pem: CA file to use in certificate verification (ssl_cafile)"
  echo "  $PEM_WORKING_DIRECTORY/client-certificate.pem: File that contains client certificate, as well as"
  echo "                any ca certificates needed to establish the certificate's authenticity (ssl_certfile)"
  echo "  $PEM_WORKING_DIRECTORY/client-private-key.pem: File that contains client private key (ssl_keyfile)"
  rm -rf $PEM_WORKING_DIRECTORY && mkdir $PEM_WORKING_DIRECTORY

  keytool -exportcert -alias CARoot -keystore $KEYSTORE_WORKING_DIRECTORY/kafka.client.keystore.jks \
    -rfc -file $PEM_WORKING_DIRECTORY/ca-root.pem -storepass $PASSWORD

  keytool -exportcert -alias localhost -keystore $KEYSTORE_WORKING_DIRECTORY/kafka.client.keystore.jks \
    -rfc -file $PEM_WORKING_DIRECTORY/client-certificate.pem -storepass $PASSWORD

  keytool -importkeystore -srcalias localhost -srckeystore $KEYSTORE_WORKING_DIRECTORY/kafka.client.keystore.jks \
    -destkeystore cert_and_key.p12 -deststoretype PKCS12 -srcstorepass $PASSWORD -deststorepass $PASSWORD
  openssl pkcs12 -in cert_and_key.p12 -nocerts -nodes -password pass:$PASSWORD \
    | awk '/-----BEGIN PRIVATE KEY-----/,/-----END PRIVATE KEY-----/' > $PEM_WORKING_DIRECTORY/client-private-key.pem
  rm -f cert_and_key.p12
fi

The script generates the files listed below.

$ tree certificate-authority keystore truststore pem
certificate-authority
├── ca-cert
└── ca-key
keystore
├── kafka-0.server.keystore.jks
├── kafka-1.server.keystore.jks
├── kafka-2.server.keystore.jks
└── kafka.client.keystore.jks
truststore
└── kafka.truststore.jks
pem
├── ca-root.pem
├── client-certificate.pem
└── client-private-key.pem
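
Optionally, we can inspect the generated stores with keytool to confirm their contents. The commands below are a quick verification sketch, assuming the file locations and the supersecret password from the script above.

## a broker keystore should hold two entries - caroot (trustedCertEntry)
## and localhost (PrivateKeyEntry), the latter issued by CN=kafka-admin
$ keytool -list -v -keystore keystore/kafka-0.server.keystore.jks -storepass supersecret \
    | grep -E "Alias name|Entry type|Owner|Issuer"

## the truststore should hold a single trustedCertEntry for the CA
$ keytool -list -v -keystore truststore/kafka.truststore.jks -storepass supersecret \
    | grep -E "Alias name|Entry type|Owner|Issuer"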

Kafka Broker Update

We should add the SSL listener to the broker configuration, and port 9093 is reserved for it. Both the Keystore and Truststore files are specified in the broker configuration - the former is used to send the broker certificate to clients, while the latter is necessary because a Kafka broker can itself be a client of other brokers. Also, we should require SSL client authentication by setting the KAFKA_CFG_SSL_CLIENT_AUTH environment variable to required. The changes made to the first Kafka broker are shown below, and the same updates are made to the other brokers. The cluster can be started by docker-compose -f compose-kafka.yml up -d.

# kafka-dev-with-docker/part-09/compose-kafka.yml
version: "3.5"

services:
...

  kafka-0:
    image: bitnami/kafka:2.8.1
    container_name: kafka-0
    expose:
      - 9092
      - 9093
    ports:
      - "29092:29092"
    networks:
      - kafkanet
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,SSL:SSL,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,SSL://:9093,EXTERNAL://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-0:9092,SSL://kafka-0:9093,EXTERNAL://localhost:29092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SSL
      - KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
      - KAFKA_CFG_SSL_KEYSTORE_PASSWORD=supersecret
      - KAFKA_CFG_SSL_KEY_PASSWORD=supersecret
      - KAFKA_CFG_SSL_TRUSTSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
      - KAFKA_CFG_SSL_TRUSTSTORE_PASSWORD=supersecret
      - KAFKA_CFG_SSL_CLIENT_AUTH=required
    volumes:
      - kafka_0_data:/bitnami/kafka
      - ./keystore/kafka-0.server.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
      - ./keystore/kafka.client.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.client.keystore.jks:ro
      - ./truststore/kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
      - ./client.properties:/opt/bitnami/kafka/config/client.properties:ro
    depends_on:
      - zookeeper

...

networks:
  kafkanet:
    name: kafka-network

...
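
Once the cluster is up, the SSL listener can be smoke-tested with openssl s_client from inside the Docker network. This is a sketch only - it assumes openssl is available in the broker container, and the output format varies by openssl version. As client authentication is required, the broker will not accept the connection without a client certificate, but s_client still prints the server certificate it receives during the handshake.

## print the certificate that kafka-0 presents on its SSL listener
$ docker exec -it kafka-1 bash -c \
    "echo Q | openssl s_client -connect kafka-0:9093 2>/dev/null | grep -E '^(subject|issuer)'"
# subject=CN = kafka-0
# issuer=CN = kafka-admin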

Examples

Java and non-Java clients need different configurations. The former can use the Java Keystore files directly, while the latter needs the corresponding details in PEM files. The Kafka CLI and Kafka-UI will be taken as Java client examples, while the Python producer and consumer apps will be used to illustrate non-Java clients.

Kafka CLI

The following configuration is necessary to use the SSL listener. It includes the security protocol and details about the Keystore and Truststore.

# kafka-dev-with-docker/part-09/client.properties
security.protocol=SSL
ssl.truststore.location=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
ssl.truststore.password=supersecret
ssl.keystore.location=/opt/bitnami/kafka/config/certs/kafka.client.keystore.jks
ssl.keystore.password=supersecret
ssl.key.password=supersecret

Below shows a producer example. It creates a topic named inventory and produces messages using the corresponding scripts. Note that the client configuration file (client.properties) is passed via the --command-config and --producer.config options, and it is made available inside the container via volume-mapping.

## producer example
$ docker exec -it kafka-1 bash
I have no name!@be871da96c09:/$ cd /opt/bitnami/kafka/bin/

## create a topic
I have no name!@be871da96c09:/opt/bitnami/kafka/bin$ ./kafka-topics.sh --bootstrap-server kafka-0:9093 \
  --create --topic inventory --partitions 3 --replication-factor 3 \
  --command-config /opt/bitnami/kafka/config/client.properties
# Created topic inventory.

## produce messages
I have no name!@be871da96c09:/opt/bitnami/kafka/bin$ ./kafka-console-producer.sh --bootstrap-server kafka-0:9093 \
  --topic inventory --producer.config /opt/bitnami/kafka/config/client.properties
>product: apples, quantity: 5
>product: lemons, quantity: 7

Once messages are produced, we can check them with a consumer. We can execute a consumer in a separate console.

## consumer example
$ docker exec -it kafka-1 bash
I have no name!@be871da96c09:/$ cd /opt/bitnami/kafka/bin/

## consume messages
I have no name!@be871da96c09:/opt/bitnami/kafka/bin$ ./kafka-console-consumer.sh --bootstrap-server kafka-0:9093 \
  --topic inventory --consumer.config /opt/bitnami/kafka/config/client.properties --from-beginning
product: apples, quantity: 5
product: lemons, quantity: 7
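
As SSL client authentication is required, a client configured for SSL encryption only (i.e. with Truststore details but no Keystore details) should be rejected during the SSL Handshake. The negative test below is a sketch - the truststore-only properties file is hypothetical, and the exact error message depends on the client version.

## hypothetical truststore-only configuration at /tmp/client-no-auth.properties
##   security.protocol=SSL
##   ssl.truststore.location=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
##   ssl.truststore.password=supersecret

## listing topics is expected to fail with an SSL handshake error
$ ./kafka-topics.sh --bootstrap-server kafka-0:9093 --list \
  --command-config /tmp/client-no-auth.properties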

Python Client

We will run the Python producer and consumer apps using docker-compose. At startup, each of them installs the required packages and executes its corresponding app script. As they share the same network as the Kafka cluster, we can take the service names (e.g. kafka-0) on port 9093 as the Kafka bootstrap servers. As shown below, we need multiple PEM files, and they are made available via volume-mapping. The apps can be started by docker-compose -f compose-apps.yml up -d.

# kafka-dev-with-docker/part-09/compose-apps.yml
version: "3.5"

services:
  producer:
    image: bitnami/python:3.9
    container_name: producer
    command: "sh -c 'pip install -r requirements.txt && python producer.py'"
    networks:
      - kafkanet
    environment:
      BOOTSTRAP_SERVERS: kafka-0:9093,kafka-1:9093,kafka-2:9093
      TOPIC_NAME: orders
      TZ: Australia/Sydney
    volumes:
      - .:/app
  consumer:
    image: bitnami/python:3.9
    container_name: consumer
    command: "sh -c 'pip install -r requirements.txt && python consumer.py'"
    networks:
      - kafkanet
    environment:
      BOOTSTRAP_SERVERS: kafka-0:9093,kafka-1:9093,kafka-2:9093
      TOPIC_NAME: orders
      GROUP_ID: orders-group
      TZ: Australia/Sydney
    volumes:
      - .:/app

networks:
  kafkanet:
    external: true
    name: kafka-network

Producer

The same producer app discussed in Part 4 is used here. The following arguments are added to access the SSL listener.

  • security_protocol - Protocol used to communicate with brokers.
  • ssl_check_hostname - Flag to configure whether SSL handshake should verify that the certificate matches the broker’s hostname.
  • ssl_cafile - Optional filename of CA (certificate) file to use in certificate verification.
  • ssl_certfile - Optional filename that contains client certificate, as well as any CA certificates needed to establish the certificate’s authenticity.
  • ssl_keyfile - Optional filename that contains the client private key.

# kafka-dev-with-docker/part-09/producer.py
...

class Producer:
    def __init__(self, bootstrap_servers: list, topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.producer = self.create()

    def create(self):
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            security_protocol="SSL",
            ssl_check_hostname=True,
            ssl_cafile="pem/ca-root.pem",
            ssl_certfile="pem/client-certificate.pem",
            ssl_keyfile="pem/client-private-key.pem",
            value_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
        )

    def send(self, orders: typing.List[Order]):
        for order in orders:
            try:
                self.producer.send(
                    self.topic, key={"order_id": order.order_id}, value=order.asdict()
                )
            except Exception as e:
                raise RuntimeError("fails to send a message") from e
        self.producer.flush()

...

if __name__ == "__main__":
    producer = Producer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topic=os.getenv("TOPIC_NAME", "orders"),
    )
    max_run = int(os.getenv("MAX_RUN", "-1"))
    logging.info(f"max run - {max_run}")
    current_run = 0
    while True:
        current_run += 1
        logging.info(f"current run - {current_run}")
        if current_run > max_run and max_run >= 0:
            logging.info(f"exceeds max run, finish")
            producer.producer.close()
            break
        producer.send(Order.auto().create(100))
        time.sleep(1)

In the container log, we can check that the SSL Handshake is performed successfully by loading the PEM files.

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-2:9093 <connecting> [IPv4 ('172.24.0.5', 9093)]>: connecting to kafka-2:9093 [('172.24.0.5', 9093) IPv4]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-2:9093 <handshake> [IPv4 ('172.24.0.5', 9093)]>: Loading SSL CA from pem/ca-root.pem
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-2:9093 <handshake> [IPv4 ('172.24.0.5', 9093)]>: Loading SSL Cert from pem/client-certificate.pem
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-2:9093 <handshake> [IPv4 ('172.24.0.5', 9093)]>: Loading SSL Key from pem/client-private-key.pem
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-2:9093 <handshake> [IPv4 ('172.24.0.5', 9093)]>: Connection complete.
INFO:root:max run - -1
INFO:root:current run - 1
...
INFO:root:current run - 2

Consumer

The same consumer app discussed in Part 4 is used here as well. As with the producer app, the following arguments are added - security_protocol, ssl_check_hostname, ssl_cafile, ssl_certfile and ssl_keyfile.

# kafka-dev-with-docker/part-09/consumer.py
...

class Consumer:
    def __init__(self, bootstrap_servers: list, topics: list, group_id: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.consumer = self.create()

    def create(self):
        return KafkaConsumer(
            *self.topics,
            bootstrap_servers=self.bootstrap_servers,
            security_protocol="SSL",
            ssl_check_hostname=True,
            ssl_cafile="pem/ca-root.pem",
            ssl_certfile="pem/client-certificate.pem",
            ssl_keyfile="pem/client-private-key.pem",
            auto_offset_reset="earliest",
            enable_auto_commit=True,
            group_id=self.group_id,
            key_deserializer=lambda v: v.decode("utf-8"),
            value_deserializer=lambda v: v.decode("utf-8"),
        )

    def process(self):
        try:
            while True:
                msg = self.consumer.poll(timeout_ms=1000)
                if msg is None:
                    continue
                self.print_info(msg)
                time.sleep(1)
        except KafkaError as error:
            logging.error(error)

    def print_info(self, msg: dict):
        for t, v in msg.items():
            for r in v:
                logging.info(
                    f"key={r.key}, value={r.value}, topic={t.topic}, partition={t.partition}, offset={r.offset}, ts={r.timestamp}"
                )


if __name__ == "__main__":
    consumer = Consumer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topics=os.getenv("TOPIC_NAME", "orders").split(","),
        group_id=os.getenv("GROUP_ID", "orders-group"),
    )
    consumer.process()

We can also check in the container log that messages are consumed after the SSL Handshake succeeds.

...
INFO:kafka.conn:<BrokerConnection node_id=2 host=kafka-2:9093 <connecting> [IPv4 ('172.24.0.5', 9093)]>: connecting to kafka-2:9093 [('172.24.0.5', 9093) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=2 host=kafka-2:9093 <handshake> [IPv4 ('172.24.0.5', 9093)]>: Loading SSL CA from pem/ca-root.pem
INFO:kafka.conn:<BrokerConnection node_id=2 host=kafka-2:9093 <handshake> [IPv4 ('172.24.0.5', 9093)]>: Loading SSL Cert from pem/client-certificate.pem
INFO:kafka.conn:<BrokerConnection node_id=2 host=kafka-2:9093 <handshake> [IPv4 ('172.24.0.5', 9093)]>: Loading SSL Key from pem/client-private-key.pem
INFO:kafka.conn:<BrokerConnection node_id=2 host=kafka-2:9093 <handshake> [IPv4 ('172.24.0.5', 9093)]>: Connection complete.
INFO:kafka.cluster:Group coordinator for orders-group is BrokerMetadata(nodeId='coordinator-0', host='kafka-0', port=9093, rack=None)
INFO:kafka.coordinator:Discovered coordinator coordinator-0 for group orders-group
WARNING:kafka.coordinator:Marking the coordinator dead (node coordinator-0) for group orders-group: Node Disconnected.
INFO:kafka.conn:<BrokerConnection node_id=coordinator-0 host=kafka-0:9093 <connecting> [IPv4 ('172.24.0.3', 9093)]>: connecting to kafka-0:9093 [('172.24.0.3', 9093) IPv4]
INFO:kafka.cluster:Group coordinator for orders-group is BrokerMetadata(nodeId='coordinator-0', host='kafka-0', port=9093, rack=None)
INFO:kafka.coordinator:Discovered coordinator coordinator-0 for group orders-group
INFO:kafka.conn:<BrokerConnection node_id=coordinator-0 host=kafka-0:9093 <handshake> [IPv4 ('172.24.0.3', 9093)]>: Connection complete.
INFO:kafka.coordinator:(Re-)joining group orders-group
INFO:kafka.coordinator:Elected group leader -- performing partition assignments using range
INFO:kafka.coordinator:Successfully joined group orders-group with generation 1
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='orders', partition=0)]
INFO:kafka.coordinator.consumer:Setting newly assigned partitions {TopicPartition(topic='orders', partition=0)} for group orders-group
...
INFO:root:key={"order_id": "e2253de4-7c44-4cf1-b45d-7091a0dd1f23"}, value={"order_id": "e2253de4-7c44-4cf1-b45d-7091a0dd1f23", "ordered_at": "2023-06-20T21:38:18.524398", "user_id": "053", "order_items": [{"product_id": 279, "quantity": 1}]}, topic=orders, partition=0, offset=0, ts=1687297098839
INFO:root:key={"order_id": "f522db30-f2a1-4b43-8233-a2b36b4f3f95"}, value={"order_id": "f522db30-f2a1-4b43-8233-a2b36b4f3f95", "ordered_at": "2023-06-20T21:38:18.524430", "user_id": "038", "order_items": [{"product_id": 456, "quantity": 3}]}, topic=orders, partition=0, offset=1, ts=1687297098840

Kafka-UI

Kafka-UI is also a Java client, and it accepts the Java Keystore files of the Truststore (kafka.truststore.jks) and the client Keystore (kafka.client.keystore.jks). We can specify the file locations and the passwords to access them as environment variables. The app can be started by docker-compose -f compose-ui.yml up -d.

# kafka-dev-with-docker/part-09/compose-ui.yml
version: "3.5"

services:
  kafka-ui:
    image: provectuslabs/kafka-ui:master
    container_name: kafka-ui
    ports:
      - "8080:8080"
    networks:
      - kafkanet
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SSL
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9093,kafka-1:9093,kafka-2:9093
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_PROPERTIES_SSL_KEYSTORE_LOCATION: /kafka.client.keystore.jks
      KAFKA_CLUSTERS_0_PROPERTIES_SSL_KEYSTORE_PASSWORD: supersecret
      KAFKA_CLUSTERS_0_SSL_TRUSTSTORELOCATION: /kafka.truststore.jks
      KAFKA_CLUSTERS_0_SSL_TRUSTSTOREPASSWORD: supersecret
    volumes:
      - ./truststore/kafka.truststore.jks:/kafka.truststore.jks:ro
      - ./keystore/kafka.client.keystore.jks:/kafka.client.keystore.jks:ro

networks:
  kafkanet:
    external: true
    name: kafka-network

Summary

To improve security, we can extend TLS (SSL or TLS/SSL) encryption either by enforcing two-way verification where a client certificate is verified by Kafka brokers (SSL authentication) or by adopting a separate authentication mechanism, typically Simple Authentication and Security Layer (SASL). In this post, we discussed how to implement SSL authentication with Java and Python client examples, while SASL authentication is covered in the next post.