The Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. With the operator, we can simplify the deployment and management of Python stream processing applications. In this series, we discuss how to deploy a PyFlink application and a Python Apache Beam pipeline on the Flink Runner on Kubernetes. In Part 1, we first deploy a Kafka cluster on a minikube cluster because the source and sink of the PyFlink application are Kafka topics. Then, the application source is packaged in a custom Docker image and deployed on the minikube cluster using the Flink Kubernetes Operator. Finally, the output of the application is checked by sending messages to the input Kafka topic using a Python producer application.
Setup Kafka Cluster
As the source and sink of the stream processing application are Kafka topics, a Kafka cluster is deployed using the Strimzi Operator on a minikube cluster. We install Strimzi version 0.39.0 and Kubernetes version 1.25.3. Once the minikube CLI and Docker are installed, a minikube cluster can be created by specifying the desired Kubernetes version.
minikube start --cpus='max' --memory=20480 --addons=metrics-server --kubernetes-version=v1.25.3
Deploy Strimzi Operator
The project repository keeps manifest files that can be used to deploy the Strimzi Operator, Kafka cluster and Kafka management app. If you want to use a different version of the operator, you can download the relevant manifest file by specifying the desired version. By default, the manifest file assumes that the resources are deployed in the myproject namespace. As we deploy them in the default namespace, however, we need to change the resource namespace using sed. The operator can be deployed using kubectl create.
## download and deploy strimzi operator
STRIMZI_VERSION="0.39.0"

## (optional) if downloading a different version
DOWNLOAD_URL=https://github.com/strimzi/strimzi-kafka-operator/releases/download/$STRIMZI_VERSION/strimzi-cluster-operator-$STRIMZI_VERSION.yaml
curl -L -o kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml ${DOWNLOAD_URL}
# update namespace from myproject to default
sed -i 's/namespace: .*/namespace: default/' kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml

## deploy strimzi cluster operator
kubectl create -f kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml
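The sed command above rewrites the value of every namespace field in the manifest. For readers who prefer to see the substitution spelled out, here is a minimal Python sketch of the same idea. The manifest fragment and the helper name set_namespace are made up for illustration; the real manifest is much longer.

```python
import re

# Made-up manifest fragment for illustration only
SAMPLE_MANIFEST = """\
kind: ServiceAccount
metadata:
  name: strimzi-cluster-operator
  namespace: myproject
---
kind: RoleBinding
subjects:
  - kind: ServiceAccount
    name: strimzi-cluster-operator
    namespace: myproject
"""


def set_namespace(manifest: str, namespace: str = "default") -> str:
    """Replace the value of every `namespace:` field (hypothetical helper)."""
    # `.` does not match a newline, so `.*` only consumes the rest of the line
    return re.sub(r"namespace: .*", f"namespace: {namespace}", manifest)


updated = set_namespace(SAMPLE_MANIFEST)
print(updated)
```

Like the sed expression, the pattern matches any line that contains the text "namespace: ", so it is worth reviewing the result before applying it to a manifest that uses that text in other places.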
We can check that the Strimzi Operator runs as a Deployment.
kubectl get deploy,rs,po
# NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
# deployment.apps/strimzi-cluster-operator   1/1     1            1           2m50s

# NAME                                                 DESIRED   CURRENT   READY   AGE
# replicaset.apps/strimzi-cluster-operator-8d6d4795c   1         1         1       2m50s

# NAME                                           READY   STATUS    RESTARTS   AGE
# pod/strimzi-cluster-operator-8d6d4795c-94t8c   1/1     Running   0          2m49s
Deploy Kafka Cluster
We deploy a Kafka cluster with a single broker and a single Zookeeper node. It has internal and external listeners on ports 9092 and 29092 respectively. Note that the external listener is used to access the Kafka cluster from outside the minikube cluster. Also, the cluster is configured to allow automatic creation of topics (auto.create.topics.enable: "true") and the default number of partitions is set to 3 (num.partitions: 3).
# kafka/manifests/kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: demo-cluster
spec:
  kafka:
    version: 3.5.2
    replicas: 1
    resources:
      requests:
        memory: 256Mi
        cpu: 250m
      limits:
        memory: 512Mi
        cpu: 500m
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: external
        port: 29092
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 20Gi
          deleteClaim: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.5"
      auto.create.topics.enable: "true"
      num.partitions: 3
  zookeeper:
    replicas: 1
    resources:
      requests:
        memory: 256Mi
        cpu: 250m
      limits:
        memory: 512Mi
        cpu: 500m
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true
The Kafka cluster can be deployed using kubectl create.
kubectl create -f kafka/manifests/kafka-cluster.yaml
The Kafka and Zookeeper nodes are managed by the StrimziPodSet custom resource. The deployment also creates multiple services, and we use the following ones in this series.
- communication within the Kubernetes cluster
  - demo-cluster-kafka-bootstrap - to access Kafka brokers from the client and management apps
  - demo-cluster-zookeeper-client - to access the Zookeeper node from the management app
- communication from the host
  - demo-cluster-kafka-external-bootstrap - to access Kafka brokers from the producer app
kubectl get po,strimzipodsets.core.strimzi.io,svc -l app.kubernetes.io/instance=demo-cluster
# NAME                           READY   STATUS    RESTARTS   AGE
# pod/demo-cluster-kafka-0       1/1     Running   0          115s
# pod/demo-cluster-zookeeper-0   1/1     Running   0          2m20s

# NAME                                                   PODS   READY PODS   CURRENT PODS   AGE
# strimzipodset.core.strimzi.io/demo-cluster-kafka       1      1            1              115s
# strimzipodset.core.strimzi.io/demo-cluster-zookeeper   1      1            1              2m20s

# NAME                                            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                               AGE
# service/demo-cluster-kafka-bootstrap            ClusterIP   10.101.175.64    <none>        9091/TCP,9092/TCP                     115s
# service/demo-cluster-kafka-brokers              ClusterIP   None             <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP   115s
# service/demo-cluster-kafka-external-0           NodePort    10.106.155.20    <none>        29092:32475/TCP                       115s
# service/demo-cluster-kafka-external-bootstrap   NodePort    10.111.244.128   <none>        29092:32674/TCP                       115s
# service/demo-cluster-zookeeper-client           ClusterIP   10.100.215.29    <none>        2181/TCP                              2m20s
# service/demo-cluster-zookeeper-nodes            ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP            2m20s
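Which bootstrap address a client uses depends on where it runs. The sketch below illustrates one way to pick it, assuming in-cluster apps use the internal bootstrap service on port 9092 and host apps reach the external listener on localhost:29092, with a BOOTSTRAP_SERVERS environment variable as an override. The helper name resolve_bootstrap_servers is hypothetical and not part of the project.

```python
import os
import typing

# Addresses taken from the listeners and services described above
IN_CLUSTER_BOOTSTRAP = "demo-cluster-kafka-bootstrap:9092"
HOST_BOOTSTRAP = "localhost:29092"  # assumes port 29092 is reachable from the host


def resolve_bootstrap_servers(
    in_cluster: bool, env: typing.Optional[typing.Mapping[str, str]] = None
) -> str:
    """Return the bootstrap address for a client (hypothetical helper)."""
    env = os.environ if env is None else env
    default = IN_CLUSTER_BOOTSTRAP if in_cluster else HOST_BOOTSTRAP
    # An explicit BOOTSTRAP_SERVERS value always wins over the default
    return env.get("BOOTSTRAP_SERVERS", default)


print(resolve_bootstrap_servers(in_cluster=True, env={}))
print(resolve_bootstrap_servers(in_cluster=False, env={}))
```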
Deploy Kafka UI
UI for Apache Kafka (kafka-ui) is a free and open-source Kafka management application, and it is deployed as a Kubernetes Deployment. The Deployment is configured to have a single instance, and the Kafka cluster access details are specified as environment variables.
# kafka/manifests/kafka-ui.yaml
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-ui
  name: kafka-ui
spec:
  type: ClusterIP
  ports:
    - port: 8080
      targetPort: 8080
  selector:
    app: kafka-ui
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-ui
  name: kafka-ui
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-ui
  template:
    metadata:
      labels:
        app: kafka-ui
    spec:
      containers:
        - image: provectuslabs/kafka-ui:v0.7.1
          name: kafka-ui-container
          ports:
            - containerPort: 8080
          env:
            - name: KAFKA_CLUSTERS_0_NAME
              value: demo-cluster
            - name: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS
              value: demo-cluster-kafka-bootstrap:9092
            - name: KAFKA_CLUSTERS_0_ZOOKEEPER
              value: demo-cluster-zookeeper-client:2181
          resources:
            requests:
              memory: 256Mi
              cpu: 250m
            limits:
              memory: 512Mi
              cpu: 500m
The Kafka management app (kafka-ui) can be deployed using kubectl create.
kubectl create -f kafka/manifests/kafka-ui.yaml

kubectl get all -l app=kafka-ui
# NAME                            READY   STATUS    RESTARTS   AGE
# pod/kafka-ui-65dbbc98dc-zl5gv   1/1     Running   0          35s

# NAME               TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)    AGE
# service/kafka-ui   ClusterIP   10.109.14.33   <none>        8080/TCP   36s

# NAME                       READY   UP-TO-DATE   AVAILABLE   AGE
# deployment.apps/kafka-ui   1/1     1            1           35s

# NAME                                  DESIRED   CURRENT   READY   AGE
# replicaset.apps/kafka-ui-65dbbc98dc   1         1         1       35s
We can use kubectl port-forward to connect to the kafka-ui server running in the minikube cluster on port 8080.
kubectl port-forward svc/kafka-ui 8080
Develop Stream Processing App
A stream processing application is developed using PyFlink, and it is packaged in a custom Docker image for deployment.
PyFlink Code
The application begins by reading text messages from a Kafka topic named input-topic, and then extracts words by splitting the messages. Next, as we are going to calculate the average length of all words, they are all added to a tumbling window of 5 seconds - we use a processing time window for simplicity. After that, it calculates the average length of the words within each window. Note that, as we are going to include the window start and end timestamps in the output, the ProcessAllWindowFunction is used instead of the AggregateFunction. Finally, the output records are sent to a Kafka topic named output-topic-flink.
# flink/word_len.py
import os
import re
import json
import datetime
import logging
import typing

from pyflink.common import WatermarkStrategy
from pyflink.datastream import (
    DataStream,
    StreamExecutionEnvironment,
    RuntimeExecutionMode,
)
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import ProcessAllWindowFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows, Time, TimeWindow
from pyflink.datastream.connectors.kafka import (
    KafkaSource,
    KafkaOffsetsInitializer,
    KafkaSink,
    KafkaRecordSerializationSchema,
    DeliveryGuarantee,
)
from pyflink.common.serialization import SimpleStringSchema


def tokenize(element: str):
    for word in re.findall(r"[A-Za-z\']+", element):
        yield word


def create_message(element: typing.Tuple[str, str, float]):
    return json.dumps(dict(zip(["window_start", "window_end", "avg_len"], element)))


class AverageWindowFunction(ProcessAllWindowFunction):
    def process(
        self, context: ProcessAllWindowFunction.Context, elements: typing.Iterable[str]
    ) -> typing.Iterable[typing.Tuple[str, str, float]]:
        window: TimeWindow = context.window()
        window_start = datetime.datetime.fromtimestamp(window.start // 1000).isoformat(
            timespec="seconds"
        )
        window_end = datetime.datetime.fromtimestamp(window.end // 1000).isoformat(
            timespec="seconds"
        )
        length, count = 0, 0
        for e in elements:
            length += len(e)
            count += 1
        result = window_start, window_end, length / count if count else float("NaN")
        logging.info(f"AverageWindowFunction: result - {result}")
        yield result


def define_workflow(source_system: DataStream):
    return (
        source_system.flat_map(tokenize)
        .window_all(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .process(AverageWindowFunction())
    )


if __name__ == "__main__":
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    env.enable_checkpointing(5000)
    env.set_parallelism(3)

    input_source = (
        KafkaSource.builder()
        .set_bootstrap_servers(os.getenv("BOOTSTRAP_SERVERS", "localhost:29092"))
        .set_topics(os.getenv("INPUT_TOPIC", "input-topic"))
        .set_group_id(os.getenv("GROUP_ID", "flink-word-len"))
        .set_starting_offsets(KafkaOffsetsInitializer.latest())
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )

    input_stream = env.from_source(
        input_source, WatermarkStrategy.no_watermarks(), "input_source"
    )

    output_sink = (
        KafkaSink.builder()
        .set_bootstrap_servers(os.getenv("BOOTSTRAP_SERVERS", "localhost:29092"))
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic(os.getenv("OUTPUT_TOPIC", "output-topic-flink"))
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
        )
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build()
    )

    define_workflow(input_stream).map(
        create_message, output_type=Types.STRING()
    ).sink_to(output_sink).name("output_sink")

    env.execute("avg-word-length-flink")
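To make the windowing logic easier to follow without a running Flink cluster, the following plain Python sketch mimics what the job computes. It is not how Flink executes the job: it assumes each message arrives with an explicit timestamp in milliseconds, that windows are aligned to multiples of the window size (no offset), and the sample messages and the helper names window_start and average_word_lengths are made up for illustration.

```python
import re
import typing

WINDOW_SIZE_MS = 5_000  # same 5-second window size as the PyFlink job


def tokenize(text: str) -> typing.List[str]:
    # Same regular expression as tokenize() in word_len.py
    return re.findall(r"[A-Za-z\']+", text)


def window_start(timestamp_ms: int, size_ms: int = WINDOW_SIZE_MS) -> int:
    # A tumbling window starts at the largest multiple of its size
    # that is not greater than the timestamp (zero offset assumed)
    return timestamp_ms - (timestamp_ms % size_ms)


def average_word_lengths(
    messages: typing.Iterable[typing.Tuple[int, str]]
) -> typing.List[typing.Tuple[int, int, float]]:
    # Collect the words of all messages that fall into the same window
    windows: typing.Dict[int, typing.List[str]] = {}
    for timestamp_ms, text in messages:
        windows.setdefault(window_start(timestamp_ms), []).extend(tokenize(text))
    results = []
    for start in sorted(windows):
        words = windows[start]
        avg_len = sum(len(w) for w in words) / len(words) if words else float("nan")
        results.append((start, start + WINDOW_SIZE_MS, avg_len))
    return results


# Hypothetical (arrival time in ms, text) pairs
sample = [
    (1_000, "hello world"),
    (4_999, "flink"),
    (5_000, "it's a test"),
]
for start, end, avg_len in average_word_lengths(sample):
    print(start, end, avg_len)
```

The first two messages land in the window starting at 0 and the third one in the window starting at 5000, so two records are produced, one per window.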
Build Docker Image
A custom Docker image named flink-python-example:1.17 is created for deployment. Based on the official flink:1.17 image, it downloads the dependent JAR files, installs Python and the apache-flink package, and adds the application source.
# flink/Dockerfile
FROM flink:1.17

ARG PYTHON_VERSION
ENV PYTHON_VERSION=${PYTHON_VERSION:-3.10.13}
ARG FLINK_VERSION
ENV FLINK_VERSION=${FLINK_VERSION:-1.17.2}

## download connector libs
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/kafka/kafka-clients/3.2.3/kafka-clients-3.2.3.jar \
  && wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/$FLINK_VERSION/flink-sql-connector-kafka-$FLINK_VERSION.jar

## install python
RUN apt-get update -y && \
  apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev liblzma-dev && \
  wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tgz && \
  tar -xvf Python-${PYTHON_VERSION}.tgz && \
  cd Python-${PYTHON_VERSION} && \
  ./configure --without-tests --enable-shared && \
  make -j6 && \
  make install && \
  ldconfig /usr/local/lib && \
  cd .. && rm -f Python-${PYTHON_VERSION}.tgz && rm -rf Python-${PYTHON_VERSION} && \
  ln -s /usr/local/bin/python3 /usr/local/bin/python && \
  apt-get clean && \
  rm -rf /var/lib/apt/lists/*

## install pip packages
RUN pip3 install apache-flink==${FLINK_VERSION}

## add python script
USER flink
RUN mkdir /opt/flink/usrlib
ADD word_len.py /opt/flink/usrlib/word_len.py
As the custom image should be accessible in the minikube cluster, we should point the terminal's docker-cli to minikube's Docker engine. Then, the image can be built as usual using docker build.
# point the docker-cli to the minikube's Docker engine
eval $(minikube docker-env)
# build image
docker build -t flink-python-example:1.17 flink/
Deploy Stream Processing App
The PyFlink application is deployed as a single job of a Flink cluster using the Flink Kubernetes Operator. Then, we check the output of the application by sending text messages to the input Kafka topic.
Deploy Flink Kubernetes Operator
We first need to install the certificate manager on the minikube cluster to enable adding the webhook component. Then, the operator can be installed using a Helm chart, and version 1.8.0 is installed in this post.
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
# NAME: flink-kubernetes-operator
# LAST DEPLOYED: Wed May 29 05:05:08 2024
# NAMESPACE: default
# STATUS: deployed
# REVISION: 1
# TEST SUITE: None

helm list
# NAME                        NAMESPACE   REVISION   UPDATED                                    STATUS     CHART                             APP VERSION
# flink-kubernetes-operator   default     1          2024-05-29 05:05:08.241071054 +1000 AEST   deployed   flink-kubernetes-operator-1.8.0   1.8.0
Deploy PyFlink App
The PyFlink app is deployed as a single job of a Flink cluster using the FlinkDeployment custom resource. In the manifest file, we configure common properties such as the Docker image, Flink version, cluster configuration and pod template. These properties apply to both the Flink job manager and task manager, and only the replicas and resources are specified separately for each of them. Finally, the PyFlink app is added as a job where the main PythonDriver entry class requires the paths of the Python executable and application source script. See this page for details about the FlinkDeployment resource.
# flink/word_len.yml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-word-len
spec:
  image: flink-python-example:1.17
  imagePullPolicy: Never
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "5"
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: BOOTSTRAP_SERVERS
              value: demo-cluster-kafka-bootstrap:9092
            - name: INPUT_TOPIC
              value: input-topic
            - name: GROUP_ID
              value: flink-word-len
            - name: OUTPUT_TOPIC
              value: output-topic-flink
          volumeMounts:
            - mountPath: /opt/flink/log
              name: flink-logs
            - mountPath: /tmp/flink-artifact-staging
              name: flink-staging
      volumes:
        - name: flink-logs
          emptyDir: {}
        - name: flink-staging
          emptyDir: {}
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    replicas: 2
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/opt/flink-python-1.17.2.jar
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args:
      [
        "-pyclientexec",
        "/usr/local/bin/python3",
        "-py",
        "/opt/flink/usrlib/word_len.py",
      ]
    parallelism: 3
    upgradeMode: stateless
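As a rough capacity check, the job parallelism (3) can be compared with the number of slots that each task manager offers (5). The sketch below assumes that all operators share slots (Flink's default slot sharing), so that the job needs as many slots as its parallelism. The helper name task_managers_needed is hypothetical, and the calculation is only a back-of-the-envelope estimate rather than a description of how the operator schedules pods.

```python
import math


def task_managers_needed(parallelism: int, slots_per_task_manager: int) -> int:
    """Estimate how many task managers are needed to host a job.

    Assumes one slot per parallel instance (default slot sharing).
    """
    if parallelism < 1 or slots_per_task_manager < 1:
        raise ValueError("both arguments must be positive integers")
    return math.ceil(parallelism / slots_per_task_manager)


# Values taken from the manifest above
print(task_managers_needed(parallelism=3, slots_per_task_manager=5))
```

With these settings the slots of a single task manager are enough to run the job, while a parallelism of 6 would need a second one under the same assumption.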
Before we deploy the PyFlink app, make sure the input topic is created. We can create it using kafka-ui easily.
The app can be deployed using kubectl create, and it creates the FlinkDeployment custom resource, which manages the job manager deployment, task manager pod and associated services.
kubectl create -f flink/word_len.yml
# flinkdeployment.flink.apache.org/flink-word-len created

kubectl get flinkdeployments.flink.apache.org
# NAME             JOB STATUS   LIFECYCLE STATE
# flink-word-len   RUNNING      STABLE

kubectl get all -l app=flink-word-len
# NAME                                  READY   STATUS    RESTARTS   AGE
# pod/flink-word-len-854cf856d8-w8cjf   1/1     Running   0          78s
# pod/flink-word-len-taskmanager-1-1    1/1     Running   0          66s

# NAME                          TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
# service/flink-word-len        ClusterIP   None            <none>        6123/TCP,6124/TCP   78s
# service/flink-word-len-rest   ClusterIP   10.107.62.132   <none>        8081/TCP            78s

# NAME                             READY   UP-TO-DATE   AVAILABLE   AGE
# deployment.apps/flink-word-len   1/1     1            1           78s

# NAME                                        DESIRED   CURRENT   READY   AGE
# replicaset.apps/flink-word-len-854cf856d8   1         1         1       78s
The Flink web UI can be accessed using kubectl port-forward on port 8081. In the job graph, we see two tasks: the first covers the steps up to tokenizing the input messages, and the second covers the remaining steps up to sending the average word length records to the output topic.
kubectl port-forward svc/flink-word-len-rest 8081
Kafka Producer
A simple Python Kafka producer is created to check the output of the application. The producer app sends random text from the Faker package to the input Kafka topic every 1 second by default.
# kafka/client/producer.py
import os
import time
import typing

from faker import Faker
from kafka import KafkaProducer


class TextProducer:
    def __init__(self, bootstrap_servers: str, topic_name: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topic_name = topic_name
        self.kafka_producer = self.create_producer()

    def create_producer(self):
        """
        Returns a KafkaProducer instance
        """
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: v.encode("utf-8"),
        )

    def send_to_kafka(self, text: str, timestamp_ms: typing.Optional[int] = None):
        """
        Sends text to a Kafka topic.
        """
        try:
            args = {"topic": self.topic_name, "value": text}
            if timestamp_ms is not None:
                args = {**args, **{"timestamp_ms": timestamp_ms}}
            self.kafka_producer.send(**args)
            self.kafka_producer.flush()
        except Exception as e:
            raise RuntimeError("fails to send a message") from e


if __name__ == "__main__":
    producer = TextProducer(
        os.getenv("BOOTSTRAP_SERVERS", "localhost:29092"),
        os.getenv("TOPIC_NAME", "input-topic"),
    )
    fake = Faker()

    num_events = 0
    while True:
        num_events += 1
        text = fake.text()
        producer.send_to_kafka(text)
        if num_events % 5 == 0:
            print(f"<<<<<{num_events} text sent... current>>>>\n{text}")
        time.sleep(int(os.getenv("DELAY_SECONDS", "1")))
The Kafka bootstrap server can be exposed on port 29092 using kubectl port-forward and the producer app can be started by executing the Python script.
kubectl port-forward svc/demo-cluster-kafka-external-bootstrap 29092

python kafka/client/producer.py
We can see the output topic (output-topic-flink) is created on kafka-ui.
Also, we can check the output messages are created as expected in the Topics tab.
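As an alternative to kafka-ui, the output records can also be read from a script. The following is a minimal sketch that uses the same kafka-python package as the producer and assumes the bootstrap server is still forwarded to localhost:29092. The helper names parse_record and read_output, the timeout value and the sample record are assumptions made for illustration and are not part of the project.

```python
import json
import typing


def parse_record(value: str) -> typing.Tuple[str, str, float]:
    """Decode one output record in the format written by create_message()."""
    record = json.loads(value)
    return record["window_start"], record["window_end"], float(record["avg_len"])


def read_output(
    bootstrap_servers: str = "localhost:29092", topic: str = "output-topic-flink"
) -> None:
    # Imported here so that parse_record() can be used without kafka-python
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",  # start from the oldest available record
        value_deserializer=lambda v: v.decode("utf-8"),
        consumer_timeout_ms=10_000,  # stop iterating after 10 s without records
    )
    for message in consumer:
        print(parse_record(message.value))
    consumer.close()


# Hypothetical record, only to show the expected shape of the output
sample = json.dumps(
    {
        "window_start": "2024-05-29T05:10:00",
        "window_end": "2024-05-29T05:10:05",
        "avg_len": 4.5,
    }
)
print(parse_record(sample))
```

Calling read_output() while the producer is running should print one tuple per 5-second window.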
Delete Resources
The Kubernetes resources and minikube cluster can be deleted as shown below.
## delete flink operator and related resources
kubectl delete flinkdeployment/flink-word-len
helm uninstall flink-kubernetes-operator
helm repo remove flink-operator-repo
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml

## delete kafka cluster and related resources
STRIMZI_VERSION="0.39.0"
kubectl delete -f kafka/manifests/kafka-cluster.yaml
kubectl delete -f kafka/manifests/kafka-ui.yaml
kubectl delete -f kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml

## delete minikube
minikube delete
Summary
The Flink Kubernetes Operator acts as a control plane to manage the complete deployment lifecycle of Apache Flink applications. With the operator, we can simplify the deployment and management of Python stream processing applications on Kubernetes. In this post, we discussed how to deploy a PyFlink application on Kubernetes. We first deployed a Kafka cluster on a minikube cluster because the source and sink of the PyFlink application are Kafka topics. Then, the application source was packaged in a custom Docker image and deployed on the minikube cluster using the Flink Kubernetes Operator. Finally, the output of the application was checked by sending messages to the input Kafka topic using a Python producer application.