Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. With the operator, we can simplify deployment and management of Python stream processing applications. In this series, we discuss how to deploy a PyFlink application and Python Apache Beam pipeline on the Flink Runner on Kubernetes.

In Part 1, we first deploy a Kafka cluster on a minikube cluster as the source and sink of the PyFlink application are Kafka topics. Then, the application source is packaged in a custom Docker image and deployed on the minikube cluster using the Flink Kubernetes Operator. Finally, the output of the application is checked by sending messages to the input Kafka topic using a Python producer application.

Setup Kafka Cluster

As the source and sink of the stream processing application are Kafka topics, a Kafka cluster is deployed using the Strimzi Operator on a minikube cluster. We install Strimzi version 0.39.0 and Kubernetes version 1.25.3. Once the minikube CLI and Docker are installed, a minikube cluster can be created by specifying the desired Kubernetes version.

minikube start --cpus='max' --memory=20480 --addons=metrics-server --kubernetes-version=v1.25.3

Deploy Strimzi Operator

The project repository keeps manifest files that can be used to deploy the Strimzi Operator, Kafka cluster and Kafka management app. If you want to download a different version of the operator, you can download the relevant manifest file by specifying the desired version. By default, the manifest file assumes that the resources are deployed in the myproject namespace. As we deploy them in the default namespace, however, we need to change the resource namespace using sed. The operator can be deployed using kubectl create.

## download and deploy strimzi operator
STRIMZI_VERSION="0.39.0"

## (optional) if downloading a different version
## NOTE: DOWNLOAD_URL must be set to the URL of the operator manifest for $STRIMZI_VERSION
curl -L -o kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml ${DOWNLOAD_URL}
# update namespace from myproject to default
sed -i 's/namespace: .*/namespace: default/' kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml

## deploy strimzi cluster operator
kubectl create -f kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml
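To make the effect of the sed expression concrete, the following is a minimal Python sketch of the same substitution. The manifest excerpt is a made-up stand-in rather than the real Strimzi file, and set_namespace is a hypothetical helper introduced only for illustration.

```python
import re

# Hypothetical excerpt standing in for the downloaded Strimzi manifest.
SAMPLE_MANIFEST = """\
kind: ServiceAccount
metadata:
  name: strimzi-cluster-operator
  namespace: myproject
---
kind: RoleBinding
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
"""


def set_namespace(manifest: str, namespace: str = "default") -> str:
    """Rewrite every 'namespace: <value>' entry, mirroring the sed expression."""
    # '.' does not match a newline, so each match stops at the end of its line.
    return re.sub(r"namespace: .*", f"namespace: {namespace}", manifest)


updated = set_namespace(SAMPLE_MANIFEST)
print(updated)
```

Note that the pattern rewrites every namespace entry it finds, which is what we want here because all resources move to the default namespace.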

We can check the Strimzi Operator runs as a Deployment.

kubectl get deploy,rs,po
# NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
# deployment.apps/strimzi-cluster-operator   1/1     1            1           2m50s

# NAME                                                 DESIRED   CURRENT   READY   AGE
# replicaset.apps/strimzi-cluster-operator-8d6d4795c   1         1         1       2m50s

# NAME                                           READY   STATUS    RESTARTS   AGE
# pod/strimzi-cluster-operator-8d6d4795c-94t8c   1/1     Running   0          2m49s

Deploy Kafka Cluster

We deploy a Kafka cluster with a single broker and a single Zookeeper node. It has internal and external listeners on ports 9092 and 29092 respectively. Note that the external listener is used to access the Kafka cluster from outside the minikube cluster. Also, the cluster is configured to allow automatic creation of topics (auto.create.topics.enable: "true") and the default number of partitions is set to 3 (num.partitions: 3).

# kafka/manifests/kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: demo-cluster
spec:
  kafka:
    version: 3.5.2
    replicas: 1
    resources:
      requests:
        memory: 256Mi
        cpu: 250m
      limits:
        memory: 512Mi
        cpu: 500m
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: external
        port: 29092
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 20Gi
          deleteClaim: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.5"
      auto.create.topics.enable: "true"
      num.partitions: 3
  zookeeper:
    replicas: 1
    resources:
      requests:
        memory: 256Mi
        cpu: 250m
      limits:
        memory: 512Mi
        cpu: 500m
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true

The Kafka cluster can be deployed using kubectl create.

kubectl create -f kafka/manifests/kafka-cluster.yaml

The Kafka and Zookeeper nodes are managed by the StrimziPodSet custom resource. It also creates multiple services, and we use the following services in this series.

  • communication within the Kubernetes cluster
    • demo-cluster-kafka-bootstrap - to access Kafka brokers from the client and management apps
    • demo-cluster-zookeeper-client - to access Zookeeper node from the management app
  • communication from the host
    • demo-cluster-kafka-external-bootstrap - to access Kafka brokers from the producer app

kubectl get po,strimzipodsets.core.strimzi.io,svc -l app.kubernetes.io/instance=demo-cluster
# NAME                           READY   STATUS    RESTARTS   AGE
# pod/demo-cluster-kafka-0       1/1     Running   0          115s
# pod/demo-cluster-zookeeper-0   1/1     Running   0          2m20s

# NAME                                                   PODS   READY PODS   CURRENT PODS   AGE
# strimzipodset.core.strimzi.io/demo-cluster-kafka       1      1            1              115s
# strimzipodset.core.strimzi.io/demo-cluster-zookeeper   1      1            1              2m20s

# NAME                                            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                               AGE
# service/demo-cluster-kafka-bootstrap            ClusterIP    <none>        9091/TCP,9092/TCP                     115s
# service/demo-cluster-kafka-brokers              ClusterIP   None             <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP   115s
# service/demo-cluster-kafka-external-0           NodePort    <none>        29092:32475/TCP                       115s
# service/demo-cluster-kafka-external-bootstrap   NodePort   <none>        29092:32674/TCP                       115s
# service/demo-cluster-zookeeper-client           ClusterIP    <none>        2181/TCP                              2m20s
# service/demo-cluster-zookeeper-nodes            ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP            2m20s

Deploy Kafka UI

UI for Apache Kafka (kafka-ui) is a free and open-source Kafka management application, and it is deployed as a Kubernetes Deployment. The Deployment is configured to have a single instance, and the Kafka cluster access details are specified as environment variables.

# kafka/manifests/kafka-ui.yaml
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-ui
  name: kafka-ui
spec:
  type: ClusterIP
  ports:
    - port: 8080
      targetPort: 8080
  selector:
    app: kafka-ui
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-ui
  name: kafka-ui
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-ui
  template:
    metadata:
      labels:
        app: kafka-ui
    spec:
      containers:
        - image: provectuslabs/kafka-ui:v0.7.1
          name: kafka-ui-container
          ports:
            - containerPort: 8080
          env:
            - name: KAFKA_CLUSTERS_0_NAME
              value: demo-cluster
            - name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
              value: demo-cluster-kafka-bootstrap:9092
            - name: KAFKA_CLUSTERS_0_ZOOKEEPER
              value: demo-cluster-zookeeper-client:2181
          resources:
            requests:
              memory: 256Mi
              cpu: 250m
            limits:
              memory: 512Mi
              cpu: 500m

The Kafka management app (kafka-ui) can be deployed using kubectl create.

kubectl create -f kafka/manifests/kafka-ui.yaml

kubectl get all -l app=kafka-ui
# NAME                            READY   STATUS    RESTARTS   AGE
# pod/kafka-ui-65dbbc98dc-zl5gv   1/1     Running   0          35s

# NAME               TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)    AGE
# service/kafka-ui   ClusterIP   <none>        8080/TCP   36s

# NAME                       READY   UP-TO-DATE   AVAILABLE   AGE
# deployment.apps/kafka-ui   1/1     1            1           35s

# NAME                                  DESIRED   CURRENT   READY   AGE
# replicaset.apps/kafka-ui-65dbbc98dc   1         1         1       35s

We can use kubectl port-forward to connect to the kafka-ui server running in the minikube cluster on port 8080.

kubectl port-forward svc/kafka-ui 8080

Develop Stream Processing App

A stream processing application is developed using PyFlink, and it is packaged in a custom Docker image for deployment.

The application begins by reading text messages from a Kafka topic named input-topic and extracting words by splitting the messages. Next, as we are going to calculate the average length of all words, they are added to a tumbling window of 5 seconds - we use a processing time window for simplicity. After that, it calculates the average length of the words within each window. Note that, as we want to include the window start and end timestamps in the output, the ProcessAllWindowFunction is used instead of the AggregateFunction. Finally, the output records are sent to a Kafka topic named output-topic-flink.

# flink/word_len.py
import os
import re
import json
import datetime
import logging
import typing

from pyflink.common import WatermarkStrategy
from pyflink.datastream import (
    DataStream,
    StreamExecutionEnvironment,
    RuntimeExecutionMode,
)
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import ProcessAllWindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows, Time, TimeWindow
from pyflink.datastream.connectors.kafka import (
    KafkaSource,
    KafkaOffsetsInitializer,
    KafkaSink,
    KafkaRecordSerializationSchema,
    DeliveryGuarantee,
)
from pyflink.common.serialization import SimpleStringSchema


def tokenize(element: str):
    # emit each word (letters and apostrophes) found in the message
    for word in re.findall(r"[A-Za-z\']+", element):
        yield word


def create_message(element: typing.Tuple[str, str, float]):
    # serialize a (window_start, window_end, avg_len) tuple as a JSON string
    return json.dumps(dict(zip(["window_start", "window_end", "avg_len"], element)))


class AverageWindowFunction(ProcessAllWindowFunction):
    def process(
        self, context: ProcessAllWindowFunction.Context, elements: typing.Iterable[str]
    ) -> typing.Iterable[typing.Tuple[str, str, float]]:
        # window boundaries are in epoch milliseconds
        window: TimeWindow = context.window()
        window_start = datetime.datetime.fromtimestamp(window.start // 1000).isoformat(
            timespec="seconds"
        )
        window_end = datetime.datetime.fromtimestamp(window.end // 1000).isoformat(
            timespec="seconds"
        )
        length, count = 0, 0
        for e in elements:
            length += len(e)
            count += 1
        result = window_start, window_end, length / count if count else float("NaN")
        logging.info(f"AverageWindowFunction: result - {result}")
        yield result


def define_workflow(source_system: DataStream):
    return (
        source_system.flat_map(tokenize)
        .window_all(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .process(AverageWindowFunction())
    )


if __name__ == "__main__":
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.enable_checkpointing(5000)
    env.set_parallelism(3)

    input_source = (
        KafkaSource.builder()
        .set_bootstrap_servers(os.getenv("BOOTSTRAP_SERVERS", "localhost:29092"))
        .set_topics(os.getenv("INPUT_TOPIC", "input-topic"))
        .set_group_id(os.getenv("GROUP_ID", "flink-word-len"))
        .set_starting_offsets(KafkaOffsetsInitializer.latest())
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )

    input_stream = env.from_source(
        input_source, WatermarkStrategy.no_watermarks(), "input_source"
    )

    output_sink = (
        KafkaSink.builder()
        .set_bootstrap_servers(os.getenv("BOOTSTRAP_SERVERS", "localhost:29092"))
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic(os.getenv("OUTPUT_TOPIC", "output-topic-flink"))
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
        )
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build()
    )

    define_workflow(input_stream).map(
        create_message, output_type=Types.STRING()
    ).sink_to(output_sink).name("output_sink")

    env.execute("avg-word-length-flink")
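The windowing logic can be explored without a Flink cluster. The following is a plain-Python sketch, not PyFlink code: it assumes tumbling windows are aligned to the epoch with no offset, and window_bounds, average_length and the sample messages are hypothetical names and data used only for illustration.

```python
import re
import typing

WINDOW_SIZE_MS = 5_000  # corresponds to Time.seconds(5) in the application


def tokenize(text: str) -> typing.List[str]:
    """Extract words using the same pattern as the application."""
    return re.findall(r"[A-Za-z\']+", text)


def window_bounds(
    timestamp_ms: int, size_ms: int = WINDOW_SIZE_MS
) -> typing.Tuple[int, int]:
    """Return the [start, end) bounds of the tumbling window holding the timestamp.

    Assumes windows are aligned to the epoch with a zero offset.
    """
    start = timestamp_ms - (timestamp_ms % size_ms)
    return start, start + size_ms


def average_length(words: typing.Iterable[str]) -> float:
    """Average word length, or NaN when there are no words."""
    total, count = 0, 0
    for word in words:
        total += len(word)
        count += 1
    return total / count if count else float("nan")


# Made-up (timestamp in ms, message) pairs for illustration.
messages = [
    (1_000, "Flink runs on Kubernetes"),
    (4_999, "it's fast"),
    (5_000, "next window"),
]

# Group words by window, then average per window.
windows: typing.Dict[typing.Tuple[int, int], typing.List[str]] = {}
for ts, text in messages:
    windows.setdefault(window_bounds(ts), []).extend(tokenize(text))

for bounds, words in sorted(windows.items()):
    print(bounds, round(average_length(words), 2))
```

The first two messages fall into the same 5-second window, while the third starts a new one because a window's end bound is exclusive.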

Build Docker Image

A custom Docker image named flink-python-example:1.17 is created for deployment. Based on the official flink:1.17 image, it downloads the dependent Jar files, installs Python and the apache-flink package, and adds the application source.

# flink/Dockerfile
FROM flink:1.17

## NOTE: PYTHON_VERSION and FLINK_VERSION are referenced below and
## must be defined at this point (e.g. via ARG/ENV instructions)

## download connector libs
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar \
  && wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/$FLINK_VERSION/flink-sql-connector-kafka-$FLINK_VERSION.jar

## install python
RUN apt-get update -y && \
  apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev liblzma-dev && \
  wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tgz && \
  tar -xvf Python-${PYTHON_VERSION}.tgz && \
  cd Python-${PYTHON_VERSION} && \
  ./configure --without-tests --enable-shared && \
  make -j6 && \
  make install && \
  ldconfig /usr/local/lib && \
  cd .. && rm -f Python-${PYTHON_VERSION}.tgz && rm -rf Python-${PYTHON_VERSION} && \
  ln -s /usr/local/bin/python3 /usr/local/bin/python && \
  apt-get clean && \
  rm -rf /var/lib/apt/lists/*

## install pip packages
RUN pip3 install apache-flink==${FLINK_VERSION}

## add python script
USER flink
RUN mkdir /opt/flink/usrlib
ADD word_len.py /opt/flink/usrlib/word_len.py

As the custom image should be accessible in the minikube cluster, we should point the terminal’s docker-cli to the minikube’s Docker engine. Then, the image can be built as usual using docker build.

# point the docker-cli to the minikube's Docker engine
eval $(minikube docker-env)
# build image
docker build -t flink-python-example:1.17 flink/

Deploy Stream Processing App

The PyFlink application is deployed as a single job of a Flink cluster using the Flink Kubernetes Operator. Then, we check the output of the application by sending text messages to the input Kafka topic.

We first need to install the certificate manager on the minikube cluster to enable adding the webhook component. Then, the operator can be installed using a Helm chart, and version 1.8.0 is installed in this post.

kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
# NAME: flink-kubernetes-operator
# LAST DEPLOYED: Wed May 29 05:05:08 2024
# NAMESPACE: default
# STATUS: deployed
# TEST SUITE: None

helm list
# NAME                            NAMESPACE       REVISION        UPDATED                                         STATUS          CHART                           APP VERSION
# flink-kubernetes-operator       default         1               2024-05-29 05:05:08.241071054 +1000 AEST        deployed        flink-kubernetes-operator-1.8.0 1.8.0

The PyFlink app is deployed as a single job of a Flink cluster using the FlinkDeployment custom resource. In the manifest file, we configure common properties such as the Docker image, Flink version, cluster configuration and pod template. These properties apply to both the Flink job manager and task manager; for each of them, only the replica count and resources are specified in addition. Finally, the PyFlink app is added as a job whose PythonDriver entry class requires the paths of the Python executable and the application source script. See the Flink Kubernetes Operator documentation for details about the FlinkDeployment resource.

# flink/word_len.yml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-word-len
spec:
  image: flink-python-example:1.17
  imagePullPolicy: Never
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "5"
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: BOOTSTRAP_SERVERS
              value: demo-cluster-kafka-bootstrap:9092
            - name: INPUT_TOPIC
              value: input-topic
            - name: GROUP_ID
              value: flink-word-len
            - name: OUTPUT_TOPIC
              value: output-topic-flink
          volumeMounts:
            - mountPath: /opt/flink/log
              name: flink-logs
            - mountPath: /tmp/flink-artifact-staging
              name: flink-staging
      volumes:
        - name: flink-logs
          emptyDir: {}
        - name: flink-staging
          emptyDir: {}
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    replicas: 2
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/opt/flink-python-1.17.2.jar
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args:
      [
        "-pyclientexec",
        "/usr/local/bin/python3",
        "-py",
        "/opt/flink/usrlib/word_len.py",
      ]
    parallelism: 3
    upgradeMode: stateless
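As a rough capacity check on the manifest above, we can relate the job parallelism to the number of task slots. The sketch below is simple arithmetic under the assumption that, with default slot sharing, a job needs as many slots as its highest operator parallelism; required_task_managers is a hypothetical helper, not part of Flink or the operator.

```python
import math


def required_task_managers(parallelism: int, slots_per_task_manager: int) -> int:
    """Minimum number of task managers whose slots can cover the given parallelism."""
    if parallelism < 1 or slots_per_task_manager < 1:
        raise ValueError("parallelism and slots must be positive integers")
    return math.ceil(parallelism / slots_per_task_manager)


# Values taken from the manifest: job.parallelism and taskmanager.numberOfTaskSlots
print(required_task_managers(parallelism=3, slots_per_task_manager=5))
```

This is only the arithmetic. How many task manager pods are actually started depends on the deployment mode and the operator version.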

Before we deploy the PyFlink app, make sure the input topic is created. We can create it using kafka-ui easily.
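As an alternative to kafka-ui, the topic could also be created programmatically. The following is a minimal sketch using the admin client of the kafka-python package; it assumes that package is installed and that a broker is reachable at the address passed in. The helper names topic_settings and create_topic are hypothetical, and the partition and replication values simply mirror the cluster configuration shown earlier.

```python
import typing


def topic_settings(
    name: str, num_partitions: int = 3, replication_factor: int = 1
) -> typing.Dict[str, typing.Any]:
    """Keyword arguments for kafka-python's NewTopic, mirroring the cluster defaults."""
    return {
        "name": name,
        "num_partitions": num_partitions,
        "replication_factor": replication_factor,
    }


def create_topic(bootstrap_servers: str, settings: typing.Dict[str, typing.Any]) -> None:
    """Create the topic and ignore the error raised if it already exists."""
    # Imported lazily so the helper above can be used without kafka-python installed.
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics([NewTopic(**settings)])
    except TopicAlreadyExistsError:
        pass
    finally:
        admin.close()


print(topic_settings("input-topic"))
```

With the Kafka bootstrap service reachable from the host, calling create_topic("localhost:29092", topic_settings("input-topic")) would create the input topic.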

The app can be deployed using kubectl create, and it creates the FlinkDeployment custom resource, which manages the job manager deployment, task manager pod and associated services.

kubectl create -f flink/word_len.yml
# flinkdeployment.flink.apache.org/flink-word-len created

kubectl get flinkdeployments.flink.apache.org
# flink-word-len   RUNNING      STABLE

kubectl get all -l app=flink-word-len
# NAME                                  READY   STATUS    RESTARTS   AGE
# pod/flink-word-len-854cf856d8-w8cjf   1/1     Running   0          78s
# pod/flink-word-len-taskmanager-1-1    1/1     Running   0          66s

# NAME                          TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
# service/flink-word-len        ClusterIP   None            <none>        6123/TCP,6124/TCP   78s
# service/flink-word-len-rest   ClusterIP   <none>        8081/TCP            78s

# NAME                             READY   UP-TO-DATE   AVAILABLE   AGE
# deployment.apps/flink-word-len   1/1     1            1           78s

# NAME                                        DESIRED   CURRENT   READY   AGE
# replicaset.apps/flink-word-len-854cf856d8   1         1         1       78s

The Flink web UI can be accessed using kubectl port-forward on port 8081. In the job graph, we see two tasks: the first covers everything up to tokenizing the input messages, and the second covers the rest, up to sending the average word length records to the output topic.

kubectl port-forward svc/flink-word-len-rest 8081

Kafka Producer

A simple Python Kafka producer is created to check the output of the application. The producer app sends random text from the Faker package to the input Kafka topic every 1 second by default.

# kafka/client/producer.py
import os
import time

from faker import Faker
from kafka import KafkaProducer


class TextProducer:
    def __init__(self, bootstrap_servers: list, topic_name: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topic_name = topic_name
        self.kafka_producer = self.create_producer()

    def create_producer(self):
        """
        Returns a KafkaProducer instance
        """
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: v.encode("utf-8"),
        )

    def send_to_kafka(self, text: str, timestamp_ms: int = None):
        """
        Sends text to a Kafka topic.
        """
        try:
            args = {"topic": self.topic_name, "value": text}
            if timestamp_ms is not None:
                args = {**args, **{"timestamp_ms": timestamp_ms}}
            self.kafka_producer.send(**args)
            self.kafka_producer.flush()
        except Exception as e:
            raise RuntimeError("fails to send a message") from e


if __name__ == "__main__":
    producer = TextProducer(
        os.getenv("BOOTSTRAP_SERVERS", "localhost:29092"),
        os.getenv("TOPIC_NAME", "input-topic"),
    )
    fake = Faker()

    num_events = 0
    while True:
        num_events += 1
        text = fake.text()
        producer.send_to_kafka(text)
        if num_events % 5 == 0:
            print(f"<<<<<{num_events} text sent... current>>>>\n{text}")
        time.sleep(int(os.getenv("DELAY_SECONDS", "1")))

The Kafka bootstrap server can be exposed on port 29092 using kubectl port-forward and the producer app can be started by executing the Python script.

kubectl port-forward svc/demo-cluster-kafka-external-bootstrap 29092

python kafka/client/producer.py

We can see the output topic (output-topic-flink) is created on kafka-ui.

Also, we can check the output messages are created as expected in the Topics tab.
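The output can also be checked from the command line rather than kafka-ui. The following is a minimal sketch of a consumer built with the kafka-python package; it assumes the port forwarding above is still active and that the output records are the JSON strings produced by create_message. The helpers parse_record and consume are hypothetical, and the sample record holds made-up values for illustration.

```python
import json
import typing

EXPECTED_KEYS = ("window_start", "window_end", "avg_len")


def parse_record(value: bytes) -> typing.Dict[str, typing.Any]:
    """Decode an output record and verify that it has the expected keys."""
    record = json.loads(value.decode("utf-8"))
    missing = [key for key in EXPECTED_KEYS if key not in record]
    if missing:
        raise ValueError(f"record is missing keys: {missing}")
    return record


def consume(
    bootstrap_servers: str = "localhost:29092",
    topic: str = "output-topic-flink",
    max_records: int = 5,
) -> None:
    """Print up to max_records records from the output topic."""
    # Imported lazily so parse_record can be used without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",
        consumer_timeout_ms=10_000,  # stop iterating if no message arrives for 10s
    )
    try:
        for num, message in enumerate(consumer, start=1):
            print(parse_record(message.value))
            if num >= max_records:
                break
    finally:
        consumer.close()


# Made-up record in the shape produced by create_message.
sample = b'{"window_start": "2024-05-29T05:10:00", "window_end": "2024-05-29T05:10:05", "avg_len": 4.8}'
print(parse_record(sample))
```

Calling consume() while the producer is running should print a handful of window averages and then return.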

Delete Resources

The Kubernetes resources and minikube cluster can be deleted as shown below.

## delete flink operator and related resources
kubectl delete flinkdeployment/flink-word-len
helm uninstall flink-kubernetes-operator
helm repo remove flink-operator-repo
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml

## delete kafka cluster and related resources
STRIMZI_VERSION="0.39.0"
kubectl delete -f kafka/manifests/kafka-cluster.yaml
kubectl delete -f kafka/manifests/kafka-ui.yaml
kubectl delete -f kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml

## delete minikube
minikube delete


