In the previous post, we developed an Apache Beam pipeline where the input data is augmented by an Remote Procedure Call (RPC) service. Each input element performs an RPC call and the output is enriched by the response. This is not an efficient way of accessing an external service provided that the service can accept more than one element. In this post, we discuss how to enhance the pipeline so that a single RPC call is made for a bundle of elements, which can save a significant amount time compared to making a call for each element.

Development Environment

The development environment has an Apache Flink cluster, Apache Kafka cluster and gRPC server. For Flink, we can use either an embedded cluster or a local cluster while Docker Compose is used for the rest. See Part 1 for details about how to set up the development environment. The source of this post can be found in this GitHub repository.

Manage Environment

The Flink and Kafka clusters and gRPC server are managed by the following bash scripts.

  • ./setup/start-flink-env.sh
  • ./setup/stop-flink-env.sh

Those scripts accept four flags: -f, -k and -g to start/stop individual resources or -a to manage all of them. We can add multiple flags to start/stop relevant resources. Note that the scripts assume Flink 1.18.1 by default, and we can specify a specific Flink version if it is different from it e.g. FLINK_VERSION=1.17.2 ./setup/start-flink-env.sh.

Below shows how to start resources using the start-up script. We need to launch both the Flink/Kafka clusters and gRPC server if we deploy a Beam pipeline on a local Flink cluster. Otherwise, we can start the Kafka cluster and gRPC server only.

 1## start a local flink can kafka cluster
 2./setup/start-flink-env.sh -f -k -g
 3# [+] Running 6/6
 4#  ⠿ Network app-network      Created                                                        0.0s
 5#  ⠿ Volume "kafka_0_data"    Created                                                        0.0s
 6#  ⠿ Volume "zookeeper_data"  Created                                                        0.0s
 7#  ⠿ Container zookeeper      Started                                                        0.5s
 8#  ⠿ Container kafka-0        Started                                                        0.7s
 9#  ⠿ Container kafka-ui       Started                                                        0.9s
10# [+] Running 2/2
11#  ⠿ Network grpc-network   Created                                                          0.0s
12#  ⠿ Container grpc-server  Started                                                          0.4s
13# start flink 1.18.1...
14# Starting cluster.
15# Starting standalonesession daemon on host <hostname>.
16# Starting taskexecutor daemon on host <hostname>.
17
18## start a local kafka cluster only
19./setup/start-flink-env.sh -k -g
20# [+] Running 6/6
21#  ⠿ Network app-network      Created                                                        0.0s
22#  ⠿ Volume "kafka_0_data"    Created                                                        0.0s
23#  ⠿ Volume "zookeeper_data"  Created                                                        0.0s
24#  ⠿ Container zookeeper      Started                                                        0.5s
25#  ⠿ Container kafka-0        Started                                                        0.7s
26#  ⠿ Container kafka-ui       Started                                                        0.9s
27# [+] Running 2/2
28#  ⠿ Network grpc-network   Created                                                          0.0s
29#  ⠿ Container grpc-server  Started                                                          0.4s

Remote Procedure Call (RPC) Service

The RPC service have two methods - resolve and resolveBatch. The former accepts a request with a string and returns an integer while the latter accepts a list of string requests and returns a list of integer responses. See Part 4 for details about how the RPC service is developed.

Overall, we have the following files for the gRPC server and client applications, and the server.py gets started when we execute the start-up script with the -g flag.

 1tree -P "serv*|proto" -I "*pycache*"
 2.
 3├── proto
 4│   └── service.proto
 5├── server.py
 6├── server_client.py
 7├── service_pb2.py
 8└── service_pb2_grpc.py
 9
101 directory, 5 files

We can check the client and server applications as Python scripts. If we select 1, the next prompt requires to enter a word. Upon entering a word, it returns a tuple of the word and its length as an output. We can make an RPC request with a text if we select 2. Similar to the earlier call, it returns enriched outputs as multiple tuples.

Beam Pipeline

We develop an Apache Beam pipeline that accesses an external RPC service to augment input elements. In this version, it is configured so that a single RPC call is made for a bundle of elements.

Shared Source

We have multiple pipelines that read text messages from an input Kafka topic and write outputs to an output topic. Therefore, the data source and sink transforms are refactored into a utility module as shown below. Note that, the Kafka read and write methods has an argument called deprecated_read, which forces to use the legacy read when it is set to True. We will use the legacy read in this post to prevent a problem that is described in this GitHub issue.

 1# chapter3/io_utils.py
 2import re
 3import typing
 4
 5import apache_beam as beam
 6from apache_beam import pvalue
 7from apache_beam.io import kafka
 8
 9
10def decode_message(kafka_kv: tuple):
11    print(kafka_kv)
12    return kafka_kv[1].decode("utf-8")
13
14
15def tokenize(element: str):
16    return re.findall(r"[A-Za-z\']+", element)
17
18
19class ReadWordsFromKafka(beam.PTransform):
20    def __init__(
21        self,
22        bootstrap_servers: str,
23        topics: typing.List[str],
24        group_id: str,
25        deprecated_read: bool,
26        verbose: bool = False,
27        label: str | None = None,
28    ) -> None:
29        super().__init__(label)
30        self.boostrap_servers = bootstrap_servers
31        self.topics = topics
32        self.group_id = group_id
33        self.verbose = verbose
34        self.expansion_service = None
35        if deprecated_read:
36            self.expansion_service = kafka.default_io_expansion_service(
37                ["--experiments=use_deprecated_read"]
38            )
39
40    def expand(self, input: pvalue.PBegin):
41        return (
42            input
43            | "ReadFromKafka"
44            >> kafka.ReadFromKafka(
45                consumer_config={
46                    "bootstrap.servers": self.boostrap_servers,
47                    "auto.offset.reset": "latest",
48                    # "enable.auto.commit": "true",
49                    "group.id": self.group_id,
50                },
51                topics=self.topics,
52                timestamp_policy=kafka.ReadFromKafka.create_time_policy,
53                commit_offset_in_finalize=True,
54                expansion_service=self.expansion_service,
55            )
56            | "DecodeMessage" >> beam.Map(decode_message)
57            | "ExtractWords" >> beam.FlatMap(tokenize)
58        )
59
60
61class WriteOutputsToKafka(beam.PTransform):
62    def __init__(
63        self,
64        bootstrap_servers: str,
65        topic: str,
66        deprecated_read: bool,
67        label: str | None = None,
68    ) -> None:
69        super().__init__(label)
70        self.boostrap_servers = bootstrap_servers
71        self.topic = topic
72        self.expansion_service = None
73        if deprecated_read:
74            self.expansion_service = kafka.default_io_expansion_service(
75                ["--experiments=use_deprecated_read"]
76            )
77
78    def expand(self, pcoll: pvalue.PCollection):
79        return pcoll | "WriteToKafka" >> kafka.WriteToKafka(
80            producer_config={"bootstrap.servers": self.boostrap_servers},
81            topic=self.topic,
82            expansion_service=self.expansion_service,
83        )

Pipeline Source

According to the life cycle of a DoFn object, the BatchRpcDoFn is configured as follows.

  • setUp - The RPC service is established because this method is commonly used to initialize transient resources that are needed during the processing.
  • start_bundle - In a DoFn object, input elements are split into chunks called bundles, and this method is invoked when a new bundle arrives. We initialise an empty list to keep input elements in a bundle.
  • process - Each input element in a bundle is appended into the elements list after being converted into a WindowedValue object.
  • finish_bundle - A single RPC call is made to the resolveBatch method after unique input elements are converted into a RequestList object. It has two advantages. First, as requests with the same input tend to return the same output, making a call with unique elements can reduce request time. Secondly, calling to the resolveBatch method with multiple elements can save a significant amount of time compared to making a call for each element. Once a response is made, output elements are constructed by augmenting input elements with the response, and the output elements are returned as a list.
  • teardown - The connection (channel) to the RPC service is closed.
  1# chapter3/rpc_pardo_batch.py
  2import os
  3import argparse
  4import json
  5import re
  6import typing
  7import logging
  8
  9import apache_beam as beam
 10from apache_beam.utils.windowed_value import WindowedValue
 11from apache_beam.options.pipeline_options import PipelineOptions
 12from apache_beam.options.pipeline_options import SetupOptions
 13
 14from io_utils import ReadWordsFromKafka, WriteOutputsToKafka
 15
 16
 17def create_message(element: typing.Tuple[str, int]):
 18    msg = json.dumps({"word": element[0], "length": element[1]})
 19    print(msg)
 20    return element[0].encode("utf-8"), msg.encode("utf-8")
 21
 22
 23class BatchRpcDoFn(beam.DoFn):
 24    channel = None
 25    stub = None
 26    elements: typing.List[WindowedValue] = None
 27    hostname = "localhost"
 28    port = "50051"
 29
 30    def setup(self):
 31        import grpc
 32        import service_pb2_grpc
 33
 34        self.channel: grpc.Channel = grpc.insecure_channel(
 35            f"{self.hostname}:{self.port}"
 36        )
 37        self.stub = service_pb2_grpc.RpcServiceStub(self.channel)
 38
 39    def teardown(self):
 40        if self.channel is not None:
 41            self.channel.close()
 42
 43    def start_bundle(self):
 44        self.elements = []
 45
 46    def finish_bundle(self):
 47        import service_pb2
 48
 49        unqiue_values = set([e.value for e in self.elements])
 50        request_list = service_pb2.RequestList()
 51        request_list.request.extend(
 52            [service_pb2.Request(input=e) for e in unqiue_values]
 53        )
 54        response = self.stub.resolveBatch(request_list)
 55        resolved = dict(zip(unqiue_values, [r.output for r in response.response]))
 56
 57        return [
 58            WindowedValue(
 59                value=(e.value, resolved[e.value]),
 60                timestamp=e.timestamp,
 61                windows=e.windows,
 62            )
 63            for e in self.elements
 64        ]
 65
 66    def process(
 67        self,
 68        element: str,
 69        timestamp=beam.DoFn.TimestampParam,
 70        win_param=beam.DoFn.WindowParam,
 71    ):
 72        self.elements.append(
 73            WindowedValue(value=element, timestamp=timestamp, windows=(win_param,))
 74        )
 75
 76
 77def run(argv=None, save_main_session=True):
 78    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
 79    parser.add_argument(
 80        "--bootstrap_servers",
 81        default="host.docker.internal:29092",
 82        help="Kafka bootstrap server addresses",
 83    )
 84    parser.add_argument("--input_topic", default="input-topic", help="Input topic")
 85    parser.add_argument(
 86        "--output_topic",
 87        default=re.sub("_", "-", re.sub(".py$", "", os.path.basename(__file__))),
 88        help="Output topic",
 89    )
 90    parser.add_argument(
 91        "--deprecated_read",
 92        action="store_true",
 93        default="Whether to use a deprecated read. See https://github.com/apache/beam/issues/20979",
 94    )
 95    parser.set_defaults(deprecated_read=False)
 96
 97    known_args, pipeline_args = parser.parse_known_args(argv)
 98
 99    # # We use the save_main_session option because one or more DoFn's in this
100    # # workflow rely on global context (e.g., a module imported at module level).
101    pipeline_options = PipelineOptions(pipeline_args)
102    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
103    print(f"known args - {known_args}")
104    print(f"pipeline options - {pipeline_options.display_data()}")
105
106    with beam.Pipeline(options=pipeline_options) as p:
107        (
108            p
109            | "ReadInputsFromKafka"
110            >> ReadWordsFromKafka(
111                bootstrap_servers=known_args.bootstrap_servers,
112                topics=[known_args.input_topic],
113                group_id=f"{known_args.output_topic}-group",
114                deprecated_read=known_args.deprecated_read,
115            )
116            | "RequestRPC" >> beam.ParDo(BatchRpcDoFn())
117            | "CreateMessags"
118            >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
119            | "WriteOutputsToKafka"
120            >> WriteOutputsToKafka(
121                bootstrap_servers=known_args.bootstrap_servers,
122                topic=known_args.output_topic,
123                deprecated_read=known_args.deprecated_read,
124            )
125        )
126
127        logging.getLogger().setLevel(logging.WARN)
128        logging.info("Building pipeline ...")
129
130
131if __name__ == "__main__":
132    run()

Pipeline Test

As described in this documentation, we can test a Beam pipeline as following.

  1. Create a TestPipeline.
  2. Create some static, known test input data.
  3. Create a PCollection of input data using the Create transform (if bounded source) or a TestStream (if unbounded source)
  4. Apply the transform to the input PCollection and save the resulting output PCollection.
  5. Use PAssert and its subclasses (or testing utils in Python) to verify that the output PCollection contains the elements that you expect.

We use a text file that keeps a random text (input/lorem.txt) for testing. Then, we add the lines into a test stream and apply the main transform. Finally, we compare the actual output with an expected output. The expected output is a list of tuples where each element is a word and its length.

 1# chapter3/rpc_pardo_batch_test.py
 2import os
 3import unittest
 4from concurrent import futures
 5
 6import apache_beam as beam
 7from apache_beam.coders import coders
 8from apache_beam.testing.test_pipeline import TestPipeline
 9from apache_beam.testing.util import assert_that, equal_to
10from apache_beam.testing.test_stream import TestStream
11from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
12
13import grpc
14import service_pb2_grpc
15import server
16
17from rpc_pardo_batch import BatchRpcDoFn
18from io_utils import tokenize
19
20
21def read_file(filename: str, inputpath: str):
22    with open(os.path.join(inputpath, filename), "r") as f:
23        return f.readlines()
24
25
26def compute_expected_output(lines: list):
27    output = []
28    for line in lines:
29        words = [(w, len(w)) for w in tokenize(line)]
30        output = output + words
31    return output
32
33
34class RpcParDooBatchTest(unittest.TestCase):
35    server_class = server.RpcServiceServicer
36    port = 50051
37
38    def setUp(self):
39        self.server = grpc.server(futures.ThreadPoolExecutor())
40        service_pb2_grpc.add_RpcServiceServicer_to_server(
41            self.server_class(), self.server
42        )
43        self.server.add_insecure_port(f"[::]:{self.port}")
44        self.server.start()
45
46    def tearDown(self):
47        self.server.stop(None)
48
49    def test_pipeline(self):
50        options = PipelineOptions()
51        options.view_as(StandardOptions).streaming = True
52        with TestPipeline(options=options) as p:
53            PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
54            lines = read_file("lorem.txt", os.path.join(PARENT_DIR, "inputs"))
55            test_stream = TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)
56            for line in lines:
57                test_stream.add_elements([line])
58            test_stream.advance_watermark_to_infinity()
59
60            output = (
61                p
62                | test_stream
63                | "ExtractWords" >> beam.FlatMap(tokenize)
64                | "RequestRPC" >> beam.ParDo(BatchRpcDoFn())
65            )
66
67            EXPECTED_OUTPUT = compute_expected_output(lines)
68
69            assert_that(output, equal_to(EXPECTED_OUTPUT))
70
71
72if __name__ == "__main__":
73    unittest.main()

We can execute the pipeline test as shown below.

1python chapter3/rpc_pardo_batch_test.py 
2.
3----------------------------------------------------------------------
4Ran 1 test in 0.285s
5
6OK

Pipeline Execution

Note that the Kafka bootstrap server is accessible on port 29092 outside the Docker network, and it can be accessed on localhost:29092 from the Docker host machine and on host.docker.internal:29092 from a Docker container that is launched with the host network. We use both types of the bootstrap server address - the former is used by a Kafka producer app that is discussed later and the latter by a Java IO expansion service, which is launched in a Docker container. Note further that, for the latter to work, we have to update the /etc/hosts file by adding an entry for host.docker.internal as shown below.

1cat /etc/hosts | grep host.docker.internal
2# 127.0.0.1       host.docker.internal

We need to send messages into the input Kafka topic before executing the pipeline. Input text message can be sent by executing a Kafka text producer - python utils/faker_gen.py.

When executing the pipeline, we specify only a single known argument that enables to use the legacy read (--deprecated_read) while accepting default values of the other known arguments (bootstrap_servers, input_topic …). The remaining arguments are all pipeline arguments. Note that we deploy the pipeline on a local Flink cluster by specifying the flink master argument (--flink_master=localhost:8081). Alternatively, we can use an embedded Flink cluster if we exclude that argument.

1## start the beam pipeline
2## exclude --flink_master if using an embedded cluster
3python chapter3/rpc_pardo_batch.py --deprecated_read \
4    --job_name=rpc-pardo-batch --runner FlinkRunner --flink_master=localhost:8081 \
5  --streaming --environment_type=LOOPBACK --parallelism=3 --checkpointing_interval=10000

On Flink UI, we see the pipeline only has a single task.

On Kafka UI, we can check the output message is a dictionary of a word and its length.