In this post, we develop an Apache Beam pipeline where the input data is augmented by a Remote Procedure Call (RPC) service. Each input element makes its own RPC call, and the output is enriched by the response. This is not an efficient way of accessing an external service when the service can accept more than one element per request. In the subsequent two posts, we will discuss updated pipelines that make RPC calls more efficiently. We begin by illustrating how to manage development resources, followed by a demonstration of the RPC service that we use in this series. Finally, we develop a Beam pipeline that accesses the external service to augment the input elements.
- Part 1 Calculate K Most Frequent Words and Max Word Length
- Part 2 Calculate Average Word Length with/without Fixed Look back
- Part 3 Build Sport Activity Tracker with/without SQL
- Part 4 Call RPC Service for Data Augmentation (this post)
- Part 5 Call RPC Service in Batch using Stateless DoFn
- Part 6 Call RPC Service in Batch with Defined Batch Size using Stateful DoFn
- Part 7 Separate Droppable Data into Side Output
- Part 8 Enhance Sport Activity Tracker with Runner Motivation
- Part 9 Develop Batch File Reader and PiSampler using Splittable DoFn
- Part 10 Develop Streaming File Reader using Splittable DoFn
Development Environment
The development environment has an Apache Flink cluster, Apache Kafka cluster and gRPC server. For Flink, we can use either an embedded cluster or a local cluster while Docker Compose is used for the rest. See Part 1 for details about how to set up the development environment. The source of this post can be found in this GitHub repository.
Manage Environment
The Flink and Kafka clusters and gRPC server are managed by the following bash scripts.
./setup/start-flink-env.sh
./setup/stop-flink-env.sh
Those scripts accept four flags: -f, -k and -g to start/stop individual resources or -a to manage all of them. We can add multiple flags to start/stop relevant resources. Note that the scripts assume Flink 1.18.1 by default, and we can specify a different Flink version if necessary, e.g. FLINK_VERSION=1.17.2 ./setup/start-flink-env.sh.
The following shows how to start resources using the start-up script. We need to launch both the Flink/Kafka clusters and gRPC server if we deploy a Beam pipeline on a local Flink cluster. Otherwise, we can start the Kafka cluster and gRPC server only.
## start local flink and kafka clusters and the grpc server
./setup/start-flink-env.sh -f -k -g
# [+] Running 6/6
# ⠿ Network app-network Created 0.0s
# ⠿ Volume "kafka_0_data" Created 0.0s
# ⠿ Volume "zookeeper_data" Created 0.0s
# ⠿ Container zookeeper Started 0.5s
# ⠿ Container kafka-0 Started 0.7s
# ⠿ Container kafka-ui Started 0.9s
# [+] Running 2/2
# ⠿ Network grpc-network Created 0.0s
# ⠿ Container grpc-server Started 0.4s
# start flink 1.18.1...
# Starting cluster.
# Starting standalonesession daemon on host <hostname>.
# Starting taskexecutor daemon on host <hostname>.

## start a local kafka cluster and the grpc server only
./setup/start-flink-env.sh -k -g
# [+] Running 6/6
# ⠿ Network app-network Created 0.0s
# ⠿ Volume "kafka_0_data" Created 0.0s
# ⠿ Volume "zookeeper_data" Created 0.0s
# ⠿ Container zookeeper Started 0.5s
# ⠿ Container kafka-0 Started 0.7s
# ⠿ Container kafka-ui Started 0.9s
# [+] Running 2/2
# ⠿ Network grpc-network Created 0.0s
# ⠿ Container grpc-server Started 0.4s
Introduction to Remote Procedure Call (RPC) Service
Create client and server interfaces
A service is defined in the .proto file, and it supports two methods - resolve and resolveBatch. The former accepts a request with a string and returns an integer while the latter accepts a list of string requests and returns a list of integer responses.
// chapter3/proto/service.proto
syntax = "proto3";

package chapter3;

message Request {
  string input = 1;
}

message Response {
  int32 output = 1;
}

message RequestList {
  repeated Request request = 1;
}

message ResponseList {
  repeated Response response = 1;
}

service RpcService {
  rpc resolve(Request) returns (Response);
  rpc resolveBatch(RequestList) returns (ResponseList);
}
We can generate the gRPC client and server interfaces from the .proto service definition using the grpcio-tools package as shown below.
cd chapter3
mkdir proto && touch proto/service.proto

## << copy the proto service definition >>

## generate grpc client and server interfaces
python -m grpc_tools.protoc -I proto --python_out=. --grpc_python_out=. proto/service.proto
Running the above command generates service_pb2.py and service_pb2_grpc.py, and they contain:
- classes for the messages defined in service.proto
- classes for the service defined in service.proto
  - RpcServiceStub, which can be used by clients to invoke RpcService RPCs
  - RpcServiceServicer, which defines the interface for implementations of the RpcService service
- a function for the service defined in service.proto
  - add_RpcServiceServicer_to_server, which adds a RpcServiceServicer to a grpc.Server
Create client and server
Using the gRPC interfaces generated earlier, we can create server and client applications. The server implements the two RPC methods (resolve and resolveBatch) where the response output is the length of the request input string. This server application is accessed by the Beam pipeline, and it gets started when we start the development resources with the -g flag.
# chapter3/server.py
import os
import argparse
from concurrent import futures

import grpc
import service_pb2
import service_pb2_grpc


class RpcServiceServicer(service_pb2_grpc.RpcServiceServicer):
    def resolve(self, request, context):
        # Respond with the length of the single input string.
        if os.getenv("VERBOSE", "False") == "True":
            print(f"resolve Request Made: input - {request.input}")
        response = service_pb2.Response(output=len(request.input))
        return response

    def resolveBatch(self, request, context):
        # Respond with one length per input string, in the same order.
        if os.getenv("VERBOSE", "False") == "True":
            print("resolveBatch Request Made:")
            print(f"\tInputs - {', '.join([r.input for r in request.request])}")
        response = service_pb2.ResponseList()
        response.response.extend(
            [service_pb2.Response(output=len(r.input)) for r in request.request]
        )
        return response


def serve():
    server = grpc.server(futures.ThreadPoolExecutor())
    service_pb2_grpc.add_RpcServiceServicer_to_server(RpcServiceServicer(), server)
    server.add_insecure_port(os.getenv("INSECURE_PORT", "0.0.0.0:50051"))
    server.start()
    server.wait_for_termination()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Whether to print messages for debugging.",
    )
    parser.set_defaults(verbose=False)
    opts = parser.parse_args()
    # The servicer methods read this flag from the environment.
    os.environ["VERBOSE"] = str(opts.verbose)
    serve()
The client application is created for demonstration, and we use the same logic to access the server application within a Beam pipeline. It requires a user input (1 or 2) to determine which method to call, and a user is expected to write an element (word or text) so that the client can make a request. See below for details about how the client and server applications work.
# chapter3/server_client.py
import grpc
import service_pb2
import service_pb2_grpc


def run():
    with grpc.insecure_channel("localhost:50051") as channel:
        stub = service_pb2_grpc.RpcServiceStub(channel)
        print("1. Resolve - Unary")
        print("2. ResolveBatch - Unary")
        rpc_call = input("Which rpc would you like to make: ")
        if rpc_call == "1":
            # Send a single word and print it with the returned length.
            element = input("Please enter a word: ")
            if not element:
                element = "Hello"
            request = service_pb2.Request(input=element)
            resolved = stub.resolve(request)
            print("Resolve response received: ")
            print(f"({element}, {resolved.output})")
        if rpc_call == "2":
            # Split a text into words and send them in a single request.
            element = input("Please enter a text: ")
            if not element:
                element = "Beautiful is better than ugly"
            words = element.split(" ")
            request_list = service_pb2.RequestList()
            request_list.request.extend([service_pb2.Request(input=e) for e in words])
            response = stub.resolveBatch(request_list)
            resolved = [r.output for r in response.response]
            print("ResolveBatch response received: ")
            print(", ".join([f"({t[0]}, {t[1]})" for t in zip(words, resolved)]))


if __name__ == "__main__":
    run()
Overall, we have the following files for the gRPC server and client applications, and server.py gets started when we execute the start-up script with the -g flag.
tree -P "serv*|proto" -I "*pycache*"
.
├── proto
│   └── service.proto
├── server.py
├── server_client.py
├── service_pb2.py
└── service_pb2_grpc.py

1 directory, 5 files
We can check the client and server applications by running them as Python scripts. If we select 1, the next prompt asks us to enter a word. Upon entering a word, the client returns a tuple of the word and its length as an output. If we select 2, we can make an RPC request with a text. Similar to the earlier call, it returns enriched outputs as multiple tuples.
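To make the expected outputs of the two options concrete, the following is a minimal sketch that imitates them without a running gRPC server. The functions resolve and resolve_batch here are local stand-ins (assumptions for illustration) that apply the same rule as the server, i.e. the output is the length of the input string.

```python
# Local stand-ins for the two RPC methods: the output is the input length.
def resolve(word: str) -> int:
    return len(word)


def resolve_batch(words: list) -> list:
    return [len(w) for w in words]


# Option 1: a single word is paired with its length.
word = "Hello"
single = (word, resolve(word))
print(f"({single[0]}, {single[1]})")

# Option 2: a text is split on single spaces, as in the client, and each
# word is paired with its length from a single batch call.
text = "Beautiful is better than ugly"
words = text.split(" ")
batch = list(zip(words, resolve_batch(words)))
print(", ".join(f"({w}, {n})" for w, n in batch))
```

With the default inputs of the client, option 1 should give (Hello, 5) and option 2 should give (Beautiful, 9), (is, 2), (better, 6), (than, 4), (ugly, 4).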
Beam Pipeline
We develop an Apache Beam pipeline that accesses an external RPC service to augment input elements. In this version, it is configured so that each element calls the RPC service.
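The cost of calling the service once per element can be illustrated with a small sketch. CountingService below is a hypothetical in-memory stand-in for the RPC service (not part of the project) that only counts how many requests it receives, so the numbers say nothing about real network latency.

```python
class CountingService:
    """Hypothetical stand-in for the RPC service that counts requests."""

    def __init__(self) -> None:
        self.calls = 0

    def resolve(self, word: str) -> int:
        self.calls += 1
        return len(word)

    def resolve_batch(self, words: list) -> list:
        self.calls += 1
        return [len(w) for w in words]


words = ["Beautiful", "is", "better", "than", "ugly"]

# One request per element, as in the pipeline of this post.
per_element = CountingService()
out_single = [(w, per_element.resolve(w)) for w in words]

# One request for all elements, as the batch method allows.
batched = CountingService()
out_batch = list(zip(words, batched.resolve_batch(words)))

print(per_element.calls, batched.calls)
```

Both approaches produce the same enriched output, but the per-element version makes five requests for five words while the batch version makes one.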
Shared Source
We have multiple pipelines that read text messages from an input Kafka topic and write outputs to an output topic. Therefore, the data source and sink transforms are refactored into a utility module as shown below. Note that the Kafka read and write transforms have an argument called deprecated_read, which forces the use of the legacy read when it is set to True. We will use the legacy read in this post to prevent a problem that is described in this GitHub issue.
# chapter3/io_utils.py
import re
import typing

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import kafka


def decode_message(kafka_kv: tuple):
    # A Kafka record arrives as a (key, value) tuple of bytes.
    print(kafka_kv)
    return kafka_kv[1].decode("utf-8")


def tokenize(element: str):
    # Extract words made of letters and apostrophes.
    return re.findall(r"[A-Za-z\']+", element)


class ReadWordsFromKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topics: typing.List[str],
        group_id: str,
        deprecated_read: bool,
        verbose: bool = False,
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.verbose = verbose
        self.expansion_service = None
        if deprecated_read:
            # Force the legacy read in the Java IO expansion service.
            self.expansion_service = kafka.default_io_expansion_service(
                ["--experiments=use_deprecated_read"]
            )

    def expand(self, input: pvalue.PBegin):
        return (
            input
            | "ReadFromKafka"
            >> kafka.ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": self.bootstrap_servers,
                    "auto.offset.reset": "latest",
                    # "enable.auto.commit": "true",
                    "group.id": self.group_id,
                },
                topics=self.topics,
                timestamp_policy=kafka.ReadFromKafka.create_time_policy,
                commit_offset_in_finalize=True,
                expansion_service=self.expansion_service,
            )
            | "DecodeMessage" >> beam.Map(decode_message)
            | "ExtractWords" >> beam.FlatMap(tokenize)
        )


class WriteOutputsToKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topic: str,
        deprecated_read: bool,
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.expansion_service = None
        if deprecated_read:
            self.expansion_service = kafka.default_io_expansion_service(
                ["--experiments=use_deprecated_read"]
            )

    def expand(self, pcoll: pvalue.PCollection):
        return pcoll | "WriteToKafka" >> kafka.WriteToKafka(
            producer_config={"bootstrap.servers": self.bootstrap_servers},
            topic=self.topic,
            expansion_service=self.expansion_service,
        )
Pipeline Source
In RpcDoFn, connection to the RPC service is established in the setup method, and the input element is augmented by a response from the service in the process method. It returns a tuple of the element and response output, which is the length of the element. Finally, the connection (channel) is closed in the teardown method.
# chapter3/rpc_pardo.py
import os
import argparse
import json
import re
import typing
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from io_utils import ReadWordsFromKafka, WriteOutputsToKafka


def create_message(element: typing.Tuple[str, int]):
    # Build a Kafka record: the word is the key, a JSON document is the value.
    msg = json.dumps({"word": element[0], "length": element[1]})
    print(msg)
    return element[0].encode("utf-8"), msg.encode("utf-8")


class RpcDoFn(beam.DoFn):
    channel = None
    stub = None
    hostname = "localhost"
    port = "50051"

    def setup(self):
        # Open the channel once per DoFn instance rather than per element.
        import grpc
        import service_pb2_grpc

        self.channel: grpc.Channel = grpc.insecure_channel(
            f"{self.hostname}:{self.port}"
        )
        self.stub = service_pb2_grpc.RpcServiceStub(self.channel)

    def teardown(self):
        if self.channel is not None:
            self.channel.close()

    def process(self, element: str) -> typing.Iterator[typing.Tuple[str, int]]:
        import service_pb2

        # One RPC call per input element.
        request = service_pb2.Request(input=element)
        response = self.stub.resolve(request)
        yield element, response.output


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--bootstrap_servers",
        default="host.docker.internal:29092",
        help="Kafka bootstrap server addresses",
    )
    parser.add_argument("--input_topic", default="input-topic", help="Input topic")
    parser.add_argument(
        "--output_topic",
        default=re.sub("_", "-", re.sub(r"\.py$", "", os.path.basename(__file__))),
        help="Output topic",
    )
    parser.add_argument(
        "--deprecated_read",
        action="store_true",
        help="Whether to use a deprecated read. See https://github.com/apache/beam/issues/20979",
    )
    parser.set_defaults(deprecated_read=False)

    known_args, pipeline_args = parser.parse_known_args(argv)

    # # We use the save_main_session option because one or more DoFn's in this
    # # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known args - {known_args}")
    print(f"pipeline options - {pipeline_options.display_data()}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadInputsFromKafka"
            >> ReadWordsFromKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topics=[known_args.input_topic],
                group_id=f"{known_args.output_topic}-group",
                deprecated_read=known_args.deprecated_read,
            )
            | "RequestRPC" >> beam.ParDo(RpcDoFn())
            | "CreateMessages"
            >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
            | "WriteOutputsToKafka"
            >> WriteOutputsToKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topic=known_args.output_topic,
                deprecated_read=known_args.deprecated_read,
            )
        )

        logging.getLogger().setLevel(logging.WARN)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()
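The division of work between the three DoFn methods can be imitated in plain Python. The sketch below is not Beam code: FakeChannel and LifecycleDoFn are hypothetical classes, and the method calls are made by hand in the order we would expect a runner to make them (setup once, process per element, teardown once). Note that Beam documents teardown as a best-effort call, so a real pipeline should not rely on it for anything beyond releasing resources.

```python
from typing import Iterator, Tuple


class FakeChannel:
    """Hypothetical stand-in for grpc.Channel that records whether it is closed."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class LifecycleDoFn:
    """Plain-Python imitation of the setup/process/teardown shape of RpcDoFn."""

    def __init__(self) -> None:
        self.channel = None
        self.events = []

    def setup(self) -> None:
        # The connection is created once, not once per element.
        self.channel = FakeChannel()
        self.events.append("setup")

    def process(self, element: str) -> Iterator[Tuple[str, int]]:
        # The real DoFn calls the RPC service here; we compute the length locally.
        self.events.append("process")
        yield element, len(element)

    def teardown(self) -> None:
        if self.channel is not None:
            self.channel.close()
        self.events.append("teardown")


fn = LifecycleDoFn()
fn.setup()
results = [out for word in ["lorem", "ipsum"] for out in fn.process(word)]
fn.teardown()

print(fn.events)
print(results)
```

The recorded events show a single setup and teardown around two process calls, which is why the channel and stub are created in setup rather than in process.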
Pipeline Test
As described in this documentation, we can test a Beam pipeline as follows.
- Create a TestPipeline.
- Create some static, known test input data.
- Create a PCollection of input data using the Create transform (if bounded source) or a TestStream (if unbounded source).
- Apply the transform to the input PCollection and save the resulting output PCollection.
- Use PAssert and its subclasses (or testing utils in Python) to verify that the output PCollection contains the elements that you expect.
We use a text file that keeps a random text (inputs/lorem.txt) for testing. Then, we add the lines into a test stream and apply the main transform. Finally, we compare the actual output with an expected output. The expected output is a list of tuples where each element is a word and its length.
# chapter3/rpc_pardo_test.py
import os
import unittest
from concurrent import futures

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.testing.test_stream import TestStream
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

import grpc
import service_pb2_grpc
import server

from rpc_pardo import RpcDoFn
from io_utils import tokenize


def read_file(filename: str, inputpath: str):
    with open(os.path.join(inputpath, filename), "r") as f:
        return f.readlines()


def compute_expected_output(lines: list):
    # Each word is paired with its length, mirroring the server logic.
    output = []
    for line in lines:
        words = [(w, len(w)) for w in tokenize(line)]
        output = output + words
    return output


class RpcParDoTest(unittest.TestCase):
    server_class = server.RpcServiceServicer
    port = 50051

    def setUp(self):
        # Start an in-process gRPC server for the DoFn to call.
        self.server = grpc.server(futures.ThreadPoolExecutor())
        service_pb2_grpc.add_RpcServiceServicer_to_server(
            self.server_class(), self.server
        )
        self.server.add_insecure_port(f"[::]:{self.port}")
        self.server.start()

    def tearDown(self):
        self.server.stop(None)

    def test_pipeline(self):
        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True
        with TestPipeline(options=options) as p:
            PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
            lines = read_file("lorem.txt", os.path.join(PARENT_DIR, "inputs"))
            test_stream = TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)
            for line in lines:
                test_stream.add_elements([line])
            test_stream.advance_watermark_to_infinity()

            output = (
                p
                | test_stream
                | "ExtractWords" >> beam.FlatMap(tokenize)
                | "RequestRPC" >> beam.ParDo(RpcDoFn())
            )

            EXPECTED_OUTPUT = compute_expected_output(lines)

            assert_that(output, equal_to(EXPECTED_OUTPUT))


if __name__ == "__main__":
    unittest.main()
We can execute the pipeline test as shown below.
python chapter3/rpc_pardo_test.py
.
----------------------------------------------------------------------
Ran 1 test in 0.373s

OK
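To see what the expected output of the test looks like, the following sketch applies the same tokenize and expected-output logic to two made-up lines. The sample lines are assumptions for illustration and are not taken from lorem.txt.

```python
import re


def tokenize(element: str):
    # Same pattern as in io_utils.py: letters and apostrophes only.
    return re.findall(r"[A-Za-z\']+", element)


def compute_expected_output(lines: list):
    output = []
    for line in lines:
        output.extend((w, len(w)) for w in tokenize(line))
    return output


# Punctuation and digits are dropped by the tokenizer.
lines = ["Lorem ipsum dolor,\n", "sit amet; don't 42\n"]
expected = compute_expected_output(lines)
print(expected)
```

Because the server returns the length of each input string, this list is what the RequestRPC step should emit for the same lines.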
Pipeline Execution
Note that the Kafka bootstrap server is accessible on port 29092 outside the Docker network, and it can be accessed on localhost:29092 from the Docker host machine and on host.docker.internal:29092 from a Docker container that is launched with the host network. We use both types of the bootstrap server address - the former is used by a Kafka producer app that is discussed later and the latter by a Java IO expansion service, which is launched in a Docker container. Note further that, for the latter to work, we have to update the /etc/hosts file by adding an entry for host.docker.internal as shown below.
cat /etc/hosts | grep host.docker.internal
# 127.0.0.1 host.docker.internal
We need to send messages into the input Kafka topic before executing the pipeline. Input text messages can be sent by executing a Kafka text producer - python utils/faker_gen.py.
When executing the pipeline, we specify only a single known argument that enables the legacy read (--deprecated_read) while accepting default values of the other known arguments (bootstrap_servers, input_topic …). The remaining arguments are all pipeline arguments. Note that we deploy the pipeline on a local Flink cluster by specifying the flink master argument (--flink_master=localhost:8081). Alternatively, we can use an embedded Flink cluster if we exclude that argument.
## start the beam pipeline
## exclude --flink_master if using an embedded cluster
python chapter3/rpc_pardo.py --deprecated_read \
  --job_name=rpc-pardo --runner FlinkRunner --flink_master=localhost:8081 \
  --streaming --environment_type=LOOPBACK --parallelism=3 --checkpointing_interval=10000
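The split between known arguments and pipeline arguments comes from argparse's parse_known_args. The sketch below reproduces a reduced version of the parser from rpc_pardo.py (only three of its arguments, without help texts) and feeds it a shortened version of the command line above to show which arguments end up on which side.

```python
import argparse

# Reduced version of the parser in rpc_pardo.py.
parser = argparse.ArgumentParser(description="Beam pipeline arguments")
parser.add_argument("--bootstrap_servers", default="host.docker.internal:29092")
parser.add_argument("--input_topic", default="input-topic")
parser.add_argument("--deprecated_read", action="store_true")

argv = [
    "--deprecated_read",
    "--job_name=rpc-pardo",
    "--runner",
    "FlinkRunner",
    "--flink_master=localhost:8081",
    "--streaming",
]

# Arguments the parser recognizes go to known_args; the rest are returned
# untouched and would be passed on to PipelineOptions.
known_args, pipeline_args = parser.parse_known_args(argv)

print(known_args)
print(pipeline_args)
```

Here only --deprecated_read is consumed by the parser, the other known arguments keep their defaults, and everything else is left for the runner.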
On Flink UI, we see the pipeline only has a single task.
On Kafka UI, we can check the output message is a dictionary of a word and its length.
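The shape of an output record can also be checked without Kafka. The sketch below repeats the create_message function from rpc_pardo.py (without its print statement) and applies it to a single made-up element.

```python
import json
import typing


def create_message(element: typing.Tuple[str, int]):
    # The word becomes the record key and a JSON document becomes the value.
    msg = json.dumps({"word": element[0], "length": element[1]})
    return element[0].encode("utf-8"), msg.encode("utf-8")


key, value = create_message(("lorem", 5))
print(key)
print(json.loads(value))
```

The key is the word encoded as bytes, and decoding the value gives a dictionary with the word and length fields.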