In this post, we develop an Apache Beam pipeline using the Python SDK and deploy it on an Apache Flink cluster via the Apache Flink Runner. As in Part I, we deploy a Kafka cluster using the Strimzi Operator on a minikube cluster, as the pipeline uses Apache Kafka topics for its data source and sink. We then develop the pipeline as a Python package and add the package to a custom Docker image so that Python user code can be executed externally. For deployment, we create a Flink session cluster via the Flink Kubernetes Operator and deploy the pipeline using a Kubernetes job. Finally, we check the output of the application by sending messages to the input Kafka topic using a Python producer application.

We develop an Apache Beam pipeline using the Python SDK and deploy it on an Apache Flink cluster via the Apache Flink Runner. While the Flink cluster itself is created by the Flink Kubernetes Operator, we need two additional components (Job Service and SDK Harness) to run the pipeline on the Flink Runner. Roughly speaking, the former converts details about a Python pipeline into a format that the Flink Runner can understand, while the latter executes the Python user code - see this post for more details. Note that the Python SDK provides convenience wrappers to manage those components, and they can be utilised by specifying FlinkRunner in the pipeline options (i.e. --runner=FlinkRunner). For simplicity, we let the Job Service be managed automatically while relying on our own SDK Harness, deployed as a sidecar container. We also need the Java IO Expansion Service because the pipeline uses Apache Kafka topics for its data source and sink, and the Kafka Connector I/O is developed in Java. Simply put, the expansion service is used to serialise data for the Java SDK.
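
For reference, the key Flink Runner options used in this post look like the following when constructed programmatically - a minimal sketch only, as the actual values are passed as command-line arguments in the Kubernetes job manifest shown in the deployment section.

# a minimal sketch of the Flink Runner options used in this post; the actual values
# are supplied as command-line arguments in the Kubernetes job manifest shown later
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    [
        "--runner=FlinkRunner",
        "--flink_master=word-len-cluster-rest:8081",  # REST endpoint of the Flink session cluster
        "--environment_type=EXTERNAL",  # Python user code is executed by the SDK Harness sidecar
        "--environment_config=localhost:50000",  # worker pool address of the sidecar container
        "--streaming",
    ]
)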

Setup Kafka Cluster

As in Part I, we deploy a Kafka cluster using the Strimzi Operator on a minikube cluster. We also deploy the UI for Apache Kafka (kafka-ui) to facilitate development. See Part I for details about how to create these resources. The source of this post can be found in this GitHub repository.

When a Kafka cluster is created successfully, we can see the following resources.

kubectl get po,strimzipodsets.core.strimzi.io,svc -l app.kubernetes.io/instance=demo-cluster
NAME                           READY   STATUS    RESTARTS   AGE
pod/demo-cluster-kafka-0       1/1     Running   0          4m16s
pod/demo-cluster-zookeeper-0   1/1     Running   0          4m44s

NAME                                                   PODS   READY PODS   CURRENT PODS   AGE
strimzipodset.core.strimzi.io/demo-cluster-kafka       1      1            1              4m17s
strimzipodset.core.strimzi.io/demo-cluster-zookeeper   1      1            1              4m45s

NAME                                            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                               AGE
service/demo-cluster-kafka-bootstrap            ClusterIP   10.100.17.151    <none>        9091/TCP,9092/TCP                     4m17s
service/demo-cluster-kafka-brokers              ClusterIP   None             <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP   4m17s
service/demo-cluster-kafka-external-0           NodePort    10.104.252.159   <none>        29092:30867/TCP                       4m17s
service/demo-cluster-kafka-external-bootstrap   NodePort    10.98.229.176    <none>        29092:31488/TCP                       4m17s
service/demo-cluster-zookeeper-client           ClusterIP   10.109.80.70     <none>        2181/TCP                              4m46s
service/demo-cluster-zookeeper-nodes            ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP            4m46s

The Kafka management app (kafka-ui) is deployed as a Kubernetes deployment and exposed by a single service.

kubectl get all -l app=kafka-ui
NAME                            READY   STATUS    RESTARTS   AGE
pod/kafka-ui-65dbbc98dc-xjnvh   1/1     Running   0          10m

NAME               TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)    AGE
service/kafka-ui   ClusterIP   10.106.116.129   <none>        8080/TCP   10m

NAME                       READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/kafka-ui   1/1     1            1           10m

NAME                                  DESIRED   CURRENT   READY   AGE
replicaset.apps/kafka-ui-65dbbc98dc   1         1         1       10m

We can use kubectl port-forward to connect to the kafka-ui server running in the minikube cluster on port 8080.

kubectl port-forward svc/kafka-ui 8080

Develop Stream Processing App

We develop an Apache Beam pipeline as a Python package and add it to a custom Docker image, which is used to execute Python user code (SDK Harness). We also build another custom Docker image that adds the Java SDK of Apache Beam to the official Flink base image; it is used for deploying the Flink cluster as well as for executing the Java user code of the Kafka Connector I/O.

Beam Pipeline Code

The application begins by reading text messages from an input Kafka topic and extracting words by splitting the messages (ReadWordsFromKafka). Next, the elements (words) are assigned to a fixed time window of 5 seconds and their average length is calculated (CalculateAvgWordLen). Finally, we add the window start/end timestamps and send the enriched records to an output Kafka topic (WriteWordLenToKafka).

Note that we create a custom Java IO Expansion Service (get_expansion_service) and add it to the ReadFromKafka and WriteToKafka transforms of the Kafka Connector I/O. Although the Kafka I/O provides a function to create that service, it did not work for me (or I do not understand how to make use of it yet). Instead, I ended up creating a custom service as illustrated in Building Big Data Pipelines with Apache Beam by Jan Lukavský. Note further that the expansion service Jar file (beam-sdks-java-io-expansion-service.jar) should exist in the Kubernetes job that executes the pipeline, while the Java SDK (/opt/apache/beam/boot) should exist in the runner worker.

# beam/word_len/word_len.py
import json
import argparse
import re
import logging
import typing

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import kafka
from apache_beam.transforms.window import FixedWindows
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from apache_beam.transforms.external import JavaJarExpansionService


def get_expansion_service(
    jar="/opt/apache/beam/jars/beam-sdks-java-io-expansion-service.jar", args=None
):
    # Custom expansion service for the Java Kafka Connector I/O - the Java user code
    # is executed in a separate process via the Java SDK (/opt/apache/beam/boot).
    if args is None:
        args = [
            "--defaultEnvironmentType=PROCESS",
            '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}',
            "--experiments=use_deprecated_read",
        ]
    return JavaJarExpansionService(jar, ["{{PORT}}"] + args)


class WordAccum(typing.NamedTuple):
    length: int
    count: int


beam.coders.registry.register_coder(WordAccum, beam.coders.RowCoder)


def decode_message(kafka_kv: tuple, verbose: bool = False):
    if verbose:
        print(kafka_kv)
    return kafka_kv[1].decode("utf-8")


def tokenize(element: str):
    return re.findall(r"[A-Za-z\']+", element)


def create_message(element: typing.Tuple[str, str, float]):
    msg = json.dumps(dict(zip(["window_start", "window_end", "avg_len"], element)))
    print(msg)
    return "".encode("utf-8"), msg.encode("utf-8")


class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        return WordAccum(length=0, count=0)

    def add_input(self, mutable_accumulator: WordAccum, element: str):
        length, count = tuple(mutable_accumulator)
        return WordAccum(length=length + len(element), count=count + 1)

    def merge_accumulators(self, accumulators: typing.List[WordAccum]):
        lengths, counts = zip(*accumulators)
        return WordAccum(length=sum(lengths), count=sum(counts))

    def extract_output(self, accumulator: WordAccum):
        length, count = tuple(accumulator)
        return length / count if count else float("NaN")

    def get_accumulator_coder(self):
        return beam.coders.registry.get_coder(WordAccum)


class AddWindowTS(beam.DoFn):
    def process(self, avg_len: float, win_param=beam.DoFn.WindowParam):
        yield (
            win_param.start.to_rfc3339(),
            win_param.end.to_rfc3339(),
            avg_len,
        )


class ReadWordsFromKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topics: typing.List[str],
        group_id: str,
        verbose: bool = False,
        expansion_service: typing.Any = None,
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.verbose = verbose
        self.expansion_service = expansion_service

    def expand(self, input: pvalue.PBegin):
        return (
            input
            | "ReadFromKafka"
            >> kafka.ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": self.bootstrap_servers,
                    "auto.offset.reset": "latest",
                    # "enable.auto.commit": "true",
                    "group.id": self.group_id,
                },
                topics=self.topics,
                timestamp_policy=kafka.ReadFromKafka.create_time_policy,
                commit_offset_in_finalize=True,
                expansion_service=self.expansion_service,
            )
            | "DecodeMessage" >> beam.Map(decode_message)
            | "Tokenize" >> beam.FlatMap(tokenize)
        )


class CalculateAvgWordLen(beam.PTransform):
    def expand(self, input: pvalue.PCollection):
        return (
            input
            | "Windowing" >> beam.WindowInto(FixedWindows(size=5))
            | "GetAvgWordLength" >> beam.CombineGlobally(AverageFn()).without_defaults()
        )


class WriteWordLenToKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topic: str,
        expansion_service: typing.Any = None,
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.expansion_service = expansion_service

    def expand(self, input: pvalue.PCollection):
        return (
            input
            | "AddWindowTS" >> beam.ParDo(AddWindowTS())
            | "CreateMessages"
            >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
            | "WriteToKafka"
            >> kafka.WriteToKafka(
                producer_config={"bootstrap.servers": self.bootstrap_servers},
                topic=self.topic,
                expansion_service=self.expansion_service,
            )
        )


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--deploy",
        dest="deploy",
        action="store_true",
        default=False,
        help="Flag to indicate whether to deploy to a cluster",
    )
    parser.add_argument(
        "--bootstrap_servers",
        dest="bootstrap",
        default="host.docker.internal:29092",
        help="Kafka bootstrap server addresses",
    )
    parser.add_argument(
        "--input_topic",
        dest="input",
        default="input-topic",
        help="Kafka input topic name",
    )
    parser.add_argument(
        "--output_topic",
        dest="output",
        default="output-topic-beam",
        help="Kafka output topic name",
    )
    parser.add_argument(
        "--group_id",
        dest="group",
        default="beam-word-len",
        help="Kafka consumer group ID",
    )

    known_args, pipeline_args = parser.parse_known_args(argv)

    print(known_args)
    print(pipeline_args)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    expansion_service = None
    if known_args.deploy is True:
        expansion_service = get_expansion_service()

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadWordsFromKafka"
            >> ReadWordsFromKafka(
                bootstrap_servers=known_args.bootstrap,
                topics=[known_args.input],
                group_id=known_args.group,
                expansion_service=expansion_service,
            )
            | "CalculateAvgWordLen" >> CalculateAvgWordLen()
            | "WriteWordLenToKafka"
            >> WriteWordLenToKafka(
                bootstrap_servers=known_args.bootstrap,
                topic=known_args.output,
                expansion_service=expansion_service,
            )
        )

        logging.getLogger().setLevel(logging.DEBUG)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()
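
As a quick local sanity check (not part of the pipeline package), the windowing and combine logic can be exercised on the DirectRunner with Beam's testing utilities. The snippet below is a minimal sketch that assigns every word the same timestamp so that all words fall into a single 5-second window.

# local sanity check of CalculateAvgWordLen on the DirectRunner (illustration only)
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms.window import TimestampedValue

from word_len.word_len import CalculateAvgWordLen

with TestPipeline() as p:
    words = (
        p
        | beam.Create(["hello", "beam", "on", "flink"])
        | beam.Map(lambda w: TimestampedValue(w, 0))  # place all words in the same window
    )
    assert_that(words | CalculateAvgWordLen(), equal_to([4.0]))  # (5 + 4 + 2 + 5) / 4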

The pipeline script is added to a Python package under a folder named word_len, and a simple module named run is created because the pipeline should be executed as a module (i.e. python -m ...). (I encountered an error when executing the pipeline as a script.) Note that this way of packaging is for demonstration only; see this document for the recommended way of packaging a pipeline.

# beam/word_len/run.py
from . import *

run()

Overall, the pipeline package is structured as shown below.

tree beam/word_len

beam/word_len
├── __init__.py
├── run.py
└── word_len.py
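
The package's __init__.py is not shown above; for from . import * in run.py to expose the run function, it presumably just re-exports the pipeline module, along the lines of the following (an assumption, not part of the listing above).

# beam/word_len/__init__.py - assumed content
from .word_len import *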

Build Docker Images

As mentioned earlier, we build a custom Docker image (beam-python-example:1.16) that is used for deploying a Flink cluster as well as for executing Java user code of the Kafka Connector I/O.

# beam/Dockerfile
FROM flink:1.16

COPY --from=apache/beam_java11_sdk:2.56.0 /opt/apache/beam/ /opt/apache/beam/

We also build another custom Docker image (beam-python-harness:2.56.0) to run Python user code (SDK Harness). Starting from the Python SDK Docker image, it first installs the Java Development Kit (JDK) and downloads the Java IO Expansion Service Jar file. Then, the Beam pipeline packages are copied to the /app folder, and the folder is added to the PYTHONPATH environment variable so that the packages become searchable.

# beam/Dockerfile-python-harness
FROM apache/beam_python3.10_sdk:2.56.0

ARG BEAM_VERSION
ENV BEAM_VERSION=${BEAM_VERSION:-2.56.0}
ENV REPO_BASE_URL=https://repo1.maven.org/maven2/org/apache/beam

RUN apt-get update && apt-get install -y default-jdk

RUN mkdir -p /opt/apache/beam/jars \
  && wget ${REPO_BASE_URL}/beam-sdks-java-io-expansion-service/${BEAM_VERSION}/beam-sdks-java-io-expansion-service-${BEAM_VERSION}.jar \
          --progress=bar:force:noscroll -O /opt/apache/beam/jars/beam-sdks-java-io-expansion-service.jar

COPY word_len /app/word_len
COPY word_count /app/word_count

ENV PYTHONPATH="$PYTHONPATH:/app"

As the custom images should be accessible in the minikube cluster, we point the terminal's Docker CLI to minikube's Docker engine. Then, the images can be built as usual using docker build.

eval $(minikube docker-env)
docker build -t beam-python-example:1.16 beam/
docker build -t beam-python-harness:2.56.0 -f beam/Dockerfile-python-harness beam/

Deploy Stream Processing App

The Beam pipeline is executed on a Flink session cluster, which is deployed via the Flink Kubernetes Operator. Note that the application deployment mode, where the Beam pipeline is deployed as a Flink job, doesn't seem to work (or I don't understand how to do so yet) due to either a job submission timeout error or a failure to upload the job artifact. After the pipeline is deployed, we check the output of the application by sending text messages to the input Kafka topic.

We first need to install the certificate manager on the minikube cluster to enable adding the webhook component. Then, the operator can be installed using a Helm chart; version 1.8.0 is installed in this post.

kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
# NAME: flink-kubernetes-operator
# LAST DEPLOYED: Mon Jun 03 21:37:45 2024
# NAMESPACE: default
# STATUS: deployed
# REVISION: 1
# TEST SUITE: None

helm list
# NAME                            NAMESPACE       REVISION        UPDATED                                         STATUS          CHART                           APP VERSION
# flink-kubernetes-operator       default         1               2024-06-03 21:37:45.579302452 +1000 AEST        deployed        flink-kubernetes-operator-1.8.0 1.8.0

Deploy Beam Pipeline

First, we create a Flink session cluster. In the manifest file, we configure common properties such as the Docker image, Flink version, cluster configuration and pod template. These properties are applied to both the Flink job manager and task manager; only the replicas and resources are specified for each of them individually. Note that we add a sidecar container to the task manager, and this SDK Harness container is configured to execute Python user code - see the job configuration below.

# beam/word_len_cluster.yml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: word-len-cluster
spec:
  image: beam-python-example:1.16
  imagePullPolicy: Never
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "10"
  serviceAccount: flink
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /opt/flink/log
              name: flink-logs
      volumes:
        - name: flink-logs
          emptyDir: {}
  jobManager:
    resource:
      memory: "2048Mi"
      cpu: 2
  taskManager:
    replicas: 1
    resource:
      memory: "2048Mi"
      cpu: 2
    podTemplate:
      spec:
        containers:
          - name: python-harness
            image: beam-python-harness:2.56.0
            args: ["-worker_pool"]
            ports:
              - containerPort: 50000
                name: harness-port

The pipeline is deployed using a Kubernetes job, and the custom SDK Harness image is used to execute the pipeline as a module. The first two arguments are application-specific, and the rest are pipeline options. The pipeline arguments are self-explanatory; you can also check the available options in the pipeline options source and the Flink Runner document. Note that, to execute Python user code in the sidecar container, we set the environment type to EXTERNAL and the environment config to localhost:50000.

# beam/word_len_job.yml
apiVersion: batch/v1
kind: Job
metadata:
  name: word-len-job
spec:
  template:
    metadata:
      labels:
        app: word-len-job
    spec:
      containers:
        - name: beam-word-len-job
          image: beam-python-harness:2.56.0
          command: ["python"]
          args:
            - "-m"
            - "word_len.run"
            - "--deploy"
            - "--bootstrap_servers=demo-cluster-kafka-bootstrap:9092"
            - "--runner=FlinkRunner"
            - "--flink_master=word-len-cluster-rest:8081"
            - "--job_name=beam-word-len"
            - "--streaming"
            - "--parallelism=3"
            - "--flink_submit_uber_jar"
            - "--environment_type=EXTERNAL"
            - "--environment_config=localhost:50000"
            - "--checkpointing_interval=10000"
      restartPolicy: Never

The session cluster and job can be deployed using kubectl create; the former creates the FlinkDeployment custom resource, which manages the job manager deployment, task manager pod and associated services. When we check the log of the job's pod, we see that it starts the Job Service after downloading the Jar file, uploads the pipeline artifact, submits the pipeline as a Flink job and continuously monitors the job status.

kubectl create -f beam/word_len_cluster.yml
# flinkdeployment.flink.apache.org/word-len-cluster created
kubectl create -f beam/word_len_job.yml
# job.batch/word-len-job created

kubectl logs word-len-job-p5rph -f
# WARNING:apache_beam.options.pipeline_options:Unknown pipeline options received: --checkpointing_interval=10000. Ignore if flags are used for internal purposes.
# WARNING:apache_beam.options.pipeline_options:Unknown pipeline options received: --checkpointing_interval=10000. Ignore if flags are used for internal purposes.
# INFO:root:Building pipeline ...
# INFO:apache_beam.runners.portability.flink_runner:Adding HTTP protocol scheme to flink_master parameter: http://word-len-cluster-rest:8081
# WARNING:apache_beam.options.pipeline_options:Unknown pipeline options received: --checkpointing_interval=10000. Ignore if flags are used for internal purposes.
# DEBUG:apache_beam.runners.portability.abstract_job_service:Got Prepare request.
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "GET /v1/config HTTP/1.1" 200 240
# INFO:apache_beam.utils.subprocess_server:Downloading job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.16-job-server/2.56.0/beam-runners-flink-1.16-job-server-2.56.0.jar
# INFO:apache_beam.runners.portability.abstract_job_service:Artifact server started on port 43287
# DEBUG:apache_beam.runners.portability.abstract_job_service:Prepared job 'job' as 'job-edc1c2f1-80ef-48b7-af14-7e6fc86f338a'
# INFO:apache_beam.runners.portability.abstract_job_service:Running job 'job-edc1c2f1-80ef-48b7-af14-7e6fc86f338a'
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "POST /v1/jars/upload HTTP/1.1" 200 148
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "POST /v1/jars/e1984c45-d8bc-4aa1-9b66-369a23826921_beam.jar/run HTTP/1.1" 200 44
# INFO:apache_beam.runners.portability.flink_uber_jar_job_server:Started Flink job as a403cb2f92fecee65b8fd7cc8ac6e68a
# INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "GET /v1/jobs/a403cb2f92fecee65b8fd7cc8ac6e68a/execution-result HTTP/1.1" 200 31
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "GET /v1/jobs/a403cb2f92fecee65b8fd7cc8ac6e68a/execution-result HTTP/1.1" 200 31
# INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): word-len-cluster-rest:8081
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "GET /v1/jobs/a403cb2f92fecee65b8fd7cc8ac6e68a/execution-result HTTP/1.1" 200 31
# DEBUG:urllib3.connectionpool:http://word-len-cluster-rest:8081 "GET /v1/jobs/a403cb2f92fecee65b8fd7cc8ac6e68a/execution-result HTTP/1.1" 200 31
# ...

After deployment is complete, we can see the following Flink session cluster and job related resources.

kubectl get all -l app=word-len-cluster
# NAME                                    READY   STATUS    RESTARTS   AGE
# pod/word-len-cluster-7c98f6f868-d4hbx   1/1     Running   0          5m32s
# pod/word-len-cluster-taskmanager-1-1    2/2     Running   0          4m3s

# NAME                            TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
# service/word-len-cluster        ClusterIP   None           <none>        6123/TCP,6124/TCP   5m32s
# service/word-len-cluster-rest   ClusterIP   10.104.23.28   <none>        8081/TCP            5m32s

# NAME                               READY   UP-TO-DATE   AVAILABLE   AGE
# deployment.apps/word-len-cluster   1/1     1            1           5m32s

# NAME                                          DESIRED   CURRENT   READY   AGE
# replicaset.apps/word-len-cluster-7c98f6f868   1         1         1       5m32s

kubectl get all -l app=word-len-job
# NAME                     READY   STATUS    RESTARTS   AGE
# pod/word-len-job-24r6q   1/1     Running   0          5m24s

# NAME                     COMPLETIONS   DURATION   AGE
# job.batch/word-len-job   0/1           5m24s      5m24s

The Flink web UI can be accessed using kubectl port-forward on port 8081. In the job graph, we see two tasks: the first covers the steps up to adding the word elements into a fixed time window, and the second covers the remaining steps up to sending the average word length records to the output topic.

kubectl port-forward svc/word-len-cluster-rest 8081

The Kafka I/O automatically creates a topic if it doesn’t exist, and we can see the input topic is created on kafka-ui.
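
If you prefer a terminal check over kafka-ui, the topics can also be listed with the kafka-python package - a quick sketch, assuming the external bootstrap server is port-forwarded to localhost:29092 as shown in the next section.

# list topics with kafka-python (assumes the external bootstrap server is
# port-forwarded to localhost:29092 as shown in the next section)
from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers="localhost:29092")
print(consumer.topics())  # e.g. {'input-topic', ...}
consumer.close()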

Kafka Producer

A simple Python Kafka producer is created to check the output of the application. The producer app sends random text generated by the Faker package to the input Kafka topic every second by default.

# kafka/client/producer.py
import os
import time

from faker import Faker
from kafka import KafkaProducer


class TextProducer:
    def __init__(self, bootstrap_servers: list, topic_name: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topic_name = topic_name
        self.kafka_producer = self.create_producer()

    def create_producer(self):
        """
        Returns a KafkaProducer instance
        """
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: v.encode("utf-8"),
        )

    def send_to_kafka(self, text: str, timestamp_ms: int = None):
        """
        Sends text to a Kafka topic.
        """
        try:
            args = {"topic": self.topic_name, "value": text}
            if timestamp_ms is not None:
                args = {**args, **{"timestamp_ms": timestamp_ms}}
            self.kafka_producer.send(**args)
            self.kafka_producer.flush()
        except Exception as e:
            raise RuntimeError("fails to send a message") from e


if __name__ == "__main__":
    producer = TextProducer(
        os.getenv("BOOTSTRAP_SERVERS", "localhost:29092"),
        os.getenv("TOPIC_NAME", "input-topic"),
    )
    fake = Faker()

    num_events = 0
    while True:
        num_events += 1
        text = fake.text()
        producer.send_to_kafka(text)
        if num_events % 5 == 0:
            print(f"<<<<<{num_events} text sent... current>>>>\n{text}")
        time.sleep(int(os.getenv("DELAY_SECONDS", "1")))

The Kafka bootstrap server can be exposed on port 29092 using kubectl port-forward and the producer app can be started by executing the Python script.

kubectl port-forward svc/demo-cluster-kafka-external-bootstrap 29092

python kafka/client/producer.py

We can see the output topic (output-topic-beam) is created on kafka-ui.

Also, we can check the output messages are created as expected in the Topics tab.
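
The output records can also be consumed directly with a small throwaway consumer - a sketch assuming the bootstrap server is still port-forwarded to localhost:29092.

# throwaway consumer to peek at the output records (illustration only;
# assumes the bootstrap server is port-forwarded to localhost:29092)
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "output-topic-beam",
    bootstrap_servers="localhost:29092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: v.decode("utf-8"),
)
for message in consumer:
    print(message.value)  # e.g. {"window_start": "...", "window_end": "...", "avg_len": 4.2}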

Delete Resources

The Kubernetes resources and minikube cluster can be deleted as shown below.

## delete flink operator and related resources
kubectl delete -f beam/word_len_cluster.yml
kubectl delete -f beam/word_len_job.yml
helm uninstall flink-kubernetes-operator
helm repo remove flink-operator-repo
kubectl delete -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml

## delete kafka cluster and related resources
STRIMZI_VERSION="0.39.0"
kubectl delete -f kafka/manifests/kafka-cluster.yaml
kubectl delete -f kafka/manifests/kafka-ui.yaml
kubectl delete -f kafka/manifests/strimzi-cluster-operator-$STRIMZI_VERSION.yaml

## delete minikube
minikube delete

Summary

In this series, we discuss how to deploy Python stream processing applications on Kubernetes, and we developed a PyFlink application in the previous post. In this post, we developed an Apache Beam pipeline using the Python SDK and deployed it on an Apache Flink cluster via the Apache Flink Runner. As in the previous post, we deployed a Kafka cluster using the Strimzi Operator on a minikube cluster, as the pipeline uses Apache Kafka topics for its data source and sink. Then, we developed the pipeline as a Python package and added the package to a custom Docker image so that Python user code can be executed externally. For deployment, we created a Flink session cluster via the Flink Kubernetes Operator and deployed the pipeline using a Kubernetes job. Finally, we checked the output of the application by sending messages to the input Kafka topic using a Python producer application.