In the previous posts, we discussed how to implement client authentication with TLS (SSL, or TLS/SSL) and SASL. One of the key benefits of client authentication is that it enables user access control. Kafka ships with a pluggable, out-of-the-box authorization framework, which is configured with the authorizer.class.name property in the server configuration and stores Access Control Lists (ACLs) in the cluster metadata (either Zookeeper or the KRaft metadata log). In this post, we will discuss how to configure Kafka authorization with Java and Python client examples while keeping SASL for client authentication.

Certificate Setup

As Kafka communication will remain encrypted, we need to keep the components for SSL encryption. The details can be found in Part 8, and those components can be generated with generate.sh. Once we execute the script, the following files are created.

$ tree certificate-authority keystore truststore pem
certificate-authority
├── ca-cert
└── ca-key
keystore
├── kafka-0.server.keystore.jks
├── kafka-1.server.keystore.jks
└── kafka-2.server.keystore.jks
truststore
└── kafka.truststore.jks
pem
└── ca-root.pem

Kafka Cluster Update

As discussed in Part 10, authentication should be enabled on the Zookeeper node for SASL authentication. Moreover, it is important to secure the node for authorization because ACLs are stored in it. Therefore, I enabled authentication and specified user credentials. The credentials will be referred to in the Client context of the Java Authentication and Authorization Service (JAAS) configuration file (kafka_jaas.conf). The details of the configuration file can be found below.

When it comes to the Kafka broker configuration, we should add the SASL_SSL listener, and port 9094 is reserved for it. Both the Keystore and Truststore files are specified in the broker configuration for SSL. The former is used to send the broker certificate to clients while the latter is necessary because a Kafka broker can be a client of other brokers. While SASL supports multiple mechanisms, we enabled the Salted Challenge Response Authentication Mechanism (SCRAM) by specifying SCRAM-SHA-256 in the following environment variables.

  • KAFKA_CFG_SASL_ENABLED_MECHANISMS
  • KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL

For authorization, AclAuthorizer is specified as the authorizer class name; it uses Zookeeper to persist ACLs. A super user named superuser is created. As the name suggests, super users are allowed to execute operations without being checked against ACLs. Finally, anyone is allowed to access resources when no matching ACL is found (allow.everyone.if.no.acl.found). This is enabled so that the super user can be created after the Kafka cluster starts, but it is not recommended in a production environment.
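
For orientation, the Bitnami image maps each KAFKA_CFG_* environment variable to the corresponding entry in server.properties, so the authorization settings above amount to the following broker properties.

# equivalent server.properties entries (mapped from the KAFKA_CFG_* variables)
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:superuser
allow.everyone.if.no.acl.found=true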

The changes made to the first Kafka broker are shown below, and the same updates are made to the other brokers. The source can be found in the GitHub repository of this post, and the cluster can be started by docker-compose -f compose-kafka.yml up -d.

# kafka-dev-with-docker/part-11/compose-kafka.yml
version: "3.5"

services:
  zookeeper:
    image: bitnami/zookeeper:3.5
    container_name: zookeeper
    ports:
      - "2181"
    networks:
      - kafkanet
    environment:
      - ZOO_ENABLE_AUTH=yes
      - ZOO_SERVER_USERS=admin
      - ZOO_SERVER_PASSWORDS=password
    volumes:
      - zookeeper_data:/bitnami/zookeeper
  kafka-0:
    image: bitnami/kafka:2.8.1
    container_name: kafka-0
    expose:
      - 9092
      - 9093
      - 9094
    ports:
      - "29092:29092"
    networks:
      - kafkanet
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,SSL://:9093,SASL_SSL://:9094,EXTERNAL://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-0:9092,SSL://kafka-0:9093,SASL_SSL://kafka-0:9094,EXTERNAL://localhost:29092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SSL
      - KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
      - KAFKA_CFG_SSL_KEYSTORE_PASSWORD=supersecret
      - KAFKA_CFG_SSL_KEY_PASSWORD=supersecret
      - KAFKA_CFG_SSL_TRUSTSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
      - KAFKA_CFG_SSL_TRUSTSTORE_PASSWORD=supersecret
      - KAFKA_CFG_SASL_ENABLED_MECHANISMS=SCRAM-SHA-256
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=SCRAM-SHA-256
      - KAFKA_CFG_AUTHORIZER_CLASS_NAME=kafka.security.authorizer.AclAuthorizer
      - KAFKA_CFG_SUPER_USERS=User:superuser
      - KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND=true
    volumes:
      - kafka_0_data:/bitnami/kafka
      - ./keystore/kafka-0.server.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
      - ./truststore/kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
      - ./kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
      - ./client.properties:/opt/bitnami/kafka/config/client.properties:ro
      - ./command.properties:/opt/bitnami/kafka/config/command.properties:ro
      - ./superuser.properties:/opt/bitnami/kafka/config/superuser.properties:ro
    depends_on:
      - zookeeper

...

networks:
  kafkanet:
    name: kafka-network

...

As mentioned earlier, the broker needs a JAAS configuration file, and it should include two contexts - KafkaServer and Client. The former is required for inter-broker communication while the latter is for accessing the Zookeeper node. As SASL is not enabled for inter-broker communication, dummy credentials are added to the KafkaServer context, while the Zookeeper user credentials are kept in the Client context. The credentials are those specified by the ZOO_SERVER_USERS and ZOO_SERVER_PASSWORDS environment variables on the Zookeeper node.

# kafka-dev-with-docker/part-11/kafka_jaas.conf
KafkaServer {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="_"
  password="_";
};

Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="password";
};

Examples

For SSL encryption, Java and non-Java clients need different configurations. The former can use the Truststore file (JKS) directly while the latter needs the CA certificate in a PEM file. The Kafka CLI and Kafka-UI will be taken as Java client examples while the Python producer/consumer will be used to illustrate non-Java clients.
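
For reference, one way to produce such a PEM file from the JKS truststore is keytool; a minimal sketch (the CARoot alias is an assumption, and generate.sh may build the file differently):

## export the CA certificate in PEM format (alias CARoot is an assumption)
keytool -exportcert -alias CARoot -rfc \
  -keystore truststore/kafka.truststore.jks -storepass supersecret \
  -file pem/ca-root.pem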

For client authentication, we will create a total of 4 SCRAM users. First we will create the super user, and then the super user will be used to create the 3 client users as well as their ACLs.

User Creation

The SCRAM super user can be created by using either the PLAINTEXT or SSL listener within a broker container. Here we will use the SSL listener with the following configuration.

# kafka-dev-with-docker/part-11/command.properties
security.protocol=SSL
ssl.truststore.location=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
ssl.truststore.password=supersecret

Once the super user is created, the client users will be created via the SASL_SSL listener using the following properties.

# kafka-dev-with-docker/part-11/superuser.properties
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="superuser" password="password";
security.protocol=SASL_SSL
ssl.truststore.location=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
ssl.truststore.password=supersecret

Details of creating the users are shown below. No users exist by default, and the SCRAM super user as well as the 3 client users are created. The client users are named client, producer and consumer.

$ docker exec -it kafka-0 bash
I have no name!@b28e71a2ae2c:/$ cd /opt/bitnami/kafka/bin/
## describe (list) all users (via SSL) - no user exists
I have no name!@b28e71a2ae2c:/opt/bitnami/kafka/bin$ ./kafka-configs.sh --bootstrap-server kafka-1:9093 --describe \
  --entity-type users --command-config /opt/bitnami/kafka/config/command.properties

## create superuser (via SSL)
I have no name!@b28e71a2ae2c:/opt/bitnami/kafka/bin$ ./kafka-configs.sh --bootstrap-server kafka-1:9093 --alter \
  --add-config 'SCRAM-SHA-256=[iterations=8192,password=password]' \
  --entity-type users --entity-name superuser \
  --command-config /opt/bitnami/kafka/config/command.properties
# Completed updating config for user superuser.

## create users for Kafka clients (via SASL_SSL as superuser)
I have no name!@b28e71a2ae2c:/opt/bitnami/kafka/bin$ for USER in "client" "producer" "consumer"; do
  ./kafka-configs.sh --bootstrap-server kafka-1:9094 --alter \
    --add-config 'SCRAM-SHA-256=[iterations=8192,password=password]' \
    --entity-type users --entity-name $USER \
    --command-config /opt/bitnami/kafka/config/superuser.properties
done
# Completed updating config for user client.
# Completed updating config for user producer.
# Completed updating config for user consumer.

## check if all users exist (via SASL_SSL as superuser)
I have no name!@b28e71a2ae2c:/opt/bitnami/kafka/bin$ ./kafka-configs.sh --bootstrap-server kafka-1:9094 --describe \
  --entity-type users --command-config /opt/bitnami/kafka/config/superuser.properties
# SCRAM credential configs for user-principal 'client' are SCRAM-SHA-256=iterations=8192
# SCRAM credential configs for user-principal 'consumer' are SCRAM-SHA-256=iterations=8192
# SCRAM credential configs for user-principal 'producer' are SCRAM-SHA-256=iterations=8192
# SCRAM credential configs for user-principal 'superuser' are SCRAM-SHA-256=iterations=8192
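
Although not needed for this post, SCRAM credentials can also be removed with the --delete-config option of kafka-configs.sh; a sketch:

## delete the SCRAM credentials of a user (sketch, not executed in this post)
./kafka-configs.sh --bootstrap-server kafka-1:9094 --alter \
  --delete-config 'SCRAM-SHA-256' \
  --entity-type users --entity-name client \
  --command-config /opt/bitnami/kafka/config/superuser.properties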

ACL Creation

The user named client is authorized to perform all operations on a topic named inventory. This user will be used to demonstrate how to produce and consume messages using the Kafka CLI.

## create ACL for inventory topic. The user 'client' has permission on all operations
I have no name!@b28e71a2ae2c:/opt/bitnami/kafka/bin$ ./kafka-acls.sh --bootstrap-server kafka-1:9094 --add \
  --allow-principal User:client --operation All --group '*' \
  --topic inventory --command-config /opt/bitnami/kafka/config/superuser.properties
# Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=inventory, patternType=LITERAL)`:
#         (principal=User:client, host=*, operation=ALL, permissionType=ALLOW)

# Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=*, patternType=LITERAL)`:
#         (principal=User:client, host=*, operation=ALL, permissionType=ALLOW)

# Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=*, patternType=LITERAL)`:
#         (principal=User:client, host=*, operation=ALL, permissionType=ALLOW)

# Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=inventory, patternType=LITERAL)`:
#         (principal=User:client, host=*, operation=ALL, permissionType=ALLOW)

I have no name!@b28e71a2ae2c:/opt/bitnami/kafka/bin$ ./kafka-acls.sh --bootstrap-server kafka-1:9094 --list \
  --topic inventory --command-config /opt/bitnami/kafka/config/superuser.properties
# Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=inventory, patternType=LITERAL)`:
#         (principal=User:client, host=*, operation=ALL, permissionType=ALLOW)
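
Since AclAuthorizer persists ACLs in Zookeeper, we can also peek at where they are stored. Literal ACLs live under the /kafka-acls znode, so a quick look is possible with zookeeper-shell.sh from a broker container; a sketch (depending on the Zookeeper authentication settings, the shell may additionally need the JAAS credentials), in which we would expect the inventory topic to be listed:

## inspect the znode where AclAuthorizer stores literal topic ACLs (sketch)
./zookeeper-shell.sh zookeeper:2181 ls /kafka-acls/Topic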

The Kafka CLI supports creating canned ACLs that are specific to a producer or a consumer. As we have separate Python producer and consumer apps, separate ACLs are created according to their roles.

I have no name!@b28e71a2ae2c:/opt/bitnami/kafka/bin$ ./kafka-acls.sh --bootstrap-server kafka-1:9094 --add \
  --allow-principal User:producer --producer \
  --topic orders --command-config /opt/bitnami/kafka/config/superuser.properties
# Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=orders, patternType=LITERAL)`:
#         (principal=User:producer, host=*, operation=WRITE, permissionType=ALLOW)
#         (principal=User:producer, host=*, operation=DESCRIBE, permissionType=ALLOW)
#         (principal=User:producer, host=*, operation=CREATE, permissionType=ALLOW)

# Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=orders, patternType=LITERAL)`:
#         (principal=User:producer, host=*, operation=WRITE, permissionType=ALLOW)
#         (principal=User:producer, host=*, operation=CREATE, permissionType=ALLOW)
#         (principal=User:producer, host=*, operation=DESCRIBE, permissionType=ALLOW)

I have no name!@b28e71a2ae2c:/opt/bitnami/kafka/bin$ ./kafka-acls.sh --bootstrap-server kafka-1:9094 --add \
  --allow-principal User:consumer --consumer --group '*' \
  --topic orders --command-config /opt/bitnami/kafka/config/superuser.properties
# Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=orders, patternType=LITERAL)`:
#         (principal=User:consumer, host=*, operation=READ, permissionType=ALLOW)
#         (principal=User:consumer, host=*, operation=DESCRIBE, permissionType=ALLOW)

# Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=*, patternType=LITERAL)`:
#         (principal=User:consumer, host=*, operation=READ, permissionType=ALLOW)

# Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=*, patternType=LITERAL)`:
#         (principal=User:consumer, host=*, operation=READ, permissionType=ALLOW)
#         (principal=User:client, host=*, operation=ALL, permissionType=ALLOW)

# Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=orders, patternType=LITERAL)`:
#         (principal=User:producer, host=*, operation=CREATE, permissionType=ALLOW)
#         (principal=User:producer, host=*, operation=DESCRIBE, permissionType=ALLOW)
#         (principal=User:consumer, host=*, operation=DESCRIBE, permissionType=ALLOW)
#         (principal=User:producer, host=*, operation=WRITE, permissionType=ALLOW)
#         (principal=User:consumer, host=*, operation=READ, permissionType=ALLOW)

I have no name!@b28e71a2ae2c:/opt/bitnami/kafka/bin$ ./kafka-acls.sh --bootstrap-server kafka-1:9094 --list \
  --topic orders --command-config /opt/bitnami/kafka/config/superuser.properties
# Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=orders, patternType=LITERAL)`:
#         (principal=User:producer, host=*, operation=CREATE, permissionType=ALLOW)
#         (principal=User:producer, host=*, operation=DESCRIBE, permissionType=ALLOW)
#         (principal=User:consumer, host=*, operation=DESCRIBE, permissionType=ALLOW)
#         (principal=User:producer, host=*, operation=WRITE, permissionType=ALLOW)
#         (principal=User:consumer, host=*, operation=READ, permissionType=ALLOW)
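
ACLs don't have to be managed with the CLI alone. As a minimal sketch (not part of the example apps), the admin client of kafka-python (2.0+) can create the same kind of ACL programmatically; the script name and the connection settings, which mirror superuser.properties, are assumptions:

# acl_admin.py - hypothetical helper, mirroring superuser.properties
from kafka.admin import (
    ACL,
    ACLOperation,
    ACLPermissionType,
    KafkaAdminClient,
    ResourcePattern,
    ResourceType,
)

# connect to the SASL_SSL listener as the super user
admin = KafkaAdminClient(
    bootstrap_servers="kafka-1:9094",
    security_protocol="SASL_SSL",
    ssl_cafile="pem/ca-root.pem",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="superuser",
    sasl_plain_password="password",
)

# allow the producer user to write to the orders topic, comparable to
# kafka-acls.sh --add --allow-principal User:producer --operation Write --topic orders
acl = ACL(
    principal="User:producer",
    host="*",
    operation=ACLOperation.WRITE,
    permission_type=ACLPermissionType.ALLOW,
    resource_pattern=ResourcePattern(ResourceType.TOPIC, "orders"),
)
print(admin.create_acls([acl]))
admin.close()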

Kafka CLI

The following configuration is necessary to use the SASL_SSL listener. First, the security protocol is set to SASL_SSL. Next, the location of the Truststore file and the password to access it are specified for SSL encryption. Finally, the SASL mechanism and the corresponding JAAS configuration are added for client authentication.

# kafka-dev-with-docker/part-11/client.properties
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="client" password="password";
security.protocol=SASL_SSL
ssl.truststore.location=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
ssl.truststore.password=supersecret

A producer example is shown below. It successfully produces messages to a topic named inventory via the SASL_SSL listener. Note that the client configuration file (client.properties) is specified in the producer configuration, and it is made available via volume-mapping.

## producer
$ docker exec -it kafka-0 bash
I have no name!@b28e71a2ae2c:/$ cd /opt/bitnami/kafka/bin/
I have no name!@b28e71a2ae2c:/opt/bitnami/kafka/bin$ ./kafka-console-producer.sh --bootstrap-server kafka-0:9094 \
  --topic inventory --producer.config /opt/bitnami/kafka/config/client.properties
>product: apples, quantity: 5
>product: lemons, quantity: 7

Once messages are produced, we can check them with a consumer. We can execute a consumer in a separate console.

## consumer
$ docker exec -it kafka-0 bash
I have no name!@b28e71a2ae2c:/$ cd /opt/bitnami/kafka/bin/
I have no name!@b28e71a2ae2c:/opt/bitnami/kafka/bin$ ./kafka-console-consumer.sh --bootstrap-server kafka-0:9094 \
  --topic inventory --consumer.config /opt/bitnami/kafka/config/client.properties --from-beginning
# [2023-06-21 01:30:01,890] WARN [Consumer clientId=consumer-console-consumer-94700-1, groupId=console-consumer-94700] Error while fetching metadata with correlation id 2 : {inventory=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
product: apples, quantity: 5
product: lemons, quantity: 7

Python Client

We will run the Python producer and consumer apps using docker-compose. At startup, each of them installs the required packages and executes its corresponding app script. As they share the same network as the Kafka cluster, we can use the service names (e.g. kafka-0) on port 9094 as the Kafka bootstrap servers. As shown below, we need the certificate of the CA (ca-root.pem), and it is made available via volume-mapping. Also, the relevant SCRAM user credentials are added as environment variables. The apps can be started by docker-compose -f compose-apps.yml up -d.

# kafka-dev-with-docker/part-11/compose-apps.yml
version: "3.5"

services:
  producer:
    image: bitnami/python:3.9
    container_name: producer
    command: "sh -c 'pip install -r requirements.txt && python producer.py'"
    networks:
      - kafkanet
    environment:
      BOOTSTRAP_SERVERS: kafka-0:9094,kafka-1:9094,kafka-2:9094
      TOPIC_NAME: orders
      TZ: Australia/Sydney
      SASL_USERNAME: producer
      SASL_PASSWORD: password
    volumes:
      - .:/app
  consumer:
    image: bitnami/python:3.9
    container_name: consumer
    command: "sh -c 'pip install -r requirements.txt && python consumer.py'"
    networks:
      - kafkanet
    environment:
      BOOTSTRAP_SERVERS: kafka-0:9094,kafka-1:9094,kafka-2:9094
      TOPIC_NAME: orders
      GROUP_ID: orders-group
      TZ: Australia/Sydney
      SASL_USERNAME: consumer
      SASL_PASSWORD: password
    volumes:
      - .:/app

networks:
  kafkanet:
    external: true
    name: kafka-network

Producer

The same producer app discussed in Part 4 is used here. The following arguments are added to access the SASL_SSL listener.

  • security_protocol - Protocol used to communicate with brokers.
  • ssl_check_hostname - Flag to configure whether SSL handshake should verify that the certificate matches the broker’s hostname.
  • ssl_cafile - Optional filename of CA (certificate) file to use in certificate verification.
  • sasl_mechanism - Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL.
  • sasl_plain_username - Username for SASL PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
  • sasl_plain_password - Password for SASL PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
# kafka-dev-with-docker/part-11/producer.py
...

class Producer:
    def __init__(self, bootstrap_servers: list, topic: str):
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.producer = self.create()

    def create(self):
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            security_protocol="SASL_SSL",
            ssl_check_hostname=True,
            ssl_cafile="pem/ca-root.pem",
            sasl_mechanism="SCRAM-SHA-256",
            sasl_plain_username=os.environ["SASL_USERNAME"],
            sasl_plain_password=os.environ["SASL_PASSWORD"],
            value_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
            key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
        )

    def send(self, orders: typing.List[Order]):
        for order in orders:
            try:
                self.producer.send(
                    self.topic, key={"order_id": order.order_id}, value=order.asdict()
                )
            except Exception as e:
                raise RuntimeError("fails to send a message") from e
        self.producer.flush()

...

if __name__ == "__main__":
    producer = Producer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topic=os.getenv("TOPIC_NAME", "orders"),
    )
    max_run = int(os.getenv("MAX_RUN", "-1"))
    logging.info(f"max run - {max_run}")
    current_run = 0
    while True:
        current_run += 1
        logging.info(f"current run - {current_run}")
        if current_run > max_run and max_run >= 0:
            logging.info(f"exceeds max run, finish")
            producer.producer.close()
            break
        producer.send(Order.auto().create(100))
        time.sleep(1)

In the container log, we can check that the SSL handshake and client authentication are performed successfully.

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-1:9094 <handshake> [IPv4 ('192.168.0.3', 9094)]>: Loading SSL CA from pem/ca-root.pem
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-1:9094 <authenticating> [IPv4 ('192.168.0.3', 9094)]>: Authenticated as producer via SCRAM-SHA-256
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka-1:9094 <authenticating> [IPv4 ('192.168.0.3', 9094)]>: Connection complete.
INFO:root:max run - -1
INFO:root:current run - 1
...
INFO:root:current run - 2

Consumer

The same consumer app from Part 4 is used here as well. As with the producer app, the following arguments are added - security_protocol, ssl_check_hostname, ssl_cafile, sasl_mechanism, sasl_plain_username and sasl_plain_password.

# kafka-dev-with-docker/part-11/consumer.py
...

class Consumer:
    def __init__(self, bootstrap_servers: list, topics: list, group_id: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.consumer = self.create()

    def create(self):
        return KafkaConsumer(
            *self.topics,
            bootstrap_servers=self.bootstrap_servers,
            security_protocol="SASL_SSL",
            ssl_check_hostname=True,
            ssl_cafile="pem/ca-root.pem",
            sasl_mechanism="SCRAM-SHA-256",
            sasl_plain_username=os.environ["SASL_USERNAME"],
            sasl_plain_password=os.environ["SASL_PASSWORD"],
            auto_offset_reset="earliest",
            enable_auto_commit=True,
            group_id=self.group_id,
            key_deserializer=lambda v: v.decode("utf-8"),
            value_deserializer=lambda v: v.decode("utf-8"),
        )

    def process(self):
        try:
            while True:
                msg = self.consumer.poll(timeout_ms=1000)
                if msg is None:
                    continue
                self.print_info(msg)
                time.sleep(1)
        except KafkaError as error:
            logging.error(error)

    def print_info(self, msg: dict):
        for t, v in msg.items():
            for r in v:
                logging.info(
                    f"key={r.key}, value={r.value}, topic={t.topic}, partition={t.partition}, offset={r.offset}, ts={r.timestamp}"
                )


if __name__ == "__main__":
    consumer = Consumer(
        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
        topics=os.getenv("TOPIC_NAME", "orders").split(","),
        group_id=os.getenv("GROUP_ID", "orders-group"),
    )
    consumer.process()

In the container log, we can also check that messages are consumed after the SSL handshake and client authentication succeed.

INFO:kafka.coordinator:Elected group leader -- performing partition assignments using range
INFO:kafka.coordinator:Successfully joined group orders-group with generation 1
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='orders', partition=0)]
INFO:kafka.coordinator.consumer:Setting newly assigned partitions {TopicPartition(topic='orders', partition=0)} for group orders-group
...
INFO:kafka.conn:<BrokerConnection node_id=2 host=kafka-2:9094 <connecting> [IPv4 ('192.168.0.5', 9094)]>: connecting to kafka-2:9094 [('192.168.0.5', 9094) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=2 host=kafka-2:9094 <handshake> [IPv4 ('192.168.0.5', 9094)]>: Loading SSL CA from pem/ca-root.pem
INFO:kafka.conn:<BrokerConnection node_id=2 host=kafka-2:9094 <authenticating> [IPv4 ('192.168.0.5', 9094)]>: Authenticated as consumer via SCRAM-SHA-256
INFO:kafka.conn:<BrokerConnection node_id=2 host=kafka-2:9094 <authenticating> [IPv4 ('192.168.0.5', 9094)]>: Connection complete.
INFO:root:key={"order_id": "7de9132b-c71e-4739-a2f8-7b6aed7ce8c9"}, value={"order_id": "7de9132b-c71e-4739-a2f8-7b6aed7ce8c9", "ordered_at": "2023-06-21T03:13:19.363325", "user_id": "017", "order_items": [{"product_id": 553, "quantity": 8}]}, topic=orders, partition=0, offset=0, ts=1687317199370
INFO:root:key={"order_id": "f222065e-489c-4ecd-b864-88163e800c79"}, value={"order_id": "f222065e-489c-4ecd-b864-88163e800c79", "ordered_at": "2023-06-21T03:13:19.363402", "user_id": "023", "order_items": [{"product_id": 417, "quantity": 10}, {"product_id": 554, "quantity": 1}, {"product_id": 942, "quantity": 6}]}, topic=orders, partition=0, offset=1, ts=1687317199371

Kafka-UI

Kafka-UI is also a Java client, and it accepts the Kafka Truststore file (kafka.truststore.jks) directly. We can specify the file and the password to access it as environment variables for SSL encryption. For client authentication, we need to add the SASL mechanism and the corresponding JAAS configuration to environment variables. Note that the super user credentials are added to the configuration, which is not recommended in a production environment. The app can be started by docker-compose -f compose-ui.yml up -d.

# kafka-dev-with-docker/part-11/compose-ui.yml
version: "3.5"

services:
  kafka-ui:
    image: provectuslabs/kafka-ui:master
    container_name: kafka-ui
    ports:
      - "8080:8080"
    networks:
      - kafkanet
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL: SASL_SSL
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9094,kafka-1:9094,kafka-2:9094
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM: SCRAM-SHA-256
      KAFKA_CLUSTERS_0_PROPERTIES_PROTOCOL: SASL
      KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG: org.apache.kafka.common.security.scram.ScramLoginModule required username="superuser" password="password";
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_SSL_TRUSTSTORELOCATION: /kafka.truststore.jks
      KAFKA_CLUSTERS_0_SSL_TRUSTSTOREPASSWORD: supersecret
    volumes:
      - ./truststore/kafka.truststore.jks:/kafka.truststore.jks:ro

networks:
  kafkanet:
    external: true
    name: kafka-network

Summary

In the previous posts, we discussed how to implement client authentication with TLS (SSL, or TLS/SSL) and SASL. One of the key benefits of client authentication is that it enables user access control. In this post, we discussed how to configure Kafka authorization with Java and Python client examples while keeping SASL for client authentication.