In the previous post, we developed an Apache Beam pipeline where the input data is augmented by a Remote Procedure Call (RPC) service. Each input element triggers its own RPC call and the output is enriched by the response. This is inefficient when the service can accept more than one element per request. In this post, we discuss how to enhance the pipeline so that a single RPC call is made for a bundle of elements, which can save a significant amount of time compared to making a call for each element.
- Part 1 Calculate K Most Frequent Words and Max Word Length
- Part 2 Calculate Average Word Length with/without Fixed Look back
- Part 3 Build Sport Activity Tracker with/without SQL
- Part 4 Call RPC Service for Data Augmentation
- Part 5 Call RPC Service in Batch using Stateless DoFn (this post)
- Part 6 Call RPC Service in Batch with Defined Batch Size using Stateful DoFn
- Part 7 Separate Droppable Data into Side Output
- Part 8 Enhance Sport Activity Tracker with Runner Motivation
- Part 9 Develop Batch File Reader and PiSampler using Splittable DoFn
- Part 10 Develop Streaming File Reader using Splittable DoFn
Development Environment
The development environment has an Apache Flink cluster, Apache Kafka cluster and gRPC server. For Flink, we can use either an embedded cluster or a local cluster while Docker Compose is used for the rest. See Part 1 for details about how to set up the development environment. The source of this post can be found in this GitHub repository.
Manage Environment
The Flink and Kafka clusters and gRPC server are managed by the following bash scripts.
./setup/start-flink-env.sh
./setup/stop-flink-env.sh
Those scripts accept four flags: -f, -k and -g to start/stop individual resources or -a to manage all of them. We can add multiple flags to start/stop relevant resources. Note that the scripts assume Flink 1.18.1 by default, and we can specify a different Flink version if necessary, e.g. FLINK_VERSION=1.17.2 ./setup/start-flink-env.sh.
The following shows how to start resources using the start-up script. We need to launch both the Flink/Kafka clusters and gRPC server if we deploy a Beam pipeline on a local Flink cluster. Otherwise, we can start the Kafka cluster and gRPC server only.
## start the local flink and kafka clusters and the grpc server
./setup/start-flink-env.sh -f -k -g
# [+] Running 6/6
# ⠿ Network app-network Created 0.0s
# ⠿ Volume "kafka_0_data" Created 0.0s
# ⠿ Volume "zookeeper_data" Created 0.0s
# ⠿ Container zookeeper Started 0.5s
# ⠿ Container kafka-0 Started 0.7s
# ⠿ Container kafka-ui Started 0.9s
# [+] Running 2/2
# ⠿ Network grpc-network Created 0.0s
# ⠿ Container grpc-server Started 0.4s
# start flink 1.18.1...
# Starting cluster.
# Starting standalonesession daemon on host <hostname>.
# Starting taskexecutor daemon on host <hostname>.

## start the local kafka cluster and the grpc server only
./setup/start-flink-env.sh -k -g
# [+] Running 6/6
# ⠿ Network app-network Created 0.0s
# ⠿ Volume "kafka_0_data" Created 0.0s
# ⠿ Volume "zookeeper_data" Created 0.0s
# ⠿ Container zookeeper Started 0.5s
# ⠿ Container kafka-0 Started 0.7s
# ⠿ Container kafka-ui Started 0.9s
# [+] Running 2/2
# ⠿ Network grpc-network Created 0.0s
# ⠿ Container grpc-server Started 0.4s
Remote Procedure Call (RPC) Service
The RPC service has two methods - resolve and resolveBatch. The former accepts a request with a string and returns an integer while the latter accepts a list of string requests and returns a list of integer responses. See Part 4 for details about how the RPC service is developed.
Overall, we have the following files for the gRPC server and client applications, and the server.py gets started when we execute the start-up script with the -g flag.
tree -P "serv*|proto" -I "*pycache*"
.
├── proto
│   └── service.proto
├── server.py
├── server_client.py
├── service_pb2.py
└── service_pb2_grpc.py

1 directory, 5 files
We can check the client and server applications by running them as Python scripts. The client presents two options. If we select 1, the next prompt asks us to enter a word. Upon entering a word, it returns a tuple of the word and its length as an output. If we select 2, we can make an RPC request with a text. Similar to the earlier call, it returns enriched outputs as multiple tuples.
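The difference between the two methods can be sketched without gRPC. The block below is only an in-memory stand-in, not the service code from Part 4: the function names (resolve, resolve_batch, enrich_one, enrich_many) are hypothetical, the service is assumed to return the length of each word as the client output described above suggests, and splitting the text on whitespace is an assumption made for brevity.

```python
from typing import List, Tuple


def resolve(word: str) -> int:
    """Stand-in for the resolve method: one string in, one integer out."""
    return len(word)


def resolve_batch(words: List[str]) -> List[int]:
    """Stand-in for resolveBatch: a list of strings in, a list of integers out."""
    return [len(w) for w in words]


def enrich_one(word: str) -> Tuple[str, int]:
    # Mirrors client option 1: a single word becomes a (word, length) tuple.
    return word, resolve(word)


def enrich_many(text: str) -> List[Tuple[str, int]]:
    # Mirrors client option 2: every word in the text is resolved in one call.
    words = text.split()
    return list(zip(words, resolve_batch(words)))


print(enrich_one("beam"))
print(enrich_many("apache beam pipeline"))
```

The batch variant pays the cost of a request once for the whole text, which is the property the pipeline in this post relies on.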
Beam Pipeline
We develop an Apache Beam pipeline that accesses an external RPC service to augment input elements. In this version, it is configured so that a single RPC call is made for a bundle of elements.
Shared Source
We have multiple pipelines that read text messages from an input Kafka topic and write outputs to an output topic. Therefore, the data source and sink transforms are refactored into a utility module as shown below. Note that the Kafka read and write transforms have an argument called deprecated_read, which forces the use of the legacy read when it is set to True. We will use the legacy read in this post to prevent a problem that is described in this GitHub issue.
# chapter3/io_utils.py
import re
import typing

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import kafka


def decode_message(kafka_kv: tuple):
    # Kafka records arrive as (key, value) pairs of bytes; keep the decoded value.
    print(kafka_kv)
    return kafka_kv[1].decode("utf-8")


def tokenize(element: str):
    # Extract words made of letters and apostrophes.
    return re.findall(r"[A-Za-z\']+", element)


class ReadWordsFromKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topics: typing.List[str],
        group_id: str,
        deprecated_read: bool,
        verbose: bool = False,
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.verbose = verbose
        self.expansion_service = None
        if deprecated_read:
            # Force the legacy read via the expansion service.
            self.expansion_service = kafka.default_io_expansion_service(
                ["--experiments=use_deprecated_read"]
            )

    def expand(self, input: pvalue.PBegin):
        return (
            input
            | "ReadFromKafka"
            >> kafka.ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": self.bootstrap_servers,
                    "auto.offset.reset": "latest",
                    # "enable.auto.commit": "true",
                    "group.id": self.group_id,
                },
                topics=self.topics,
                timestamp_policy=kafka.ReadFromKafka.create_time_policy,
                commit_offset_in_finalize=True,
                expansion_service=self.expansion_service,
            )
            | "DecodeMessage" >> beam.Map(decode_message)
            | "ExtractWords" >> beam.FlatMap(tokenize)
        )


class WriteOutputsToKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topic: str,
        deprecated_read: bool,  # TO DO: remove as it applies only to ReadFromKafka
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        # TO DO: remove as it applies only to ReadFromKafka
        self.expansion_service = None
        if deprecated_read:
            self.expansion_service = kafka.default_io_expansion_service(
                ["--experiments=use_deprecated_read"]
            )

    def expand(self, pcoll: pvalue.PCollection):
        return pcoll | "WriteToKafka" >> kafka.WriteToKafka(
            producer_config={"bootstrap.servers": self.bootstrap_servers},
            topic=self.topic,
            expansion_service=self.expansion_service,
        )
Pipeline Source
According to the life cycle of a DoFn object, the BatchRpcDoFn is configured as follows.
- setup - The connection to the RPC service is established because this method is commonly used to initialize transient resources that are needed during the processing.
- start_bundle - In a DoFn object, input elements are split into chunks called bundles, and this method is invoked when a new bundle arrives. We initialise an empty list to keep input elements in a bundle.
- process - Each input element in a bundle is appended into the elements list after being converted into a WindowedValue object.
- finish_bundle - A single RPC call is made to the resolveBatch method after unique input elements are converted into a RequestList object. This has two advantages. First, as requests with the same input tend to return the same output, making a call with unique elements can reduce request time. Second, calling the resolveBatch method with multiple elements can save a significant amount of time compared to making a call for each element. Once a response is received, output elements are constructed by augmenting input elements with the response, and the output elements are returned as a list.
- teardown - The connection (channel) to the RPC service is closed.
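The order in which these methods are invoked can be illustrated with a toy driver. This is a sketch under assumptions rather than how a Beam runner is implemented: RecordingFn and run_bundles are hypothetical names, the bundles are fixed by hand, and a real runner decides bundle boundaries on its own.

```python
from typing import Iterable, List


class RecordingFn:
    """Hypothetical DoFn-like class that records the order of lifecycle calls."""

    def __init__(self) -> None:
        self.calls: List[str] = []
        self.buffer: List[str] = []

    def setup(self) -> None:
        self.calls.append("setup")

    def start_bundle(self) -> None:
        self.calls.append("start_bundle")
        self.buffer = []  # a fresh buffer for every bundle

    def process(self, element: str) -> None:
        self.calls.append(f"process({element})")
        self.buffer.append(element)  # buffer only, emit nothing yet

    def finish_bundle(self) -> List[str]:
        self.calls.append("finish_bundle")
        return list(self.buffer)  # emit everything buffered in this bundle

    def teardown(self) -> None:
        self.calls.append("teardown")


def run_bundles(fn: RecordingFn, bundles: Iterable[List[str]]) -> List[str]:
    """Toy driver: one setup/teardown per instance, one start/finish per bundle."""
    outputs: List[str] = []
    fn.setup()
    for bundle in bundles:
        fn.start_bundle()
        for element in bundle:
            fn.process(element)
        outputs.extend(fn.finish_bundle())
    fn.teardown()
    return outputs


fn = RecordingFn()
outputs = run_bundles(fn, [["a", "b"], ["c"]])
print(fn.calls)
print(outputs)
```

The recorded calls show why the channel belongs in setup and the element buffer in start_bundle: the former is created once per instance, while the latter has to be reset for every bundle.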
# chapter3/rpc_pardo_batch.py
import os
import argparse
import json
import re
import typing
import logging

import apache_beam as beam
from apache_beam.utils.windowed_value import WindowedValue
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from io_utils import ReadWordsFromKafka, WriteOutputsToKafka


def create_message(element: typing.Tuple[str, int]):
    # Kafka expects a (key, value) pair of bytes; the value is a JSON document.
    msg = json.dumps({"word": element[0], "length": element[1]})
    print(msg)
    return element[0].encode("utf-8"), msg.encode("utf-8")


class BatchRpcDoFn(beam.DoFn):
    channel = None
    stub = None
    elements: typing.List[WindowedValue] = None
    hostname = "localhost"
    port = "50051"

    def setup(self):
        import grpc
        import service_pb2_grpc

        # Open one channel per DoFn instance and reuse it across bundles.
        self.channel: grpc.Channel = grpc.insecure_channel(
            f"{self.hostname}:{self.port}"
        )
        self.stub = service_pb2_grpc.RpcServiceStub(self.channel)

    def teardown(self):
        if self.channel is not None:
            self.channel.close()

    def start_bundle(self):
        # Start every bundle with an empty buffer.
        self.elements = []

    def finish_bundle(self):
        import service_pb2

        # Deduplicate so that each distinct word is sent once per bundle.
        unique_values = list(set(e.value for e in self.elements))
        request_list = service_pb2.RequestList()
        request_list.request.extend(
            [service_pb2.Request(input=e) for e in unique_values]
        )
        response = self.stub.resolveBatch(request_list)
        resolved = dict(zip(unique_values, [r.output for r in response.response]))

        # Outputs emitted here must carry their own timestamp and windows.
        return [
            WindowedValue(
                value=(e.value, resolved[e.value]),
                timestamp=e.timestamp,
                windows=e.windows,
            )
            for e in self.elements
        ]

    def process(
        self,
        element: str,
        timestamp=beam.DoFn.TimestampParam,
        win_param=beam.DoFn.WindowParam,
    ):
        # Buffer the element together with its timestamp and window.
        self.elements.append(
            WindowedValue(value=element, timestamp=timestamp, windows=(win_param,))
        )


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--bootstrap_servers",
        default="host.docker.internal:29092",
        help="Kafka bootstrap server addresses",
    )
    parser.add_argument("--input_topic", default="input-topic", help="Input topic")
    parser.add_argument(
        "--output_topic",
        default=re.sub("_", "-", re.sub(r"\.py$", "", os.path.basename(__file__))),
        help="Output topic",
    )
    parser.add_argument(
        "--deprecated_read",
        action="store_true",
        help="Whether to use a deprecated read. See https://github.com/apache/beam/issues/20979",
    )
    parser.set_defaults(deprecated_read=False)

    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known args - {known_args}")
    print(f"pipeline options - {pipeline_options.display_data()}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadInputsFromKafka"
            >> ReadWordsFromKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topics=[known_args.input_topic],
                group_id=f"{known_args.output_topic}-group",
                deprecated_read=known_args.deprecated_read,
            )
            | "RequestRPC" >> beam.ParDo(BatchRpcDoFn())
            | "CreateMessages"
            >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
            | "WriteOutputsToKafka"
            >> WriteOutputsToKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topic=known_args.output_topic,
                deprecated_read=known_args.deprecated_read,
            )
        )

        logging.getLogger().setLevel(logging.WARN)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()
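The core of finish_bundle (deduplicate, call the batch method once, then fan the responses back out to every buffered element) can be tried without Beam or gRPC. The sketch below rests on several assumptions: Stamped is a hypothetical stand-in for WindowedValue, resolve_batch is an in-memory replacement for the remote call that returns word lengths, and dict.fromkeys is used in place of set so that the order of unique values is easy to follow.

```python
from collections import namedtuple
from typing import Dict, List

# Hypothetical stand-in for Beam's WindowedValue, kept minimal for illustration.
Stamped = namedtuple("Stamped", ["value", "timestamp", "windows"])

call_count = 0


def resolve_batch(words: List[str]) -> List[int]:
    """Stand-in for the remote resolveBatch call; counts how often it is invoked."""
    global call_count
    call_count += 1
    return [len(w) for w in words]


def finish_bundle(buffered: List[Stamped]) -> List[Stamped]:
    # Deduplicate while keeping first-seen order so requests and responses line up.
    unique_values = list(dict.fromkeys(e.value for e in buffered))
    resolved: Dict[str, int] = dict(zip(unique_values, resolve_batch(unique_values)))
    # Fan the responses back out, keeping each element's timestamp and windows.
    return [
        Stamped((e.value, resolved[e.value]), e.timestamp, e.windows)
        for e in buffered
    ]


words = ["to", "be", "or", "not", "to", "be"]
bundle = [Stamped(w, ts, ("global",)) for ts, w in enumerate(words)]
outputs = finish_bundle(bundle)
print([o.value for o in outputs])
print(call_count)
```

Six buffered elements lead to one call carrying four unique words, whereas the per-element approach from the previous post would make six calls.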
Pipeline Test
As described in this documentation, we can test a Beam pipeline as follows.
- Create a TestPipeline.
- Create some static, known test input data.
- Create a PCollection of input data using the Create transform (if bounded source) or a TestStream (if unbounded source).
- Apply the transform to the input PCollection and save the resulting output PCollection.
- Use PAssert and its subclasses (or testing utils in Python) to verify that the output PCollection contains the elements that you expect.
We use a text file that keeps a random text (inputs/lorem.txt) for testing. Then, we add the lines into a test stream and apply the main transform. Finally, we compare the actual output with an expected output. The expected output is a list of tuples where each element is a word and its length.
# chapter3/rpc_pardo_batch_test.py
import os
import unittest
from concurrent import futures

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.testing.test_stream import TestStream
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

import grpc
import service_pb2_grpc
import server

from rpc_pardo_batch import BatchRpcDoFn
from io_utils import tokenize


def read_file(filename: str, inputpath: str):
    with open(os.path.join(inputpath, filename), "r") as f:
        return f.readlines()


def compute_expected_output(lines: list):
    # Every word is expected to be paired with its length.
    output = []
    for line in lines:
        words = [(w, len(w)) for w in tokenize(line)]
        output = output + words
    return output


class RpcParDoBatchTest(unittest.TestCase):
    server_class = server.RpcServiceServicer
    port = 50051

    def setUp(self):
        # Start an in-process gRPC server for the DoFn to call.
        self.server = grpc.server(futures.ThreadPoolExecutor())
        service_pb2_grpc.add_RpcServiceServicer_to_server(
            self.server_class(), self.server
        )
        self.server.add_insecure_port(f"[::]:{self.port}")
        self.server.start()

    def tearDown(self):
        self.server.stop(None)

    def test_pipeline(self):
        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True
        with TestPipeline(options=options) as p:
            PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
            lines = read_file("lorem.txt", os.path.join(PARENT_DIR, "inputs"))
            test_stream = TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)
            for line in lines:
                test_stream.add_elements([line])
            test_stream.advance_watermark_to_infinity()

            output = (
                p
                | test_stream
                | "ExtractWords" >> beam.FlatMap(tokenize)
                | "RequestRPC" >> beam.ParDo(BatchRpcDoFn())
            )

            EXPECTED_OUTPUT = compute_expected_output(lines)

            assert_that(output, equal_to(EXPECTED_OUTPUT))


if __name__ == "__main__":
    unittest.main()
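To see what the expected output looks like, the tokenizer and the expected-output helper can be run on a couple of lines outside the test. The sketch below copies the regular expression from io_utils.py and condenses compute_expected_output into a comprehension; the sample lines are made up for illustration and are not taken from lorem.txt.

```python
import re
from typing import List, Tuple


def tokenize(element: str) -> List[str]:
    # Same regular expression as tokenize in io_utils.py.
    return re.findall(r"[A-Za-z\']+", element)


def compute_expected_output(lines: List[str]) -> List[Tuple[str, int]]:
    # Pair every extracted word with its length, line by line.
    return [(w, len(w)) for line in lines for w in tokenize(line)]


# Made-up sample lines; digits and punctuation are dropped by the tokenizer.
sample_lines = ["Lorem ipsum dolor\n", "sit amet, it's 42\n"]
expected = compute_expected_output(sample_lines)
print(expected)
```

Because the expected values are computed with len rather than through the RPC service, the test passes only when the service responses match the word lengths.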
We can execute the pipeline test as shown below.
python chapter3/rpc_pardo_batch_test.py
.
----------------------------------------------------------------------
Ran 1 test in 0.285s

OK
Pipeline Execution
Note that the Kafka bootstrap server is accessible on port 29092 outside the Docker network, and it can be accessed on localhost:29092 from the Docker host machine and on host.docker.internal:29092 from a Docker container that is launched with the host network. We use both types of the bootstrap server address - the former is used by the Kafka producer app and the latter by a Java IO expansion service, which is launched in a Docker container. Note further that, for the latter to work, we have to update the /etc/hosts file by adding an entry for host.docker.internal as shown below.
cat /etc/hosts | grep host.docker.internal
# 127.0.0.1 host.docker.internal
We need to send messages into the input Kafka topic before executing the pipeline. Input messages can be sent by executing the Kafka text producer - python utils/faker_gen.py. See Part 1 for details about the Kafka producer.
When executing the pipeline, we specify only a single known argument that enables the legacy read (--deprecated_read) while accepting default values of the other known arguments (bootstrap_servers, input_topic …). The remaining arguments are all pipeline arguments. Note that we deploy the pipeline on a local Flink cluster by specifying the flink master argument (--flink_master=localhost:8081). Alternatively, we can use an embedded Flink cluster if we exclude that argument.
## start the beam pipeline
## exclude --flink_master if using an embedded cluster
python chapter3/rpc_pardo_batch.py --deprecated_read \
    --job_name=rpc-pardo-batch --runner FlinkRunner --flink_master=localhost:8081 \
    --streaming --environment_type=LOOPBACK --parallelism=3 --checkpointing_interval=10000
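The split between known arguments and pipeline arguments comes from argparse's parse_known_args, which the run function relies on. The following is a reduced sketch that declares only three of the known arguments and passes a shortened argument list; it does not build a Beam pipeline.

```python
import argparse

parser = argparse.ArgumentParser(description="Beam pipeline arguments")
parser.add_argument("--bootstrap_servers", default="host.docker.internal:29092")
parser.add_argument("--input_topic", default="input-topic")
parser.add_argument("--deprecated_read", action="store_true")

# A shortened version of the command-line arguments used above.
argv = [
    "--deprecated_read",
    "--job_name=rpc-pardo-batch",
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--streaming",
]

# Arguments declared on the parser land in known_args; the rest are returned
# untouched so that they can be handed to PipelineOptions.
known_args, pipeline_args = parser.parse_known_args(argv)
print(known_args)
print(pipeline_args)
```

Only --deprecated_read is consumed by the parser here, while the runner-related flags are left for Beam to interpret.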
On the Flink UI, we can see that the pipeline has only a single task.
On the Kafka UI, we can check that each output message is a dictionary of a word and its length.
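The shape of an output message follows from create_message. As a small sketch, the function is reproduced below without its print call and applied to a made-up element.

```python
import json
from typing import Tuple


def create_message(element: Tuple[str, int]) -> Tuple[bytes, bytes]:
    # The word becomes the record key and the JSON document the record value.
    msg = json.dumps({"word": element[0], "length": element[1]})
    return element[0].encode("utf-8"), msg.encode("utf-8")


# Made-up element for illustration.
key, value = create_message(("beam", 4))
print(key, value)
print(json.loads(value.decode("utf-8")))
```

Decoding the record value yields a dictionary with the word and length keys.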