In this post, we develop an Apache Beam pipeline where the input data is augmented by a Remote Procedure Call (RPC). The pipeline makes one RPC per input element and enriches the output with the response. This is not an efficient way to access an external service if the service can accept more than one element per request. In the next two posts, we will discuss updated pipelines that make RPCs more efficiently. We begin by illustrating how to manage development resources, then demonstrate the RPC service that we use in this series. Finally, we develop a Beam pipeline that accesses the external service to augment the input elements.

Development Environment

The development environment has an Apache Flink cluster, an Apache Kafka cluster and a gRPC server. For Flink, we can use either an embedded cluster or a local cluster, while Docker Compose is used for the rest. See Part 1 for details about how to set up the development environment. The source of this post can be found in this GitHub repository.

Manage Environment

The Flink and Kafka clusters and gRPC server are managed by the following bash scripts.

  • ./setup/start-flink-env.sh
  • ./setup/stop-flink-env.sh

Those scripts accept four flags: -f, -k and -g start/stop the Flink cluster, the Kafka cluster and the gRPC server individually, while -a manages all of them. We can combine multiple flags to start/stop the relevant resources. Note that the scripts assume Flink 1.18.1 by default, and we can specify a different Flink version if necessary, e.g. FLINK_VERSION=1.17.2 ./setup/start-flink-env.sh.

The following shows how to start resources using the start-up script. If we deploy a Beam pipeline on a local Flink cluster, we need to launch both the Flink and Kafka clusters. Otherwise, starting the Kafka cluster is sufficient. In both cases, the -g flag also starts the gRPC server.

## start a local flink and kafka cluster
./setup/start-flink-env.sh -f -k -g
# [+] Running 6/6
#  ⠿ Network app-network      Created                                                        0.0s
#  ⠿ Volume "kafka_0_data"    Created                                                        0.0s
#  ⠿ Volume "zookeeper_data"  Created                                                        0.0s
#  ⠿ Container zookeeper      Started                                                        0.5s
#  ⠿ Container kafka-0        Started                                                        0.7s
#  ⠿ Container kafka-ui       Started                                                        0.9s
# [+] Running 2/2
#  ⠿ Network grpc-network   Created                                                          0.0s
#  ⠿ Container grpc-server  Started                                                          0.4s
# start flink 1.18.1...
# Starting cluster.
# Starting standalonesession daemon on host <hostname>.
# Starting taskexecutor daemon on host <hostname>.

## start a local kafka cluster only
./setup/start-flink-env.sh -k -g
# [+] Running 6/6
#  ⠿ Network app-network      Created                                                        0.0s
#  ⠿ Volume "kafka_0_data"    Created                                                        0.0s
#  ⠿ Volume "zookeeper_data"  Created                                                        0.0s
#  ⠿ Container zookeeper      Started                                                        0.5s
#  ⠿ Container kafka-0        Started                                                        0.7s
#  ⠿ Container kafka-ui       Started                                                        0.9s
# [+] Running 2/2
#  ⠿ Network grpc-network   Created                                                          0.0s
#  ⠿ Container grpc-server  Started                                                          0.4s

Introduction to RPC Service

Create client and server interfaces

A service is defined in the .proto file, and it supports two methods - resolve and resolveBatch. The former accepts a request with a string and returns an integer while the latter accepts a list of the string requests and returns a list of the integer responses.

// chapter3/proto/service.proto
syntax = "proto3";

package chapter3;

message Request {
  string input = 1;
}

message Response {
  int32 output = 1;
}

message RequestList {
  repeated Request request = 1;
}

message ResponseList {
  repeated Response response = 1;
}

service RpcService {
  rpc resolve(Request) returns (Response);
  rpc resolveBatch(RequestList) returns (ResponseList);
}

We can generate the gRPC client and server interfaces from the .proto service definition using the grpcio-tools package as shown below.

cd chapter3
mkdir proto && touch proto/service.proto

## << copy the proto service definition >>

## generate grpc client and server interfaces
python -m grpc_tools.protoc -I proto --python_out=. --grpc_python_out=. proto/service.proto

Running the above command generates service_pb2.py and service_pb2_grpc.py, and they contain:

  • classes for the messages defined in service.proto
  • classes for the service defined in service.proto
    • RpcServiceStub, which can be used by clients to invoke RpcService RPCs
    • RpcServiceServicer, which defines the interface for implementations of the RpcService service
  • a function for the service defined in service.proto
    • add_RpcServiceServicer_to_server, which adds a RpcServiceServicer to a grpc.Server

Create client and server

Using the gRPC interfaces generated earlier, we can create server and client applications. The server implements the two RPC methods (resolve and resolveBatch), where the response output is the length of the request input string. This server application is accessed by the Beam pipeline, and it is started together with the other development resources when the -g flag is included.

# chapter3/server.py
import os
import argparse
from concurrent import futures

import grpc
import service_pb2
import service_pb2_grpc


class RpcServiceServicer(service_pb2_grpc.RpcServiceServicer):
    def resolve(self, request, context):
        if os.getenv("VERBOSE", "False") == "True":
            print(f"resolve Request Made: input - {request.input}")
        # the response output is the length of the input string
        response = service_pb2.Response(output=len(request.input))
        return response

    def resolveBatch(self, request, context):
        if os.getenv("VERBOSE", "False") == "True":
            print("resolveBatch Request Made:")
            print(f"\tInputs - {', '.join([r.input for r in request.request])}")
        # one response per request, in the same order
        response = service_pb2.ResponseList()
        response.response.extend(
            [service_pb2.Response(output=len(r.input)) for r in request.request]
        )
        return response


def serve():
    server = grpc.server(futures.ThreadPoolExecutor())
    service_pb2_grpc.add_RpcServiceServicer_to_server(RpcServiceServicer(), server)
    server.add_insecure_port(os.getenv("INSECURE_PORT", "0.0.0.0:50051"))
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="gRPC server arguments")
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Whether to print messages for debugging.",
    )
    parser.set_defaults(verbose=False)
    opts = parser.parse_args()
    os.environ["VERBOSE"] = str(opts.verbose)
    serve()
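To make the server logic concrete without starting gRPC, the following is a minimal sketch that mirrors the two methods in plain Python. The Request and Response dataclasses and the resolve/resolve_batch functions are hypothetical stand-ins for illustration only; they are not the generated service_pb2 classes.

```python
from dataclasses import dataclass
from typing import List


# Hypothetical stand-ins for the generated protobuf messages (not service_pb2).
@dataclass
class Request:
    input: str


@dataclass
class Response:
    output: int


def resolve(request: Request) -> Response:
    # Mirrors the servicer: the output is the length of the input string.
    return Response(output=len(request.input))


def resolve_batch(requests: List[Request]) -> List[Response]:
    # Mirrors resolveBatch: one response per request, in the same order.
    return [Response(output=len(r.input)) for r in requests]


single = resolve(Request(input="Hello"))
batch = resolve_batch([Request(input=w) for w in ["Beautiful", "is", "better"]])
print(single.output)              # 5
print([r.output for r in batch])  # [9, 2, 6]
```

The batch variant handles several inputs in a single call, which is the property the later posts in this series rely on.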

The client application is created for demonstration, and we use the same logic to access the server application within a Beam pipeline. It requires a user input (1 or 2) to determine which method to call, and a user is expected to write an element (word or text) so that the client can make a request. See below for details about how the client and server applications work.

# chapter3/server_client.py
import grpc
import service_pb2
import service_pb2_grpc


def run():
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = service_pb2_grpc.RpcServiceStub(channel)
        print("1. Resolve - Unary")
        print("2. ResolveBatch - Unary")
        rpc_call = input("Which rpc would you like to make: ")
        if rpc_call == "1":
            # single request with one word
            element = input("Please enter a word: ")
            if not element:
                element = "Hello"
            request = service_pb2.Request(input=element)
            resolved = stub.resolve(request)
            print("Resolve response received: ")
            print(f"({element}, {resolved.output})")
        if rpc_call == "2":
            # batch request with the words of a text
            element = input("Please enter a text: ")
            if not element:
                element = "Beautiful is better than ugly"
            words = element.split(" ")
            request_list = service_pb2.RequestList()
            request_list.request.extend([service_pb2.Request(input=e) for e in words])
            response = stub.resolveBatch(request_list)
            resolved = [r.output for r in response.response]
            print("ResolveBatch response received: ")
            print(", ".join([f"({t[0]}, {t[1]})" for t in zip(words, resolved)]))


if __name__ == "__main__":
    run()

Overall, we end up having the following files for the gRPC server and client applications.

tree -P "serv*|proto" -I "*pycache*"
.
├── proto
│   └── service.proto
├── server.py
├── server_client.py
├── service_pb2.py
└── service_pb2_grpc.py

1 directory, 5 files

We can start the client and server applications as Python scripts. If we select 1, the next prompt asks us to enter a word. Once a word is entered, the client prints the output, which is a tuple of the word and its length. If we select 2, we can make an RPC request with a text. Similar to the earlier call, it returns the enriched outputs as multiple tuples, one per word.
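As a rough illustration of what the client prints for option 2, the sketch below reproduces only the formatting step. The lengths are computed locally with len as a stand-in for the resolveBatch response, and format_pairs is a hypothetical helper name that does not exist in the client script.

```python
def format_pairs(words, lengths):
    # Same formatting as the client: "(word, length)" pairs joined by commas.
    return ", ".join(f"({w}, {n})" for w, n in zip(words, lengths))


text = "Beautiful is better than ugly"  # the client's default text for option 2
words = text.split(" ")
lengths = [len(w) for w in words]  # stand-in for the resolveBatch response
line = format_pairs(words, lengths)
print(line)
# (Beautiful, 9), (is, 2), (better, 6), (than, 4), (ugly, 4)
```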

Beam Pipelines

We develop an Apache Beam pipeline that accesses an external RPC service to augment the input elements.

Shared Source

We have multiple pipelines that read text messages from an input Kafka topic and write outputs to an output topic. Therefore, the data source and sink transforms are refactored into a utility module as shown below. Note that the Kafka read and write transforms have an argument called deprecated_read, which forces the use of the legacy read when it is set to True. We will use the legacy read in this post to prevent a problem that is described in this GitHub issue.

# chapter3/io_utils.py
import re
import typing

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import kafka


def decode_message(kafka_kv: tuple):
    # the Kafka record arrives as a (key, value) tuple of bytes
    print(kafka_kv)
    return kafka_kv[1].decode("utf-8")


def tokenize(element: str):
    return re.findall(r"[A-Za-z\']+", element)


class ReadWordsFromKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topics: typing.List[str],
        group_id: str,
        deprecated_read: bool,
        verbose: bool = False,
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.verbose = verbose
        self.expansion_service = None
        if deprecated_read:
            self.expansion_service = kafka.default_io_expansion_service(
                ["--experiments=use_deprecated_read"]
            )

    def expand(self, input: pvalue.PBegin):
        return (
            input
            | "ReadFromKafka"
            >> kafka.ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": self.bootstrap_servers,
                    "auto.offset.reset": "latest",
                    # "enable.auto.commit": "true",
                    "group.id": self.group_id,
                },
                topics=self.topics,
                timestamp_policy=kafka.ReadFromKafka.create_time_policy,
                commit_offset_in_finalize=True,
                expansion_service=self.expansion_service,
            )
            | "DecodeMessage" >> beam.Map(decode_message)
            | "ExtractWords" >> beam.FlatMap(tokenize)
        )


class WriteOutputsToKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topic: str,
        deprecated_read: bool,
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.expansion_service = None
        if deprecated_read:
            self.expansion_service = kafka.default_io_expansion_service(
                ["--experiments=use_deprecated_read"]
            )

    def expand(self, pcoll: pvalue.PCollection):
        return pcoll | "WriteToKafka" >> kafka.WriteToKafka(
            producer_config={"bootstrap.servers": self.bootstrap_servers},
            topic=self.topic,
            expansion_service=self.expansion_service,
        )
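To see what the decode and tokenize steps of the source transform produce, the following sketch runs the same two functions on a hand-made record outside of Beam. The record key and the sample sentence are made up for illustration; the debugging print in decode_message is omitted.

```python
import re


def decode_message(kafka_kv: tuple) -> str:
    # A Kafka record is a (key, value) tuple of bytes; only the value is used.
    return kafka_kv[1].decode("utf-8")


def tokenize(element: str):
    # Keeps runs of letters and apostrophes; digits and punctuation are dropped.
    return re.findall(r"[A-Za-z\']+", element)


# Hypothetical record, for illustration only.
record = (b"key", "Readability counts, doesn't it? 42 times.".encode("utf-8"))
words = tokenize(decode_message(record))
print(words)  # ['Readability', 'counts', "doesn't", 'it', 'times']
```

Because the words are extracted with FlatMap in the pipeline, each word becomes a separate element downstream, which is why one RPC is made per word.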

Beam Pipeline

In RpcDoFn, the connection to the RPC service is established in the setup method, and the input element is augmented by a response from the service in the process method. It yields a tuple of the element and the response output, which is the length of the element. Finally, the connection (channel) is closed in the teardown method.

# chapter3/rpc_pardo.py
import os
import argparse
import json
import re
import typing
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from io_utils import ReadWordsFromKafka, WriteOutputsToKafka


def create_message(element: typing.Tuple[str, int]):
    msg = json.dumps({"word": element[0], "length": element[1]})
    print(msg)
    return element[0].encode("utf-8"), msg.encode("utf-8")


class RpcDoFn(beam.DoFn):
    channel = None
    stub = None
    hostname = "localhost"
    port = "50051"

    def setup(self):
        import grpc
        import service_pb2_grpc

        # open the channel once per DoFn instance and reuse it for all elements
        self.channel: grpc.Channel = grpc.insecure_channel(
            f"{self.hostname}:{self.port}"
        )
        self.stub = service_pb2_grpc.RpcServiceStub(self.channel)

    def teardown(self):
        if self.channel is not None:
            self.channel.close()

    def process(self, element: str) -> typing.Iterator[typing.Tuple[str, int]]:
        import service_pb2

        # one RPC per input element
        request = service_pb2.Request(input=element)
        response = self.stub.resolve(request)
        yield element, response.output


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--bootstrap_servers",
        default="host.docker.internal:29092",
        help="Kafka bootstrap server addresses",
    )
    parser.add_argument("--input_topic", default="input-topic", help="Input topic")
    parser.add_argument(
        "--output_topic",
        default=re.sub("_", "-", re.sub(r"\.py$", "", os.path.basename(__file__))),
        help="Output topic",
    )
    parser.add_argument(
        "--deprecated_read",
        action="store_true",
        help="Whether to use a deprecated read. See https://github.com/apache/beam/issues/20979",
    )
    parser.set_defaults(deprecated_read=False)

    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known args - {known_args}")
    print(f"pipeline options - {pipeline_options.display_data()}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadInputsFromKafka"
            >> ReadWordsFromKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topics=[known_args.input_topic],
                group_id=f"{known_args.output_topic}-group",
                deprecated_read=known_args.deprecated_read,
            )
            | "RequestRPC" >> beam.ParDo(RpcDoFn())
            | "CreateMessages"
            >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
            | "WriteOutputsToKafka"
            >> WriteOutputsToKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topic=known_args.output_topic,
                deprecated_read=known_args.deprecated_read,
            )
        )

        logging.getLogger().setLevel(logging.WARN)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()
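The setup, process and teardown sequence of RpcDoFn can be traced with a small simulation that does not need Beam or gRPC. In the sketch below, FakeChannel, FakeStub and LifecycleDemo are hypothetical stand-ins, and the three methods are called by hand, whereas in a real pipeline the runner invokes them.

```python
class FakeChannel:
    """Hypothetical stand-in for grpc.Channel that records whether it is closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class FakeStub:
    """Hypothetical stand-in for RpcServiceStub that counts the calls it receives."""

    def __init__(self):
        self.calls = 0

    def resolve(self, element: str) -> int:
        self.calls += 1
        return len(element)


class LifecycleDemo:
    channel = None
    stub = None

    def setup(self):
        # Created once and reused for every element.
        self.channel = FakeChannel()
        self.stub = FakeStub()

    def process(self, element: str):
        # One call to the service per element.
        yield element, self.stub.resolve(element)

    def teardown(self):
        if self.channel is not None:
            self.channel.close()


fn = LifecycleDemo()
fn.setup()
outputs = [out for word in ["to", "be", "or"] for out in fn.process(word)]
calls = fn.stub.calls
fn.teardown()
print(outputs)  # [('to', 2), ('be', 2), ('or', 2)]
print(calls)    # 3
```

The call count equals the number of elements, which is the inefficiency this post sets out to demonstrate.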

Pipeline Test

As described in this documentation, we can test a Beam pipeline as follows.

  1. Create a TestPipeline.
  2. Create some static, known test input data.
  3. Create a PCollection of input data using the Create transform (if bounded source) or a TestStream (if unbounded source)
  4. Apply the transform to the input PCollection and save the resulting output PCollection.
  5. Use PAssert and its subclasses (or testing utils in Python) to verify that the output PCollection contains the elements that you expect.

We use a text file that contains random text (inputs/lorem.txt) for testing. We add its lines to a test stream and apply the main transform. Finally, we compare the actual output with the expected output, which is a list of tuples where each element is a word and its length.

# chapter3/rpc_pardo_test.py
import os
import unittest
from concurrent import futures

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.testing.test_stream import TestStream
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

import grpc
import service_pb2_grpc
import server

from rpc_pardo import RpcDoFn
from io_utils import tokenize


def read_file(filename: str, inputpath: str):
    with open(os.path.join(inputpath, filename), "r") as f:
        return f.readlines()


def compute_expected_output(lines: list):
    output = []
    for line in lines:
        words = [(w, len(w)) for w in tokenize(line)]
        output = output + words
    return output


class RpcParDoTest(unittest.TestCase):
    server_class = server.RpcServiceServicer
    port = 50051

    def setUp(self):
        # start an in-process gRPC server for the duration of each test
        self.server = grpc.server(futures.ThreadPoolExecutor())
        service_pb2_grpc.add_RpcServiceServicer_to_server(
            self.server_class(), self.server
        )
        self.server.add_insecure_port(f"[::]:{self.port}")
        self.server.start()

    def tearDown(self):
        self.server.stop(None)

    def test_pipeline(self):
        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True
        with TestPipeline(options=options) as p:
            PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
            lines = read_file("lorem.txt", os.path.join(PARENT_DIR, "inputs"))
            test_stream = TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)
            for line in lines:
                test_stream.add_elements([line])
            test_stream.advance_watermark_to_infinity()

            output = (
                p
                | test_stream
                | "ExtractWords" >> beam.FlatMap(tokenize)
                | "RequestRPC" >> beam.ParDo(RpcDoFn())
            )

            EXPECTED_OUTPUT = compute_expected_output(lines)

            assert_that(output, equal_to(EXPECTED_OUTPUT))


if __name__ == "__main__":
    unittest.main()

We can execute the pipeline test as shown below.

python chapter3/rpc_pardo_test.py
.
----------------------------------------------------------------------
Ran 1 test in 0.373s

OK
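For a feel of what the expected output looks like, the sketch below applies the same helper logic to two made-up lines instead of inputs/lorem.txt. The comparison with collections.Counter is only an approximation of how equal_to treats the output, on the assumption that it checks the elements regardless of their order.

```python
import re
from collections import Counter


def tokenize(element: str):
    return re.findall(r"[A-Za-z\']+", element)


def compute_expected_output(lines: list):
    # Words from all lines are collected into one flat list of (word, length).
    output = []
    for line in lines:
        output.extend((w, len(w)) for w in tokenize(line))
    return output


# Made-up sample lines, for illustration only.
lines = ["Lorem ipsum dolor\n", "sit amet ipsum\n"]
expected = compute_expected_output(lines)
print(expected)
# [('Lorem', 5), ('ipsum', 5), ('dolor', 5), ('sit', 3), ('amet', 4), ('ipsum', 5)]

# Duplicates are kept, and an order-insensitive comparison still matches.
shuffled = list(reversed(expected))
same = Counter(shuffled) == Counter(expected)
print(same)  # True
```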

Pipeline Execution

We need to send messages into the input Kafka topic before executing the pipeline. Input text messages can be sent by executing a Kafka text producer - python utils/faker_gen.py.

When executing the pipeline, we specify only a single known argument, which enables the legacy read (--deprecated_read), and accept the default values of the other known arguments (bootstrap_servers, input_topic …). The remaining arguments are all pipeline arguments. Note that we deploy the pipeline on a local Flink cluster by specifying the flink master argument (--flink_master=localhost:8081). Alternatively, we can use an embedded Flink cluster if we exclude that argument.

## start the beam pipeline
## exclude --flink_master if using an embedded cluster
python chapter3/rpc_pardo.py --deprecated_read \
    --job_name=rpc-pardo --runner FlinkRunner --flink_master=localhost:8081 \
    --streaming --environment_type=LOOPBACK --parallelism=3 --checkpointing_interval=10000
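The split between known arguments and pipeline arguments comes from argparse's parse_known_args. The following sketch shows the split on a shortened version of the command above; it defines only three of the pipeline's known arguments and writes every option in the --key=value form to keep the example compact.

```python
import argparse

parser = argparse.ArgumentParser(description="Beam pipeline arguments")
parser.add_argument("--bootstrap_servers", default="host.docker.internal:29092")
parser.add_argument("--input_topic", default="input-topic")
parser.add_argument("--deprecated_read", action="store_true")

# Shortened argument list, for illustration only.
argv = [
    "--deprecated_read",
    "--job_name=rpc-pardo",
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--streaming",
]

# Arguments the parser recognises go to known_args; everything else is
# returned untouched and can be handed to PipelineOptions.
known_args, pipeline_args = parser.parse_known_args(argv)
print(known_args.deprecated_read)  # True
print(known_args.input_topic)      # input-topic
print(pipeline_args)
```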

On the Flink UI, we see that the pipeline has only a single task.

On the Kafka UI, we can check that each output message is a JSON object holding a word and its length.
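The shape of an output record can be reproduced with the create_message function from the pipeline, shown here without its debugging print. The sample element is made up for illustration.

```python
import json
import typing


def create_message(element: typing.Tuple[str, int]):
    # Key: the word as bytes. Value: a JSON document with the word and its length.
    msg = json.dumps({"word": element[0], "length": element[1]})
    return element[0].encode("utf-8"), msg.encode("utf-8")


key, value = create_message(("beam", 4))
print(key)    # b'beam'
print(value)  # b'{"word": "beam", "length": 4}'
```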