In this post, we develop two Apache Beam pipelines that track sport activities of users and output their speed periodically. The first pipeline uses native transforms, while the second uses Beam SQL. Although Beam SQL can be useful in some situations, its features in the Python SDK are incomplete compared to the Java SDK, so we are not able to build the required tracking pipeline with it. We conclude by discussing potential improvements to Beam SQL that would allow it to be used for building competitive applications with the Python SDK.
- Part 1 Calculate K Most Frequent Words and Max Word Length
- Part 2 Calculate Average Word Length with/without Fixed Look back
- Part 3 Build Sport Activity Tracker with/without SQL (this post)
- Part 4 Call RPC Service for Data Augmentation
- Part 5 Call RPC Service in Batch using Stateless DoFn
- Part 6 Call RPC Service in Batch with Defined Batch Size using Stateful DoFn
- Part 7 Separate Droppable Data into Side Output
- Part 8 Enhance Sport Activity Tracker with Runner Motivation
- Part 9 Develop Batch File Reader and PiSampler using Splittable DoFn
- Part 10 Develop Streaming File Reader using Splittable DoFn
Development Environment
The development environment has an Apache Flink cluster, an Apache Kafka cluster and a gRPC server. The gRPC server will be used in later posts. For Flink, we can use either an embedded cluster or a local cluster, while Docker Compose is used for the rest. See Part 1 for details about how to set up the development environment. The source of this post can be found in this GitHub repository.
Manage Environment
The Flink and Kafka clusters and gRPC server are managed by the following bash scripts.
./setup/start-flink-env.sh
./setup/stop-flink-env.sh
Those scripts accept four flags: -f, -k and -g to start/stop individual resources, or -a to manage all of them. We can combine multiple flags to start/stop the relevant resources. Note that the scripts assume Flink 1.18.1 by default, and we can specify a different Flink version if required, e.g. FLINK_VERSION=1.17.2 ./setup/start-flink-env.sh.
The following shows how to start resources using the start-up script. We need to launch both the Flink and Kafka clusters if we deploy a Beam pipeline on a local Flink cluster. Otherwise, we can start the Kafka cluster only.
## start a local flink and kafka cluster
./setup/start-flink-env.sh -f -k
# start kafka...
# [+] Running 6/6
# ⠿ Network app-network Created 0.0s
# ⠿ Volume "zookeeper_data" Created 0.0s
# ⠿ Volume "kafka_0_data" Created 0.0s
# ⠿ Container zookeeper Started 0.3s
# ⠿ Container kafka-0 Started 0.5s
# ⠿ Container kafka-ui Started 0.8s
# start flink 1.18.1...
# Starting cluster.
# Starting standalonesession daemon on host <hostname>.
# Starting taskexecutor daemon on host <hostname>.

## start a local kafka cluster only
./setup/start-flink-env.sh -k
# start kafka...
# [+] Running 6/6
# ⠿ Network app-network Created 0.0s
# ⠿ Volume "zookeeper_data" Created 0.0s
# ⠿ Volume "kafka_0_data" Created 0.0s
# ⠿ Container zookeeper Started 0.3s
# ⠿ Container kafka-0 Started 0.5s
# ⠿ Container kafka-ui Started 0.8s
Kafka Sport Activity Producer
A Kafka producer application is created to generate sport tracking activities of users. Those activities are tracked by user positions that have the following variables.
- spot - An integer value that indicates where a user is located. Although we may represent a user position using a geographic coordinate, we keep it as an integer for the sake of simplicity in this post.
- timestamp - A float value that shows the time in seconds since the Epoch when a user is located at the corresponding spot.
A configurable number of user tracks (--num_tracks, default 5) is generated at random intervals of up to two seconds by default (--delay_seconds). The producer sends each activity tracking record as text by concatenating the user ID and position values with tab characters (\t).
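As a minimal sketch of this record format, a record can be built and split back as shown below. The helper names format_track and parse_track are hypothetical and are not part of the post's code; the sample values are taken from the producer output shown later.

```python
import typing


class Position(typing.NamedTuple):
    spot: int
    timestamp: float


def format_track(user_id: str, position: Position) -> str:
    # Hypothetical helper: join the user ID and position values with tabs.
    return f"{user_id}\t{position.spot}\t{position.timestamp}"


def parse_track(record: str) -> typing.Tuple[str, Position]:
    # Hypothetical helper: split a record back into the user ID and a Position.
    user_id, spot, timestamp = record.rstrip("\n").split("\t")
    return user_id, Position(spot=int(spot), timestamp=float(timestamp))


record = format_track("user0", Position(spot=97, timestamp=1722127107.0654943))
print(repr(record))
print(parse_track(record))
```

Parsing reverses formatting, so the pipeline can recover both the key (user ID) and the event timestamp from a single text message.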
# utils/sport_tracker_gen.py
import time
import argparse
import random
import typing

from producer import TextProducer


class Position(typing.NamedTuple):
    spot: int
    timestamp: float

    @classmethod
    def create(cls, spot: int = random.randint(0, 100), timestamp: float = time.time()):
        # NOTE: default values are evaluated once, when the class is defined, so
        # every call that omits `timestamp` shares the same start time.
        return cls(spot=spot, timestamp=timestamp)


class TrackGenerator:
    def __init__(self, num_tracks: int, delay_seconds: int) -> None:
        self.num_tracks = num_tracks
        self.delay_seconds = delay_seconds
        # one starting position per user
        self.positions = [
            Position.create(spot=random.randint(0, 110)) for _ in range(self.num_tracks)
        ]

    def update_positions(self):
        # move every user by a random delta and stamp the move with the current time
        for ind, position in enumerate(self.positions):
            self.positions[ind] = self.move(
                start=position,
                delta=random.randint(-10, 10),
                duration=time.time() - position.timestamp,
            )

    def move(self, start: Position, delta: int, duration: float):
        spot, timestamp = tuple(start)
        return Position(spot=spot + delta, timestamp=timestamp + duration)

    def create_tracks(self):
        # build tab-separated records of user ID, spot and timestamp
        tracks = []
        for ind, position in enumerate(self.positions):
            track = f"user{ind}\t{position.spot}\t{position.timestamp}"
            print(track)
            tracks.append(track)
        return tracks


if __name__ == "__main__":
    parser = argparse.ArgumentParser(__file__, description="Sport Data Generator")
    parser.add_argument(
        "--bootstrap_servers",
        "-b",
        type=str,
        default="localhost:29092",
        help="Comma separated string of Kafka bootstrap addresses",
    )
    parser.add_argument(
        "--topic_name",
        "-t",
        type=str,
        default="input-topic",
        help="Kafka topic name",
    )
    parser.add_argument(
        "--num_tracks",
        "-n",
        type=int,
        default=5,
        help="Number of tracks",
    )
    parser.add_argument(
        "--delay_seconds",
        "-d",
        # an integer, because the value is passed to random.randint below
        type=int,
        default=2,
        help="The amount of time that a record should be delayed.",
    )
    args = parser.parse_args()

    producer = TextProducer(args.bootstrap_servers, args.topic_name)
    track_gen = TrackGenerator(args.num_tracks, args.delay_seconds)

    while True:
        tracks = track_gen.create_tracks()
        for track in tracks:
            producer.send_to_kafka(text=track)
        track_gen.update_positions()
        # wait for a random whole number of seconds up to --delay_seconds
        time.sleep(random.randint(0, args.delay_seconds))
The producer app sends the input messages using the following Kafka producer class.
# utils/producer.py
import typing

from kafka import KafkaProducer


class TextProducer:
    def __init__(self, bootstrap_servers: str, topic_name: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topic_name = topic_name
        self.kafka_producer = self.create_producer()

    def create_producer(self):
        """
        Returns a KafkaProducer instance
        """
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: v.encode("utf-8"),
        )

    def send_to_kafka(self, text: str, timestamp_ms: typing.Optional[int] = None):
        """
        Sends text to a Kafka topic.
        """
        try:
            args = {"topic": self.topic_name, "value": text}
            # set the record timestamp only when it is given explicitly
            if timestamp_ms is not None:
                args = {**args, **{"timestamp_ms": timestamp_ms}}
            self.kafka_producer.send(**args)
            self.kafka_producer.flush()
        except Exception as e:
            raise RuntimeError("fails to send a message") from e
We can execute the producer app after starting the Kafka cluster. Once executed, the activity tracking records are printed in the terminal.
python utils/sport_tracker_gen.py
user0 97 1722127107.0654943
user1 56 1722127107.0654943
user2 55 1722127107.0654943
user3 55 1722127107.0654943
user4 95 1722127107.0654943
===========================
user0 88 1722127107.1854753
user1 49 1722127107.1854813
user2 55 1722127107.1854827
user3 61 1722127107.185484
user4 88 1722127107.1854854
===========================
...
Also, we can check the input messages using Kafka UI on localhost:8080.
Beam Pipelines
We develop two Beam pipelines that track sport activities of users. The first pipeline uses native transforms, while the second uses Beam SQL.
Shared Source
Both pipelines share some source code, which is factored out into a separate module.
- Position/PositionCoder - The input text messages are converted into a custom type (Position). Therefore, we need to create its type definition and register the instruction about how to encode/decode its value using a coder (PositionCoder). Note that, without registering the coder, the custom type cannot be processed by a portable runner.
- PreProcessInput - This is a composite transform that converts an input text message into a tuple of user ID and position, and assigns a timestamp value to each element.
- ReadPositionsFromKafka - It reads messages from a Kafka topic and returns tuple elements of user ID and position. We need to specify the output type hint for a portable runner to recognise the output type correctly. Note that the Kafka read and write transforms have an argument called deprecated_read, which forces the legacy read when it is set to True. We use the legacy read in this post to prevent a problem that is described in this GitHub issue.
- WriteMetricsToKafka - It sends output messages to a Kafka topic. Each message is a tuple of user ID and speed. Note that the input type hint is necessary when the inputs are piped from a transform by Beam SQL.
# chapter2/sport_tracker_utils.py
import re
import json
import random
import time
import typing

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam import pvalue
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils.timestamp import Timestamp


class Position(typing.NamedTuple):
    spot: int
    timestamp: float

    def to_bytes(self):
        return json.dumps(self._asdict()).encode("utf-8")

    @classmethod
    def from_bytes(cls, encoded: bytes):
        d = json.loads(encoded.decode("utf-8"))
        return cls(**d)

    @classmethod
    def create(cls, spot: int = random.randint(0, 100), timestamp: float = time.time()):
        return cls(spot=spot, timestamp=timestamp)


class PositionCoder(beam.coders.Coder):
    # encodes/decodes a Position as UTF-8 JSON bytes
    def encode(self, value: Position):
        return value.to_bytes()

    def decode(self, encoded: bytes):
        return Position.from_bytes(encoded)

    def is_deterministic(self) -> bool:
        return True


beam.coders.registry.register_coder(Position, PositionCoder)


def add_timestamp(element: typing.Tuple[str, Position]):
    # use the position timestamp as the event time of the element
    return TimestampedValue(element, Timestamp.of(element[1].timestamp))


def to_positions(input: str):
    # split a tab-separated record into the user ID and its position
    workout, spot, timestamp = tuple(re.sub("\n", "", input).split("\t"))
    return workout, Position(spot=int(spot), timestamp=float(timestamp))


class PreProcessInput(beam.PTransform):
    def expand(self, pcoll: pvalue.PCollection):
        return (
            pcoll
            | "ToPositions" >> beam.Map(to_positions)
            | "AddTS" >> beam.Map(add_timestamp)
        )


def decode_message(kafka_kv: tuple):
    # a Kafka record arrives as a (key, value) pair of bytes
    print(kafka_kv)
    return kafka_kv[1].decode("utf-8")


@beam.typehints.with_output_types(typing.Tuple[str, Position])
class ReadPositionsFromKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topics: typing.List[str],
        group_id: str,
        deprecated_read: bool,
        verbose: bool = False,
        label: str | None = None,
    ):
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.verbose = verbose
        self.expansion_service = None
        if deprecated_read:
            # force the legacy read via the expansion service
            self.expansion_service = kafka.default_io_expansion_service(
                ["--experiments=use_deprecated_read"]
            )

    def expand(self, input: pvalue.PBegin):
        return (
            input
            | "ReadFromKafka"
            >> kafka.ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": self.bootstrap_servers,
                    "auto.offset.reset": "earliest",
                    # "enable.auto.commit": "true",
                    "group.id": self.group_id,
                },
                topics=self.topics,
                timestamp_policy=kafka.ReadFromKafka.create_time_policy,
                expansion_service=self.expansion_service,
            )
            | "DecodeMessage" >> beam.Map(decode_message)
            | "PreProcessInput" >> PreProcessInput()
        )


@beam.typehints.with_input_types(typing.Tuple[str, float])
class WriteMetricsToKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topic: str,
        deprecated_read: bool,
        label: str | None = None,
    ):
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic
        self.expansion_service = None
        if deprecated_read:
            self.expansion_service = kafka.default_io_expansion_service(
                ["--experiments=use_deprecated_read"]
            )

    def expand(self, pcoll: pvalue.PCollection):
        def create_message(element: typing.Tuple[str, float]):
            # serialise (user, speed) as JSON and send it with an empty key
            msg = json.dumps(dict(zip(["user", "speed"], element)))
            print(msg)
            return "".encode("utf-8"), msg.encode("utf-8")

        return (
            pcoll
            | "CreateMessage"
            >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
            | "WriteToKafka"
            >> kafka.WriteToKafka(
                producer_config={"bootstrap.servers": self.bootstrap_servers},
                topic=self.topic,
                expansion_service=self.expansion_service,
            )
        )
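The encode/decode logic behind PositionCoder can be checked without a runner. The sketch below copies only the to_bytes and from_bytes methods from the module above and round-trips a made-up value through JSON bytes. It deliberately leaves out Beam itself, so it says nothing about how a runner invokes the coder.

```python
import json
import typing


class Position(typing.NamedTuple):
    spot: int
    timestamp: float

    def to_bytes(self) -> bytes:
        # Serialise the named tuple as a UTF-8 encoded JSON object.
        return json.dumps(self._asdict()).encode("utf-8")

    @classmethod
    def from_bytes(cls, encoded: bytes) -> "Position":
        # Rebuild the named tuple from the keys of the JSON object.
        return cls(**json.loads(encoded.decode("utf-8")))


# Made-up sample value for illustration.
original = Position(spot=42, timestamp=1713939465.5)
encoded = original.to_bytes()
decoded = Position.from_bytes(encoded)
print(encoded, decoded)
```

Because decoding restores exactly the value that was encoded, elements survive being shipped between the SDK harness and the runner.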
Sport Tracker
The main transforms of this pipeline perform
- Windowing: assigns input elements into a global window with the following configuration.
  - Emits (or triggers) a window after a certain amount of processing time has passed since data was received, with a delay of 3 seconds (trigger=AfterWatermark(early=AfterProcessingTime(3))).
  - Disallows late data (allowed_lateness=0).
  - Assigns the output timestamp from the latest input timestamp (timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST).
    - By default, the end-of-window timestamp is taken, but it lies in the distant future for the global window. Therefore, we take the latest timestamp of the input elements instead.
  - Configures ACCUMULATING as the window's accumulation mode (accumulation_mode=AccumulationMode.ACCUMULATING).
    - The trigger emits the results early but not multiple times. Therefore, the choice of accumulation mode doesn't give a different result in this transform, and we can ignore this configuration.
- ComputeMetrics: (1) groups the input elements by key (user ID), (2) obtains the total distance and duration by looping through individual elements, (3) calculates the speed of a user, which is the distance divided by the duration, and, finally, (4) returns a metrics record, which is a tuple of user ID and speed.
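The calculation in steps (2) and (3) can be sketched in plain Python, outside of Beam. The function name speed and the sample positions below are made up for illustration; the loop mirrors the logic used by the pipeline.

```python
import typing


class Position(typing.NamedTuple):
    spot: int
    timestamp: float


def speed(positions: typing.Iterable[Position]) -> float:
    # Walk the positions in time order, summing absolute moves and elapsed time.
    last = None
    distance, duration = 0, 0.0
    for p in sorted(positions, key=lambda p: p.timestamp):
        if last is not None:
            distance += abs(p.spot - last.spot)
            duration += p.timestamp - last.timestamp
        last = p
    return distance / duration if duration > 0 else 0


# Made-up positions given out of order: the user moves 0 -> 5 -> 10 -> 4
# over 9 seconds, so the total distance is 5 + 5 + 6 = 16.
track = [Position(10, 6.0), Position(0, 0.0), Position(4, 9.0), Position(5, 4.0)]
print(speed(track))
```

Sorting by timestamp first matters because grouped elements are not guaranteed to arrive in event-time order.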
# chapter2/sport_tracker.py
import os
import argparse
import re
import logging
import typing

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.transforms.window import GlobalWindows, TimestampCombiner
from apache_beam.transforms.trigger import (
    AfterWatermark,
    AccumulationMode,
    AfterProcessingTime,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from sport_tracker_utils import (
    Position,
    ReadPositionsFromKafka,
    WriteMetricsToKafka,
)


def compute(element: typing.Tuple[str, typing.Iterable[Position]]):
    # accumulate distance and duration between consecutive positions in time order
    last: Position = None
    distance = 0
    duration = 0
    for p in sorted(element[1], key=lambda p: p.timestamp):
        if last is not None:
            distance += abs(p.spot - last.spot)
            duration += p.timestamp - last.timestamp
        last = p
    return element[0], distance / duration if duration > 0 else 0


class ComputeMetrics(beam.PTransform):
    def expand(self, pcoll: pvalue.PCollection):
        return (
            pcoll | "GroupByKey" >> beam.GroupByKey() | "Compute" >> beam.Map(compute)
        )


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--bootstrap_servers",
        default="host.docker.internal:29092",
        help="Kafka bootstrap server addresses",
    )
    parser.add_argument("--input_topic", default="input-topic", help="Input topic")
    parser.add_argument(
        "--output_topic",
        # derive the default topic name from the file name, e.g. sport-tracker
        default=re.sub("_", "-", re.sub(r"\.py$", "", os.path.basename(__file__))),
        help="Output topic",
    )
    parser.add_argument(
        "--deprecated_read",
        action="store_true",
        help="Whether to use a deprecated read. See https://github.com/apache/beam/issues/20979",
    )
    parser.set_defaults(deprecated_read=False)

    known_args, pipeline_args = parser.parse_known_args(argv)

    # # We use the save_main_session option because one or more DoFn's in this
    # # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known args - {known_args}")
    print(f"pipeline options - {pipeline_options.display_data()}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadPositions"
            >> ReadPositionsFromKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topics=[known_args.input_topic],
                group_id=f"{known_args.output_topic}-group",
                deprecated_read=known_args.deprecated_read,
            )
            | "Windowing"
            >> beam.WindowInto(
                GlobalWindows(),
                trigger=AfterWatermark(early=AfterProcessingTime(3)),
                allowed_lateness=0,
                timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST,
                accumulation_mode=AccumulationMode.ACCUMULATING,
            )
            | "ComputeMetrics" >> ComputeMetrics()
            | "WriteNotifications"
            >> WriteMetricsToKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topic=known_args.output_topic,
                deprecated_read=known_args.deprecated_read,
            )
        )

        logging.getLogger().setLevel(logging.WARN)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()
Pipeline Test
As described in this documentation, we can test a Beam pipeline as follows.
- Create a TestPipeline.
- Create some static, known test input data.
- Create a PCollection of input data using the Create transform (if bounded source) or a TestStream (if unbounded source).
- Apply the transform to the input PCollection and save the resulting output PCollection.
- Use PAssert and its subclasses (or testing utils in Python) to verify that the output PCollection contains the elements that you expect.
We use test sport activities of two users that are stored in a file (inputs/test-tracker-data.txt). We add them into a test stream and apply the same preprocessing and windowing transforms as in the pipeline. Finally, we apply the metric computation transform and compare its outputs with the expected outputs, where the expected speed values are calculated using the same logic.
# inputs/test-tracker-data.txt
user0 109 1713939465.7636628
user1 40 1713939465.7636628
user0 108 1713939465.8801599
user1 50 1713939465.8801658
user0 115 1713939467.8904564
user1 58 1713939467.8904696
...
# chapter2/sport_tracker_test.py
import os
import re
import typing
import unittest

import apache_beam as beam
from apache_beam.coders import coders

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.trigger import (
    AfterWatermark,
    AccumulationMode,
    AfterProcessingTime,
)
from apache_beam.transforms.window import GlobalWindows, TimestampCombiner
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

from sport_tracker_utils import Position, PreProcessInput
from sport_tracker import ComputeMetrics


def read_file(filename: str, inputpath: str):
    with open(os.path.join(inputpath, filename), "r") as f:
        return f.readlines()


def compute_metrics(key: str, positions: typing.List[Position]):
    # same logic as the pipeline; values are read from text, hence the conversions
    last: Position = None
    distance = 0
    duration = 0
    for p in sorted(positions, key=lambda p: p.timestamp):
        if last is not None:
            distance += abs(int(p.spot) - int(last.spot))
            duration += float(p.timestamp) - float(last.timestamp)
        last = p
    return key, distance / duration if duration > 0 else 0


def compute_expected_metrics(lines: list):
    # split the test records by user and compute the speed of each user
    ones, twos = [], []
    for line in lines:
        workout, spot, timestamp = tuple(re.sub("\n", "", line).split("\t"))
        position = Position(spot, timestamp)
        if workout == "user0":
            ones.append(position)
        else:
            twos.append(position)
    return [
        compute_metrics(key="user0", positions=ones),
        compute_metrics(key="user1", positions=twos),
    ]


class SportTrackerTest(unittest.TestCase):
    def test_windowing_behaviour(self):
        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True
        with TestPipeline(options=options) as p:
            PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
            lines = read_file(
                "test-tracker-data.txt", os.path.join(PARENT_DIR, "inputs")
            )
            test_stream = TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)
            for line in lines:
                test_stream.add_elements([line])
            test_stream.advance_watermark_to_infinity()

            output = (
                p
                | test_stream
                | "PreProcessInput" >> PreProcessInput()
                | "Windowing"
                >> beam.WindowInto(
                    GlobalWindows(),
                    trigger=AfterWatermark(early=AfterProcessingTime(3)),
                    allowed_lateness=0,
                    timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST,
                    accumulation_mode=AccumulationMode.ACCUMULATING,
                )
                | "ComputeMetrics" >> ComputeMetrics()
            )

            EXPECTED_OUTPUT = compute_expected_metrics(lines)

            assert_that(output, equal_to(EXPECTED_OUTPUT))


if __name__ == "__main__":
    unittest.main()
We can execute the pipeline test as shown below.
python chapter2/sport_tracker_test.py
.
----------------------------------------------------------------------
Ran 1 test in 1.492s

OK
Pipeline Execution
Note that the Kafka bootstrap server is accessible on port 29092 outside the Docker network. It can be accessed on localhost:29092 from the Docker host machine and on host.docker.internal:29092 from a Docker container that is launched with the host network. We use both types of the bootstrap server address: the former is used by the Kafka producer app that is discussed earlier and the latter by a Java IO expansion service, which is launched in a Docker container. Note further that, for the latter to work, we have to update the /etc/hosts file by adding an entry for host.docker.internal as shown below.
cat /etc/hosts | grep host.docker.internal
# 127.0.0.1 host.docker.internal
We specify only a single known argument that enables the legacy read (--deprecated_read) while accepting the default values of the other known arguments (bootstrap_servers, input_topic …). The remaining arguments are all pipeline arguments. Note that we deploy the pipeline on a local Flink cluster by specifying the flink master argument (--flink_master=localhost:8081). Alternatively, we can use an embedded Flink cluster if we exclude that argument.
## start the beam pipeline
## exclude --flink_master if using an embedded cluster
python chapter2/sport_tracker.py --deprecated_read \
  --job_name=sport_tracker --runner FlinkRunner --flink_master=localhost:8081 \
  --streaming --environment_type=LOOPBACK --parallelism=3 --checkpointing_interval=10000
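How the script separates its known arguments from the pipeline arguments can be sketched with argparse alone. The snippet below reuses the argument names of the pipeline, but it replaces the file-name lookup with a literal and passes a shortened, hard-coded argument list, so treat it as an illustration rather than the pipeline's entry point.

```python
import argparse
import re

parser = argparse.ArgumentParser(description="Beam pipeline arguments")
parser.add_argument("--bootstrap_servers", default="host.docker.internal:29092")
parser.add_argument("--input_topic", default="input-topic")
# The pipeline derives this default from its own file name; a literal is used here.
parser.add_argument(
    "--output_topic",
    default=re.sub("_", "-", re.sub(r"\.py$", "", "sport_tracker.py")),
)
parser.add_argument("--deprecated_read", action="store_true")

# Shortened version of the command line shown above.
argv = [
    "--deprecated_read",
    "--job_name=sport_tracker",
    "--runner",
    "FlinkRunner",
    "--flink_master=localhost:8081",
    "--streaming",
]

# Arguments the parser knows are consumed; everything else is passed through.
known_args, pipeline_args = parser.parse_known_args(argv)
print(known_args)
print(pipeline_args)
```

The pass-through list is what ends up in PipelineOptions, which is why runner settings such as --flink_master never need to be declared in the script.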
On Flink UI, we see the pipeline has two tasks. The first task covers the steps up to windowing the input elements, while the second task covers the remaining steps up to sending the metric records into the output topic.
On Kafka UI, we can check that each output message is a dictionary of user ID and speed.
Sport Tracker SQL
The main transforms of this pipeline perform
- Windowing: assigns input elements into a fixed time window of 5 seconds with the following configuration.
  - Disallows late data (allowed_lateness=0).
- ComputeMetrics: (1) converts the input elements into a new custom type (Track) so that the key (user ID) becomes present for grouping, (2) calculates the speed of a user where the distance and duration are obtained based on their max/min values only (!), and, finally, (3) returns a metrics record, which is a tuple of user ID and speed.
# chapter2/sport_tracker_sql.py
import os
import argparse
import re
import logging
import typing

import apache_beam as beam
from apache_beam import pvalue, coders
from apache_beam.transforms.sql import SqlTransform
from apache_beam.transforms.window import FixedWindows
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from sport_tracker_utils import (
    Position,
    ReadPositionsFromKafka,
    WriteMetricsToKafka,
)


class Track(typing.NamedTuple):
    user: str
    spot: int
    timestamp: float


# a schema-aware type with a row coder is required by SqlTransform
coders.registry.register_coder(Track, coders.RowCoder)


def to_track(element: typing.Tuple[str, Position]):
    return Track(element[0], element[1].spot, element[1].timestamp)


class ComputeMetrics(beam.PTransform):
    def expand(self, pcoll: pvalue.PCollection):
        # distance and duration are both computed as MIN - MAX, so both are
        # non-positive and their ratio (the speed) is non-negative
        return (
            pcoll
            | "ToTrack" >> beam.Map(to_track).with_output_types(Track)
            | "Compute"
            >> SqlTransform(
                """
                WITH cte AS (
                    SELECT
                        `user`,
                        MIN(`spot`) - MAX(`spot`) AS distance,
                        MIN(`timestamp`) - MAX(`timestamp`) AS duration
                    FROM PCOLLECTION
                    GROUP BY `user`
                )
                SELECT
                    `user`,
                    CASE WHEN duration = 0 THEN 0 ELSE distance / duration END AS speed
                FROM cte
                """
            )
            | "ToTuple"
            >> beam.Map(lambda e: tuple(e)).with_output_types(typing.Tuple[str, float])
        )


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--bootstrap_servers",
        default="host.docker.internal:29092",
        help="Kafka bootstrap server addresses",
    )
    parser.add_argument("--input_topic", default="input-topic", help="Input topic")
    parser.add_argument(
        "--output_topic",
        # derive the default topic name from the file name, e.g. sport-tracker-sql
        default=re.sub("_", "-", re.sub(r"\.py$", "", os.path.basename(__file__))),
        help="Output topic",
    )
    parser.add_argument(
        "--deprecated_read",
        action="store_true",
        help="Whether to use a deprecated read. See https://github.com/apache/beam/issues/20979",
    )
    parser.set_defaults(deprecated_read=False)

    known_args, pipeline_args = parser.parse_known_args(argv)

    # # We use the save_main_session option because one or more DoFn's in this
    # # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known args - {known_args}")
    print(f"pipeline options - {pipeline_options.display_data()}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadPositions"
            >> ReadPositionsFromKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topics=[known_args.input_topic],
                group_id=f"{known_args.output_topic}-group",
                deprecated_read=known_args.deprecated_read,
            )
            | "Windowing" >> beam.WindowInto(FixedWindows(5), allowed_lateness=0)
            | "ComputeMetrics" >> ComputeMetrics()
            | "WriteNotifications"
            >> WriteMetricsToKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topic=known_args.output_topic,
                deprecated_read=known_args.deprecated_read,
            )
        )

        logging.getLogger().setLevel(logging.WARN)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()
Pipeline Test
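We have 8 test activity records of two users. The table below shows those records after sorting by user ID and timestamp, together with the 5-second window that each record falls into. Using the sorted records, we can easily obtain the expected outputs, which can be found in the last column.

user   spot  timestamp  window   speed
user0  0     0          [0, 5)
user0  5     4          [0, 5)   (5 - 0) / (4 - 0) = 1.25
user0  10    6          [5, 10)
user0  4     9          [5, 10)  (10 - 4) / (9 - 6) = 2.0
user1  10    2          [0, 5)
user1  3     3          [0, 5)   (10 - 3) / (3 - 2) = 7.0
user1  2     7          [5, 10)
user1  10    9          [5, 10)  (10 - 2) / (9 - 7) = 4.0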
Note that the transform by Beam SQL cannot be tested by the streaming Python direct runner because it doesn’t support cross-language pipelines. Therefore, we use the Flink runner for testing.
# chapter2/sport_tracker_sql_test.py
import sys
import unittest

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import FixedWindows
from apache_beam.options.pipeline_options import PipelineOptions

from sport_tracker_utils import PreProcessInput
from sport_tracker_sql import ComputeMetrics


def main(out=sys.stderr, verbosity=2):
    loader = unittest.TestLoader()

    suite = loader.loadTestsFromModule(sys.modules[__name__])
    unittest.TextTestRunner(out, verbosity=verbosity).run(suite)


class SportTrackerTest(unittest.TestCase):
    def test_windowing_behaviour(self):
        # the SQL transform is cross-language, so the test runs on the Flink runner
        pipeline_opts = {"runner": "FlinkRunner", "parallelism": 1, "streaming": True}
        options = PipelineOptions([], **pipeline_opts)
        with TestPipeline(options=options) as p:
            lines = [
                "user0\t0\t0",
                "user1\t10\t2",
                "user0\t5\t4",
                "user1\t3\t3",
                "user0\t10\t6",
                "user1\t2\t7",
                "user0\t4\t9",
                "user1\t10\t9",
            ]
            test_stream = TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)
            for line in lines:
                test_stream.add_elements([line])
            test_stream.advance_watermark_to_infinity()

            output = (
                p
                | test_stream
                | "PreProcessInput" >> PreProcessInput()
                | "Windowing" >> beam.WindowInto(FixedWindows(5), allowed_lateness=0)
                | "ComputeMetrics" >> ComputeMetrics()
            )

            assert_that(
                output,
                equal_to(
                    [("user0", 1.25), ("user1", 7.0), ("user0", 2.0), ("user1", 4.0)]
                ),
            )


if __name__ == "__main__":
    main(out=None)
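The expected values in the assertion above can be reproduced in plain Python. The sketch below is not part of the test suite: it assigns each record to a 5-second window by integer division, which is an assumption that matches fixed windows starting at timestamp 0, and then applies the same max/min calculation as the SQL query.

```python
import collections

lines = [
    "user0\t0\t0",
    "user1\t10\t2",
    "user0\t5\t4",
    "user1\t3\t3",
    "user0\t10\t6",
    "user1\t2\t7",
    "user0\t4\t9",
    "user1\t10\t9",
]
WINDOW_SIZE = 5  # seconds, matching FixedWindows(5)

# Group (spot, timestamp) pairs by window start and user.
groups = collections.defaultdict(list)
for line in lines:
    user, spot, timestamp = line.split("\t")
    window_start = int(float(timestamp) // WINDOW_SIZE) * WINDOW_SIZE
    groups[(window_start, user)].append((int(spot), float(timestamp)))

# Apply the max/min approximation to every window and user.
expected = []
for (window_start, user), records in sorted(groups.items()):
    spots = [spot for spot, _ in records]
    timestamps = [ts for _, ts in records]
    distance = max(spots) - min(spots)
    duration = max(timestamps) - min(timestamps)
    expected.append((user, distance / duration if duration != 0 else 0))

print(expected)
```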
The pipeline can be tested as shown below.
python chapter2/sport_tracker_sql_test.py
ok

----------------------------------------------------------------------
Ran 1 test in 34.487s

OK
Pipeline Execution
Similar to the previous example, we use the legacy read (--deprecated_read) while accepting the default values of the other known arguments. Note that the transform by Beam SQL fails to run on a local Flink cluster. Therefore, an embedded Flink cluster is used without specifying the flink master argument.
## start the beam pipeline with an embedded flink cluster
python chapter2/sport_tracker_sql.py --deprecated_read \
  --job_name=sport_tracker_sql --runner FlinkRunner \
  --streaming --environment_type=LOOPBACK --parallelism=3 --checkpointing_interval=10000
On Kafka UI, we can check that each output message is a dictionary of user ID and speed.
Potential Improvements of Beam SQL for Python SDK
As discussed earlier, the first pipeline uses native transforms, and it takes individual elements to calculate the distance and duration. On the other hand, the second pipeline approximates those values by their max/min values only. This kind of approximation is misleading, and there are two options that could overcome this limitation.
- User defined function: The Java SDK supports user defined functions that accept a custom Java scalar or aggregation function. We could use a user defined aggregation function to mimic the first pipeline using Beam SQL, but it is not supported in the Python SDK at the moment.
- Beam SQL aggregation analytics functionality (BEAM-9198): This ticket aims to implement SQL window analytics functions, and we would be able to take individual elements by using the lead (or lag) function if supported.
I consider that the usage of Beam SQL will remain limited unless one or all of those features are supported in the Python SDK, although it supports interesting features such as External Table and JOIN.
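To make the gap between the two approaches concrete, the following sketch compares them in plain Python on made-up positions of a single user. Pairing each record with its predecessor imitates what a lag function would provide in SQL; it is an illustration only and does not use Beam SQL.

```python
import typing

# Made-up (spot, timestamp) pairs for one user, already sorted by timestamp.
records: typing.List[typing.Tuple[int, float]] = [
    (0, 0.0),
    (5, 4.0),
    (10, 6.0),
    (4, 9.0),
]

# Max/min approximation, as in the SQL pipeline.
spots = [spot for spot, _ in records]
timestamps = [ts for _, ts in records]
approx_speed = (max(spots) - min(spots)) / (max(timestamps) - min(timestamps))

# Element-wise calculation: pair each record with the previous one,
# which is the information a lag function would expose.
pairs = list(zip(records[:-1], records[1:]))
distance = sum(abs(cur[0] - prev[0]) for prev, cur in pairs)
duration = sum(cur[1] - prev[1] for prev, cur in pairs)
exact_speed = distance / duration

print(approx_speed, exact_speed)
```

The approximation misses the distance covered when the user turns back (from spot 10 to spot 4 here), so it underestimates the speed whenever the movement is not monotonic.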