In this post, we develop an Apache Beam pipeline using the Python SDK and deploy it on an Apache Flink cluster via the Apache Flink Runner. As in Part I, we deploy a Kafka cluster using the Strimzi Operator on a minikube cluster because the pipeline uses Apache Kafka topics for its data source and sink. Then, we develop the pipeline as a Python package and add the package to a custom Docker image so that Python user code can be executed externally. For deployment, we create a Flink session cluster via the Flink Kubernetes Operator, and deploy the pipeline using a Kubernetes job. Finally, we check the output of the application by sending messages to the input Kafka topic using a Python producer application.
Resources to run a Python Beam pipeline on Flink
We develop an Apache Beam pipeline using the Python SDK and deploy it on an Apache Flink cluster via the Apache Flink Runner. While the Flink cluster is created by the Flink Kubernetes Operator, we need two additional components (Job Service and SDK Harness) to run the pipeline on the Flink Runner. Roughly speaking, the former converts details about a Python pipeline into a format that the Flink Runner can understand, while the latter executes the Python user code - see this post for more details. Note that the Python SDK provides convenience wrappers to manage those components, and they can be utilised by specifying FlinkRunner in the pipeline options (i.e. --runner=FlinkRunner). We let the Job Service be managed automatically while relying on our own SDK Harness as a sidecar container for simplicity. We also need the Java IO Expansion Service because the pipeline uses Apache Kafka topics for its data source and sink, and the Kafka Connector I/O is developed in Java. Simply put, the expansion service is used to serialise data for the Java SDK.
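To give a concrete idea of how the convenience wrappers are invoked, below is a minimal sketch of the relevant pipeline options - the addresses are assumptions for illustration only, and the actual options used in this post appear in the Kubernetes job manifest later.
# A minimal sketch of enabling the Flink Runner from the Python SDK. Specifying
# --runner=FlinkRunner lets the SDK manage the Job Service automatically, while
# --environment_type/--environment_config delegate Python user code execution to
# an external SDK Harness (our sidecar container). Addresses below are assumed.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    [
        "--runner=FlinkRunner",
        "--flink_master=localhost:8081",  # assumed Flink REST endpoint
        "--environment_type=EXTERNAL",
        "--environment_config=localhost:50000",  # assumed SDK Harness address
    ]
)

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create(["hello", "beam"]) | beam.Map(print)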
Setup Kafka Cluster
As in Part I, we deploy a Kafka cluster using the Strimzi Operator on a minikube cluster. We also create UI for Apache Kafka (kafka-ui) to facilitate development. See Part I for details about how to create these resources. The source of this post can be found in this GitHub repository.
When a Kafka cluster is created successfully, we can see the following resources.
kubectl get po,strimzipodsets.core.strimzi.io,svc -l app.kubernetes.io/instance=demo-cluster
NAME READY STATUS RESTARTS AGE
pod/demo-cluster-kafka-0 1/1 Running 0 4m16s
pod/demo-cluster-zookeeper-0 1/1 Running 0 4m44s

NAME PODS READY PODS CURRENT PODS AGE
strimzipodset.core.strimzi.io/demo-cluster-kafka 1 1 1 4m17s
strimzipodset.core.strimzi.io/demo-cluster-zookeeper 1 1 1 4m45s

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/demo-cluster-kafka-bootstrap ClusterIP 10.100.17.151 <none> 9091/TCP,9092/TCP 4m17s
service/demo-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,8443/TCP,9092/TCP 4m17s
service/demo-cluster-kafka-external-0 NodePort 10.104.252.159 <none> 29092:30867/TCP 4m17s
service/demo-cluster-kafka-external-bootstrap NodePort 10.98.229.176 <none> 29092:31488/TCP 4m17s
service/demo-cluster-zookeeper-client ClusterIP 10.109.80.70 <none> 2181/TCP 4m46s
service/demo-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 4m46s
The Kafka management app (kafka-ui) is deployed as a Kubernetes deployment and exposed by a single service.
kubectl get all -l app=kafka-ui
NAME READY STATUS RESTARTS AGE
pod/kafka-ui-65dbbc98dc-xjnvh 1/1 Running 0 10m

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kafka-ui ClusterIP 10.106.116.129 <none> 8080/TCP 10m

NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/kafka-ui 1/1 1 1 10m

NAME DESIRED CURRENT READY AGE
replicaset.apps/kafka-ui-65dbbc98dc 1 1 1 10m
We can use kubectl port-forward to connect to the kafka-ui server running in the minikube cluster on port 8080.
kubectl port-forward svc/kafka-ui 8080
Develop Stream Processing App
We develop an Apache Beam pipeline as a Python package and add it to a custom Docker image, which is used to execute Python user code (SDK Harness). We also build another custom Docker image, which adds the Java SDK of Apache Beam to the official Flink base image. This image is used for deploying a Flink cluster as well as for executing Java user code of the Kafka Connector I/O.
Beam Pipeline Code
The application begins with reading text messages from an input Kafka topic, followed by extracting words by splitting the messages (ReadWordsFromKafka). Next, the elements (words) are added to a fixed time window of 5 seconds and their average length is calculated (CalculateAvgWordLen). Finally, we include the window start/end timestamps and send the updated element to an output Kafka topic (WriteWordLenToKafka).
Note that we create a custom Java IO Expansion Service (get_expansion_service) and add it to the ReadFromKafka and WriteToKafka transforms of the Kafka Connector I/O. Although the Kafka I/O provides a function to create that service, it did not work for me (or I do not understand how to make use of it yet). Instead, I ended up creating a custom service as illustrated in Building Big Data Pipelines with Apache Beam by Jan Lukavský. Note further that the expansion service Jar file (beam-sdks-java-io-expansion-service.jar) should exist in the Kubernetes job that executes the pipeline, while the Java SDK (/opt/apache/beam/boot) should exist in the runner worker.
# beam/word_len/word_len.py
import json
import argparse
import re
import logging
import typing

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import kafka
from apache_beam.transforms.window import FixedWindows
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from apache_beam.transforms.external import JavaJarExpansionService


def get_expansion_service(
    jar="/opt/apache/beam/jars/beam-sdks-java-io-expansion-service.jar", args=None
):
    if args is None:
        args = [
            "--defaultEnvironmentType=PROCESS",
            '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}',
            "--experiments=use_deprecated_read",
        ]
    return JavaJarExpansionService(jar, ["{{PORT}}"] + args)


class WordAccum(typing.NamedTuple):
    length: int
    count: int


beam.coders.registry.register_coder(WordAccum, beam.coders.RowCoder)


def decode_message(kafka_kv: tuple, verbose: bool = False):
    if verbose:
        print(kafka_kv)
    return kafka_kv[1].decode("utf-8")


def tokenize(element: str):
    return re.findall(r"[A-Za-z\']+", element)


def create_message(element: typing.Tuple[str, str, float]):
    msg = json.dumps(dict(zip(["window_start", "window_end", "avg_len"], element)))
    print(msg)
    return "".encode("utf-8"), msg.encode("utf-8")


class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        return WordAccum(length=0, count=0)

    def add_input(self, mutable_accumulator: WordAccum, element: str):
        length, count = tuple(mutable_accumulator)
        return WordAccum(length=length + len(element), count=count + 1)

    def merge_accumulators(self, accumulators: typing.List[WordAccum]):
        lengths, counts = zip(*accumulators)
        return WordAccum(length=sum(lengths), count=sum(counts))

    def extract_output(self, accumulator: WordAccum):
        length, count = tuple(accumulator)
        return length / count if count else float("NaN")

    def get_accumulator_coder(self):
        return beam.coders.registry.get_coder(WordAccum)


class AddWindowTS(beam.DoFn):
    def process(self, avg_len: float, win_param=beam.DoFn.WindowParam):
        yield (
            win_param.start.to_rfc3339(),
            win_param.end.to_rfc3339(),
            avg_len,
        )


class ReadWordsFromKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topics: typing.List[str],
        group_id: str,
        verbose: bool = False,
        expansion_service: typing.Any = None,
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.verbose = verbose
        self.expansion_service = expansion_service

    def expand(self, input: pvalue.PBegin):
        return (
            input
            | "ReadFromKafka"
            >> kafka.ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": self.bootstrap_servers,
                    "auto.offset.reset": "latest",
                    # "enable.auto.commit": "true",
                    "group.id": self.group_id,
                },
                topics=self.topics,
                timestamp_policy=kafka.ReadFromKafka.create_time_policy,
                commit_offset_in_finalize=True,
                expansion_service=self.expansion_service,
            )
            | "DecodeMessage" >> beam.Map(decode_message)
            | "Tokenize" >> beam.FlatMap(tokenize)
        )


class CalculateAvgWordLen(beam.PTransform):
    def expand(self, input: pvalue.PCollection):
        return (
            input
            | "Windowing" >> beam.WindowInto(FixedWindows(size=5))
            | "GetAvgWordLength" >> beam.CombineGlobally(AverageFn()).without_defaults()
        )


class WriteWordLenToKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topic: str,
        expansion_service: typing.Any = None,
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.expansion_service = expansion_service

    def expand(self, input: pvalue.PCollection):
        return (
            input
            | "AddWindowTS" >> beam.ParDo(AddWindowTS())
            | "CreateMessages"
            >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
            | "WriteToKafka"
            >> kafka.WriteToKafka(
                producer_config={"bootstrap.servers": self.bootstrap_servers},
                topic=self.topic,
                expansion_service=self.expansion_service,
            )
        )


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--deploy",
        dest="deploy",
        action="store_true",
        default=False,
        help="Flag to indicate whether to deploy to a cluster",
    )
    parser.add_argument(
        "--bootstrap_servers",
        dest="bootstrap",
        default="host.docker.internal:29092",
        help="Kafka bootstrap server addresses",
    )
    parser.add_argument(
        "--input_topic",
        dest="input",
        default="input-topic",
        help="Kafka input topic name",
    )
    parser.add_argument(
        "--output_topic",
        dest="output",
        default="output-topic-beam",
        help="Kafka output topic name",
    )
    parser.add_argument(
        "--group_id",
        dest="group",
        default="beam-word-len",
        help="Kafka consumer group ID",
    )

    known_args, pipeline_args = parser.parse_known_args(argv)

    print(known_args)
    print(pipeline_args)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    expansion_service = None
    if known_args.deploy is True:
        expansion_service = get_expansion_service()

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadWordsFromKafka"
            >> ReadWordsFromKafka(
                bootstrap_servers=known_args.bootstrap,
                topics=[known_args.input],
                group_id=known_args.group,
                expansion_service=expansion_service,
            )
            | "CalculateAvgWordLen" >> CalculateAvgWordLen()
            | "WriteWordLenToKafka"
            >> WriteWordLenToKafka(
                bootstrap_servers=known_args.bootstrap,
                topic=known_args.output,
                expansion_service=expansion_service,
            )
        )

        logging.getLogger().setLevel(logging.DEBUG)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()
The pipeline script is added to a Python package under a folder named word_len, and a simple module named run is created because the pipeline should be executed as a module (i.e. python -m ...) - I got an error when executing it as a script. Note that this way of packaging is for demonstration only; see this document for a recommended way of packaging a pipeline (a minimal sketch of that approach follows the package layout below).
# beam/word_len/run.py
from . import *

run()
Overall, the pipeline package is structured as shown below.
tree beam/word_len

beam/word_len
├── __init__.py
├── run.py
└── word_len.py
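For completeness, the recommended approach mentioned above is to declare the package in a setup.py and point the pipeline at it via the --setup_file option; a minimal, illustrative sketch (names are assumptions) might look as follows.
# setup.py - an illustrative sketch of the recommended packaging approach; the package
# would then be shipped to workers by adding --setup_file=/path/to/setup.py to the
# pipeline options instead of baking it into the PYTHONPATH of a custom image.
import setuptools

setuptools.setup(
    name="word-len-pipeline",  # assumed distribution name
    version="0.1.0",
    packages=setuptools.find_packages(),
)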
Build Docker Images
As mentioned earlier, we build a custom Docker image (beam-python-example:1.16) that is used for deploying a Flink cluster as well as for executing Java user code of the Kafka Connector I/O.
# beam/Dockerfile
FROM flink:1.16

COPY --from=apache/beam_java11_sdk:2.56.0 /opt/apache/beam/ /opt/apache/beam/
We also build another custom Docker image (beam-python-harness:2.56.0) to run Python user code (SDK Harness). Starting from the Python SDK Docker image, it first installs the Java Development Kit (JDK) and downloads the Java IO Expansion Service Jar file. Then, the Beam pipeline packages are copied to the /app folder, which is added to the PYTHONPATH environment variable so that the packages become searchable.
# beam/Dockerfile-python-harness
FROM apache/beam_python3.10_sdk:2.56.0

ARG BEAM_VERSION
ENV BEAM_VERSION=${BEAM_VERSION:-2.56.0}
ENV REPO_BASE_URL=https://repo1.maven.org/maven2/org/apache/beam

RUN apt-get update && apt-get install -y default-jdk

RUN mkdir -p /opt/apache/beam/jars \
  && wget ${REPO_BASE_URL}/beam-sdks-java-io-expansion-service/${BEAM_VERSION}/beam-sdks-java-io-expansion-service-${BEAM_VERSION}.jar \
    --progress=bar:force:noscroll -O /opt/apache/beam/jars/beam-sdks-java-io-expansion-service.jar

COPY word_len /app/word_len
COPY word_count /app/word_count

ENV PYTHONPATH="$PYTHONPATH:/app"
As the custom images should be accessible in the minikube cluster, we should point the terminal’s docker-cli to the minikube’s Docker engine. Then, the images can be built as usual using docker build.
eval $(minikube docker-env)
docker build -t beam-python-example:1.16 beam/
docker build -t beam-python-harness:2.56.0 -f beam/Dockerfile-python-harness beam/
Deploy Stream Processing App
The Beam pipeline is executed on a Flink session cluster, which is deployed via the Flink Kubernetes Operator. Note that the application deployment mode, where the Beam pipeline is deployed as a Flink job, doesn’t seem to work (or I don’t understand how to do so yet) due to either a job submission timeout error or a failure to upload the job artifact. After the pipeline is deployed, we check the output of the application by sending text messages to the input Kafka topic.
Deploy Flink Kubernetes Operator
We first need to install the certificate manager on the minikube cluster to enable adding the webhook component. Then, the operator can be installed using a Helm chart; version 1.8.0 is installed in this post.
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
# NAME: flink-kubernetes-operator
# LAST DEPLOYED: Mon Jun 03 21:37:45 2024
# NAMESPACE: default
# STATUS: deployed
# REVISION: 1
# TEST SUITE: None

helm list
# NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION
# flink-kubernetes-operator default 1 2024-06-03 21:37:45.579302452 +1000 AEST deployed flink-kubernetes-operator-1.8.0 1.8.0
Deploy Beam Pipeline
First, we create a Flink session cluster. In the manifest file, we configure common properties such as the Docker image, Flink version, cluster configuration and pod template. These properties are applied to both the Flink job manager and task manager, and only the replicas and resources are specified separately for each of them. Note that we add a sidecar container to the task manager, and this SDK Harness container is configured to execute Python user code - see the job configuration below.
# beam/word_len_cluster.yml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: word-len-cluster
spec:
  image: beam-python-example:1.16
  imagePullPolicy: Never
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "10"
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /opt/flink/log
              name: flink-logs
      volumes:
        - name: flink-logs
          emptyDir: {}
  jobManager:
    resource:
      memory: "2048Mi"
      cpu: 2
  taskManager:
    replicas: 1
    resource:
      memory: "2048Mi"
      cpu: 2
    podTemplate:
      spec:
        containers:
          - name: python-harness
            image: beam-python-harness:2.56.0
            args: ["-worker_pool"]
            ports:
              - containerPort: 50000
                name: harness-port
The pipeline is deployed using a Kubernetes job, and the custom SDK Harness image is used to execute the pipeline as a module. The first two arguments are application-specific, while the rest are pipeline options. The pipeline arguments are self-explanatory, and you can also check the available options in the pipeline options source and the Flink Runner document. Note that, to execute Python user code in the sidecar container, we set the environment type to EXTERNAL and the environment config to localhost:50000.
# beam/word_len_job.yml
apiVersion: batch/v1
kind: Job
metadata:
  name: word-len-job
spec:
  template:
    metadata:
      labels:
        app: word-len-job
    spec:
      containers:
        - name: beam-word-len-job
          image: beam-python-harness:2.56.0
          command: ["python"]
          args:
            - "-m"
            - "word_len.run"
            - "--deploy"
            - "--bootstrap_servers=demo-cluster-kafka-bootstrap:9092"
            - "--runner=FlinkRunner"
            - "--flink_master=word-len-cluster-rest:8081"
            - "--job_name=beam-word-len"
            - "--streaming"
            - "--parallelism=3"
            - "--flink_submit_uber_jar"
            - "--environment_type=EXTERNAL"
            - "--environment_config=localhost:50000"
            - "--checkpointing_interval=10000"
      restartPolicy: Never
The session cluster and job can be deployed using kubectl create, and the former creates the FlinkDeployment custom resource, which manages the job manager deployment, task manager pod and associated services. When we check the log of the job’s pod, we see that it starts the Job Service after downloading the Jar file, uploads the pipeline artifact, submits the pipeline as a Flink job and continuously monitors the job status.
kubectl create -f beam/word_len_cluster.yml
# flinkdeployment.flink.apache.org/word-len-cluster created
kubectl create -f beam/word_len_job.yml
# job.batch/word-len-job created

kubectl logs word-len-job-p5rph -f
# WARNING:apache_beam.options.pipeline_options:Unknown pipeline options received: --checkpointing_interval=10000. Ignore if flags are used for internal purposes.
# WARNING:apache_beam.options.pipeline_options:Unknown pipeline options received: --checkpointing_interval=10000. Ignore if flags are used for internal purposes.
# INFO:root:Building pipeline ...
# INFO:apache_beam.runners.portability.flink_runner:Adding HTTP protocol scheme to flink_master parameter: http://word-len-cluster-rest:8081
# WARNING:apache_beam.options.pipeline_options:Unknown pipeline options received: --checkpointing_interval=10000. Ignore if flags are used for internal purposes.
# DEBUG:apache_beam.runners.portability.abstract_job_service:Got Prepare request.
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "GET /v1/config HTTP/1.1" 200 240
# INFO:apache_beam.utils.subprocess_server:Downloading job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.16-job-server/2.56.0/beam-runners-flink-1.16-job-server-2.56.0.jar
# INFO:apache_beam.runners.portability.abstract_job_service:Artifact server started on port 43287
# DEBUG:apache_beam.runners.portability.abstract_job_service:Prepared job 'job' as 'job-edc1c2f1-80ef-48b7-af14-7e6fc86f338a'
# INFO:apache_beam.runners.portability.abstract_job_service:Running job 'job-edc1c2f1-80ef-48b7-af14-7e6fc86f338a'
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "POST /v1/jars/upload HTTP/1.1" 200 148
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "POST /v1/jars/e1984c45-d8bc-4aa1-9b66-369a23826921_beam.jar/run HTTP/1.1" 200 44
# INFO:apache_beam.runners.portability.flink_uber_jar_job_server:Started Flink job as a403cb2f92fecee65b8fd7cc8ac6e68a
# INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "GET /v1/jobs/a403cb2f92fecee65b8fd7cc8ac6e68a/execution-result HTTP/1.1" 200 31
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "GET /v1/jobs/a403cb2f92fecee65b8fd7cc8ac6e68a/execution-result HTTP/1.1" 200 31
# INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "GET /v1/jobs/a403cb2f92fecee65b8fd7cc8ac6e68a/execution-result HTTP/1.1" 200 31
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "GET /v1/jobs/a403cb2f92fecee65b8fd7cc8ac6e68a/execution-result HTTP/1.1" 200 31
# ...
After deployment is complete, we can see the following Flink session cluster and job-related resources.
kubectl get all -l app=word-len-cluster
# NAME READY STATUS RESTARTS AGE
# pod/word-len-cluster-7c98f6f868-d4hbx 1/1 Running 0 5m32s
# pod/word-len-cluster-taskmanager-1-1 2/2 Running 0 4m3s

# NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
# service/word-len-cluster ClusterIP None <none> 6123/TCP,6124/TCP 5m32s
# service/word-len-cluster-rest ClusterIP 10.104.23.28 <none> 8081/TCP 5m32s

# NAME READY UP-TO-DATE AVAILABLE AGE
# deployment.apps/word-len-cluster 1/1 1 1 5m32s

# NAME DESIRED CURRENT READY AGE
# replicaset.apps/word-len-cluster-7c98f6f868 1 1 1 5m32s

kubectl get all -l app=word-len-job
# NAME READY STATUS RESTARTS AGE
# pod/word-len-job-24r6q 1/1 Running 0 5m24s

# NAME COMPLETIONS DURATION AGE
# job.batch/word-len-job 0/1 5m24s 5m24s
The Flink web UI can be accessed using kubectl port-forward on port 8081. In the job graph, we see two tasks: the first task runs up to adding the word elements into a fixed time window, and the second task covers the remaining steps up to sending the average word length records to the output topic.
kubectl port-forward svc/word-len-cluster-rest 8081
The Kafka I/O automatically creates a topic if it doesn’t exist, and we can see the input topic is created on kafka-ui.
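If you prefer the command line to kafka-ui, the topic can also be checked with a short, optional snippet like the one below, assuming the Kafka bootstrap server is port-forwarded to localhost:29092 as shown in the next section.
# Optional check that the input topic has been auto-created by the Kafka I/O.
# Assumes the bootstrap server is exposed on localhost:29092 via kubectl port-forward.
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="localhost:29092")
print("input-topic" in consumer.topics())  # True once the topic exists
consumer.close()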
Kafka Producer
A simple Python Kafka producer is created to check the output of the application. The producer app sends random text generated by the Faker package to the input Kafka topic every second by default.
# kafka/client/producer.py
import os
import time

from faker import Faker
from kafka import KafkaProducer


class TextProducer:
    def __init__(self, bootstrap_servers: list, topic_name: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topic_name = topic_name
        self.kafka_producer = self.create_producer()

    def create_producer(self):
        """
        Returns a KafkaProducer instance
        """
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: v.encode("utf-8"),
        )

    def send_to_kafka(self, text: str, timestamp_ms: int = None):
        """
        Sends text to a Kafka topic.
        """
        try:
            args = {"topic": self.topic_name, "value": text}
            if timestamp_ms is not None:
                args = {**args, **{"timestamp_ms": timestamp_ms}}
            self.kafka_producer.send(**args)
            self.kafka_producer.flush()
        except Exception as e:
            raise RuntimeError("fails to send a message") from e


if __name__ == "__main__":
    producer = TextProducer(
        os.getenv("BOOTSTRAP_SERVERS", "localhost:29092"),
        os.getenv("TOPIC_NAME", "input-topic"),
    )
    fake = Faker()

    num_events = 0
    while True:
        num_events += 1
        text = fake.text()
        producer.send_to_kafka(text)
        if num_events % 5 == 0:
            print(f"<<<<<{num_events} text sent... current>>>>\n{text}")
        time.sleep(int(os.getenv("DELAY_SECONDS", "1")))
The Kafka bootstrap server can be exposed on port 29092 using kubectl port-forward, and the producer app can be started by executing the Python script.
kubectl port-forward svc/demo-cluster-kafka-external-bootstrap 29092

python kafka/client/producer.py
We can see the output topic (output-topic-beam) is created on kafka-ui.
Also, we can check that the output messages are created as expected in the Topics tab.
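Instead of (or in addition to) kafka-ui, the output records can also be inspected with a small consumer sketch like the one below, again assuming the bootstrap server is still port-forwarded to localhost:29092.
# A minimal consumer sketch to inspect the pipeline output outside kafka-ui.
# Assumes the bootstrap server is still exposed on localhost:29092.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "output-topic-beam",  # default output topic of the pipeline
    bootstrap_servers="localhost:29092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
for message in consumer:
    # e.g. {'window_start': '...', 'window_end': '...', 'avg_len': 4.2}
    print(message.value)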
Delete Resources
The Kubernetes resources and minikube cluster can be deleted as shown below.
## delete flink operator and related resources
kubectl delete -f beam/word_len_cluster.yml
kubectl delete -f beam/word_len_job.yml
helm uninstall flink-kubernetes-operator
helm repo remove flink-operator-repo
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml

## delete kafka cluster and related resources
STRIMZI_VERSION="0.39.0"
kubectl delete -f kafka/manifests/kafka-cluster.yaml
kubectl delete -f kafka/manifests/kafka-ui.yaml
kubectl delete -f kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml

## delete minikube
minikube delete
Summary
In this series, we discuss how to deploy Python stream processing applications on Kubernetes. Having developed a PyFlink application in the previous post, in this post we developed an Apache Beam pipeline using the Python SDK and deployed it on an Apache Flink cluster via the Apache Flink Runner. As in the previous post, we deployed a Kafka cluster using the Strimzi Operator on a minikube cluster because the pipeline uses Apache Kafka topics for its data source and sink. Then, we developed the pipeline as a Python package and added the package to a custom Docker image so that Python user code can be executed externally. For deployment, we created a Flink session cluster via the Flink Kubernetes Operator, and deployed the pipeline using a Kubernetes job. Finally, we checked the output of the application by sending messages to the input Kafka topic using a Python producer application.