In Part 3, we developed a Beam pipeline that tracks users' sport activities and periodically outputs their speeds. While reporting such values is useful on its own, we can provide more engaging information if the pipeline also reports how users are pacing their activities over time. For example, we can send a message to encourage a user who has a performance goal but is underperforming over recent periods. In this post, we develop a new pipeline that tracks user activities and reports pacing details by comparing short-term metrics to their long-term counterparts.
- Part 1 Calculate K Most Frequent Words and Max Word Length
- Part 2 Calculate Average Word Length with/without Fixed Look back
- Part 3 Build Sport Activity Tracker with/without SQL
- Part 4 Call RPC Service for Data Augmentation
- Part 5 Call RPC Service in Batch using Stateless DoFn
- Part 6 Call RPC Service in Batch with Defined Batch Size using Stateful DoFn
- Part 7 Separate Droppable Data into Side Output
- Part 8 Enhance Sport Activity Tracker with Runner Motivation (this post)
- Part 9 Develop Batch File Reader and PiSampler using Splittable DoFn
- Part 10 Develop Streaming File Reader using Splittable DoFn
Development Environment
The development environment has an Apache Flink cluster, an Apache Kafka cluster and a gRPC server. The gRPC server was used in Parts 4 to 6. For Flink, we can use either an embedded cluster or a local cluster, while Docker Compose is used for the rest. See Part 1 for details about how to set up the development environment. The source of this post can be found in this GitHub repository.
Manage Environment
The Flink and Kafka clusters and gRPC server are managed by the following bash scripts.
./setup/start-flink-env.sh
./setup/stop-flink-env.sh
Those scripts accept four flags: -f, -k and -g to start/stop individual resources, or -a to manage all of them. We can add multiple flags to start/stop the relevant resources. Note that the scripts assume Flink 1.18.1 by default, and we can specify a different Flink version if necessary, e.g. FLINK_VERSION=1.17.2 ./setup/start-flink-env.sh.
The following shows how to start resources using the start-up script. We need to launch both the Flink and Kafka clusters if we deploy a Beam pipeline on a local Flink cluster; otherwise, we can start only the Kafka cluster.
## start a local flink and kafka cluster
./setup/start-flink-env.sh -f -k
# start kafka...
# [+] Running 6/6
# ⠿ Network app-network Created 0.0s
# ⠿ Volume "zookeeper_data" Created 0.0s
# ⠿ Volume "kafka_0_data" Created 0.0s
# ⠿ Container zookeeper Started 0.3s
# ⠿ Container kafka-0 Started 0.5s
# ⠿ Container kafka-ui Started 0.8s
# start flink 1.18.1...
# Starting cluster.
# Starting standalonesession daemon on host <hostname>.
# Starting taskexecutor daemon on host <hostname>.

## start a local kafka cluster only
./setup/start-flink-env.sh -k
# start kafka...
# [+] Running 6/6
# ⠿ Network app-network Created 0.0s
# ⠿ Volume "zookeeper_data" Created 0.0s
# ⠿ Volume "kafka_0_data" Created 0.0s
# ⠿ Container zookeeper Started 0.3s
# ⠿ Container kafka-0 Started 0.5s
# ⠿ Container kafka-ui Started 0.8s
Kafka Sport Activity Producer
A Kafka producer application is created to generate users' sport tracking activities. Those activities are tracked by user positions, which have the following variables.
- spot - An integer value that indicates where a user is located. Although we could represent a user position using a geographic coordinate, we keep it as an integer for the sake of simplicity in this post.
- timestamp - A float value that shows the time in seconds since the Epoch when the user was at the corresponding spot.
A configurable number of user tracks (--num_tracks, default 5) is generated, by default every two seconds (--delay_seconds). The producer sends each activity tracking record as text by concatenating the user ID and position values with tab characters (\t).
# utils/sport_tracker_gen.py
import time
import argparse
import random
import typing

from producer import TextProducer


class Position(typing.NamedTuple):
    spot: int
    timestamp: float

    @classmethod
    def create(cls, spot: int = random.randint(0, 100), timestamp: float = time.time()):
        return cls(spot=spot, timestamp=timestamp)


class TrackGenerator:
    def __init__(self, num_tracks: int, delay_seconds: int) -> None:
        self.num_tracks = num_tracks
        self.delay_seconds = delay_seconds
        self.positions = [
            Position.create(spot=random.randint(0, 110)) for _ in range(self.num_tracks)
        ]

    def update_positions(self):
        for ind, position in enumerate(self.positions):
            self.positions[ind] = self.move(
                start=position,
                delta=random.randint(-10, 10),
                duration=time.time() - position.timestamp,
            )

    def move(self, start: Position, delta: int, duration: float):
        spot, timestamp = tuple(start)
        return Position(spot=spot + delta, timestamp=timestamp + duration)

    def create_tracks(self):
        tracks = []
        for ind, position in enumerate(self.positions):
            track = f"user{ind}\t{position.spot}\t{position.timestamp}"
            print(track)
            tracks.append(track)
        return tracks


if __name__ == "__main__":
    parser = argparse.ArgumentParser(__file__, description="Sport Data Generator")
    parser.add_argument(
        "--bootstrap_servers",
        "-b",
        type=str,
        default="localhost:29092",
        help="Comma separated string of Kafka bootstrap addresses",
    )
    parser.add_argument(
        "--topic_name",
        "-t",
        type=str,
        default="input-topic",
        help="Kafka topic name",
    )
    parser.add_argument(
        "--num_tracks",
        "-n",
        type=int,
        default=5,
        help="Number of tracks",
    )
    parser.add_argument(
        "--delay_seconds",
        "-d",
        type=float,
        default=2,
        help="The amount of time that a record should be delayed.",
    )
    args = parser.parse_args()

    producer = TextProducer(args.bootstrap_servers, args.topic_name)
    track_gen = TrackGenerator(args.num_tracks, args.delay_seconds)

    while True:
        tracks = track_gen.create_tracks()
        for track in tracks:
            producer.send_to_kafka(text=track)
        track_gen.update_positions()
        time.sleep(random.randint(0, args.delay_seconds))
The producer app sends the input messages using the following Kafka producer class.
# utils/producer.py
from kafka import KafkaProducer


class TextProducer:
    def __init__(self, bootstrap_servers: list, topic_name: str) -> None:
        self.bootstrap_servers = bootstrap_servers
        self.topic_name = topic_name
        self.kafka_producer = self.create_producer()

    def create_producer(self):
        """
        Returns a KafkaProducer instance
        """
        return KafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: v.encode("utf-8"),
        )

    def send_to_kafka(self, text: str, timestamp_ms: int = None):
        """
        Sends text to a Kafka topic.
        """
        try:
            args = {"topic": self.topic_name, "value": text}
            if timestamp_ms is not None:
                args = {**args, **{"timestamp_ms": timestamp_ms}}
            self.kafka_producer.send(**args)
            self.kafka_producer.flush()
        except Exception as e:
            raise RuntimeError("fails to send a message") from e
We can execute the producer app after starting the Kafka cluster. Once executed, the activity tracking records are printed in the terminal.
python utils/sport_tracker_gen.py
user0 78 1731565423.0311885
user1 13 1731565423.0311885
user2 108 1731565423.0311885
user3 64 1731565423.0311885
user4 92 1731565423.0311885
===========================
user0 87 1731565423.1549027
user1 22 1731565423.154912
user2 116 1731565423.1549146
user3 67 1731565423.1549163
user4 99 1731565423.1549182
===========================
...
Also, we can check the input messages using Kafka UI on localhost:8080.
Beam Pipeline
We develop a new pipeline that tracks user activities and reports pacing details by comparing short-term metrics to their long-term counterparts.
Shared Source
- Position / PositionCoder - The input text messages are converted into a custom type (Position). Therefore, we need to create its type definition and register the instruction about how to encode/decode its value using a coder (PositionCoder). Note that, without registering the coder, the custom type cannot be processed by a portable runner.
- ComputeBoxedMetrics - This composite transform begins by placing the activity tracking elements into a global window and pushes them into the ToMetricFn DoFn.
- ToMetricFn - This is a stateful DoFn that buffers the tracking elements and flushes them periodically. Once flushed, it computes metric records recursively and returns tuples of the key (user ID) and metric together with the associated timestamp. The timestamp of an output tuple is taken from the first element's timestamp.
- Metric - This is a custom data type that keeps user activity metrics.
- MeanPaceCombineFn - This CombineFn accumulates user activity metrics and computes an average speed.
- ReadPositionsFromKafka - It reads messages from a Kafka topic and returns tuple elements of user ID and position. We need to specify the output type hint for a portable runner to recognise the output type correctly. Note that the Kafka read and write transforms have an argument called deprecated_read, which forces the legacy read when it is set to True. We use the legacy read in this post to prevent a problem that is described in this GitHub issue.
- PreProcessInput - This is a composite transform that converts an input text message into a tuple of user ID and position as well as assigns a timestamp value to each element.
- WriteNotificationsToKafka - It sends output messages to a Kafka topic. Each message is a tuple of track (user ID) and notification.
# chapter4/sport_tracker_utils.py
import re
import json
import random
import time
import typing
import logging

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam import pvalue
from apache_beam.transforms.util import Reify
from apache_beam.transforms.window import GlobalWindows, TimestampedValue
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    ReadModifyWriteStateSpec,
    BagStateSpec,
    TimerSpec,
    on_timer,
)
from apache_beam.utils.timestamp import Timestamp


class Position(typing.NamedTuple):
    spot: int
    timestamp: float

    def to_bytes(self):
        return json.dumps(self._asdict()).encode("utf-8")

    @classmethod
    def from_bytes(cls, encoded: bytes):
        d = json.loads(encoded.decode("utf-8"))
        return cls(**d)

    @classmethod
    def create(cls, spot: int = random.randint(0, 100), timestamp: float = time.time()):
        return cls(spot=spot, timestamp=timestamp)


class PositionCoder(beam.coders.Coder):
    def encode(self, value: Position):
        return value.to_bytes()

    def decode(self, encoded: bytes):
        return Position.from_bytes(encoded)

    def is_deterministic(self) -> bool:
        return True


beam.coders.registry.register_coder(Position, PositionCoder)


class Metric(typing.NamedTuple):
    distance: float
    duration: int

    def to_bytes(self):
        return json.dumps(self._asdict()).encode("utf-8")

    @classmethod
    def from_bytes(cls, encoded: bytes):
        d = json.loads(encoded.decode("utf-8"))
        return cls(**d)


class ToMetricFn(beam.DoFn):
    MIN_TIMESTAMP = ReadModifyWriteStateSpec("min_timestamp", beam.coders.FloatCoder())
    BUFFER = BagStateSpec("buffer", PositionCoder())
    FLUSH_TIMER = TimerSpec("flush", TimeDomain.WATERMARK)

    def __init__(self, verbose: bool = False):
        self.verbose = verbose

    def process(
        self,
        element: typing.Tuple[str, Position],
        timestamp=beam.DoFn.TimestampParam,
        buffer=beam.DoFn.StateParam(BUFFER),
        min_timestamp=beam.DoFn.StateParam(MIN_TIMESTAMP),
        flush_timer=beam.DoFn.TimerParam(FLUSH_TIMER),
    ):
        min_ts: Timestamp = min_timestamp.read()
        if min_ts is None:
            if self.verbose and element[0] == "user0":
                logging.info(
                    f"ToMetricFn set flush timer for {element[0]} at {timestamp}"
                )
            min_timestamp.write(timestamp)
            flush_timer.set(timestamp)
        buffer.add(element[1])

    @on_timer(FLUSH_TIMER)
    def flush(
        self,
        key=beam.DoFn.KeyParam,
        buffer=beam.DoFn.StateParam(BUFFER),
        min_timestamp=beam.DoFn.StateParam(MIN_TIMESTAMP),
    ):
        items: typing.List[Position] = []
        for item in buffer.read():
            items.append(item)
        if self.verbose and key == "user0":
            logging.info(
                f"ToMetricFn flush track {key}, ts {item.timestamp}, num items {len(items)}"
            )

        items = list(sorted(items, key=lambda p: p.timestamp))
        outputs = list(self.flush_metrics(items, key))

        buffer.clear()
        buffer.add(items[-1])
        min_timestamp.clear()
        return outputs

    def flush_metrics(self, items: typing.List[Position], key: str):
        i = 1
        while i < len(items):
            last = items[i - 1]
            next = items[i]
            distance = abs(next.spot - last.spot)
            duration = next.timestamp - last.timestamp
            if duration > 0:
                yield TimestampedValue(
                    (key, Metric(distance, duration)),
                    Timestamp.of(last.timestamp),
                )
            i += 1


@beam.typehints.with_input_types(typing.Tuple[str, Position])
class ComputeBoxedMetrics(beam.PTransform):
    def __init__(self, verbose: bool = False, label: str | None = None):
        super().__init__(label)
        self.verbose = verbose

    def expand(self, pcoll: pvalue.PCollection):
        return (
            pcoll
            | beam.WindowInto(GlobalWindows())
            | beam.ParDo(ToMetricFn(verbose=self.verbose))
        )


class MeanPaceCombineFn(beam.CombineFn):
    def create_accumulator(self):
        return Metric(0, 0)

    def add_input(self, mutable_accumulator: Metric, element: Metric):
        return Metric(*tuple(map(sum, zip(mutable_accumulator, element))))

    def merge_accumulators(self, accumulators: typing.List[Metric]):
        return Metric(*tuple(map(sum, zip(*accumulators))))

    def extract_output(self, accumulator: Metric):
        if accumulator.duration == 0:
            return float("NaN")
        return accumulator.distance / accumulator.duration

    def get_accumulator_coder(self):
        return beam.coders.registry.get_coder(Metric)


class PreProcessInput(beam.PTransform):
    def expand(self, pcoll: pvalue.PCollection):
        def add_timestamp(element: typing.Tuple[str, Position]):
            return TimestampedValue(element, Timestamp.of(element[1].timestamp))

        def to_positions(input: str):
            workout, spot, timestamp = tuple(re.sub("\n", "", input).split("\t"))
            return workout, Position(spot=int(spot), timestamp=float(timestamp))

        return (
            pcoll
            | "ToPositions" >> beam.Map(to_positions)
            | "AddTS" >> beam.Map(add_timestamp)
        )


@beam.typehints.with_output_types(typing.Tuple[str, Position])
class ReadPositionsFromKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topics: typing.List[str],
        group_id: str,
        deprecated_read: bool,
        verbose: bool = False,
        label: str | None = None,
    ):
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.verbose = verbose
        self.expansion_service = None
        if deprecated_read:
            self.expansion_service = kafka.default_io_expansion_service(
                ["--experiments=use_deprecated_read"]
            )

    def expand(self, input: pvalue.PBegin):
        def decode_message(kafka_kv: tuple):
            if self.verbose:
                print(kafka_kv)
            return kafka_kv[1].decode("utf-8")

        return (
            input
            | "ReadFromKafka"
            >> kafka.ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": self.bootstrap_servers,
                    "auto.offset.reset": "latest",
                    # "enable.auto.commit": "true",
                    "group.id": self.group_id,
                },
                topics=self.topics,
                timestamp_policy=kafka.ReadFromKafka.create_time_policy,
                expansion_service=self.expansion_service,
            )
            | "DecodeMsg" >> beam.Map(decode_message)
            | "PreProcessInput" >> PreProcessInput()
        )


class WriteNotificationsToKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topic: str,
        label: str | None = None,
    ):
        super().__init__(label)
        self.bootstrap_servers = bootstrap_servers
        self.topic = topic

    def expand(self, pcoll: pvalue.PCollection):
        def create_message(element: tuple):
            msg = json.dumps({"track": element[0], "notification": element[1]})
            print(msg)
            return element[0].encode("utf-8"), msg.encode("utf-8")

        return (
            pcoll
            | "CreateMessage"
            >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
            | "WriteToKafka"
            >> kafka.WriteToKafka(
                producer_config={"bootstrap.servers": self.bootstrap_servers},
                topic=self.topic,
            )
        )
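Before wiring these utilities into a pipeline, the pure-Python pieces can be checked quickly in a REPL. The snippet below is illustrative only (it is not part of the post's source) and assumes the module above is importable as sport_tracker_utils.

# Illustrative sanity check of the shared utilities (not part of the post's source).
from sport_tracker_utils import Position, PositionCoder, Metric, MeanPaceCombineFn

# The coder should round-trip a Position value unchanged.
coder = PositionCoder()
pos = Position(spot=10, timestamp=1731565423.0)
assert coder.decode(coder.encode(pos)) == pos

# The CombineFn should return total distance divided by total duration.
fn = MeanPaceCombineFn()
acc = fn.create_accumulator()
for metric in [Metric(25, 30), Metric(3, 15)]:
    acc = fn.add_input(acc, metric)
assert abs(fn.extract_output(acc) - 28 / 45) < 1e-9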
Pipeline Source
The main transform is performed by SportTrackerMotivation. It begins by producing metrics using the ComputeBoxedMetrics transform. Then, those metrics are averaged within a short period (defaulting to 20 seconds) as well as across a long period (defaulting to 100 seconds). A fixed window is applied to obtain the short averages, while computing the long averages involves more steps, as illustrated below.
- A sliding window is applied where the size and period are the long and short periods respectively.
- The average in each of the sliding windows is computed.
- The long averages are re-windowed to a fixed window that has the same size as the short-average windows.
- This places the long averages in windows that are directly comparable to those of the short averages.
After both the short and long averages are obtained, they are joined by the CoGroupByKey transform, followed by generating pacing details of the user activities. Note that we can also join the short/long averages using a side input - see this link for details, and the minimal sketch below.
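The following is a minimal sketch (not the post's implementation) of that side input variant. It assumes short_average and long_average are the keyed PCollections built inside SportTrackerMotivation.expand; because both sides use fixed windows of the same size, the long averages can be passed as a per-window dictionary.

import apache_beam as beam


def to_status(element, long_avgs):
    # element is (track, short average); long_avgs is the per-window side input.
    track, short_avg = element
    long_avg = long_avgs.get(track)
    if not short_avg or not long_avg:
        return []
    diff = short_avg / long_avg
    if diff < 0.9:
        return [(track, "underperforming")]
    if diff < 1.1:
        return [(track, "pacing")]
    return [(track, "outperforming")]


motivations = short_average | "AsMotivations" >> beam.FlatMap(
    to_status, long_avgs=beam.pvalue.AsDict(long_average)
)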
The pipeline is better illustrated with an example. Let's say we have three positions of a user, from which two metric records are obtained recursively by comparing each position with its predecessor. The short averages are computed from the metrics that fall into the [20, 40) and [60, 80) windows. On the other hand, the long averages are obtained across multiple windows by including all metrics that fall in [Long Avg Window Start, Window End). Note that, as the long averages are re-windowed, joining is based on [Window Start, Window End). We end up with two matching windows, and the notification values are obtained by comparing the short and long averages.
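As a rough, hand-worked check, the numbers used in the unit tests below (user0's positions at spots 0, 25 and 22, recorded 30, 60 and 75 seconds in) can be traced through for short/long windows of 30/90 seconds. The figures are illustrative and assume that a combined value is stamped at the end of its sliding window before being re-windowed.

# Illustrative only: user0's positions from the unit tests below, with
# short/long windows of 30/90 seconds.
positions = [(0, 30), (25, 60), (22, 75)]  # (spot, timestamp)
metrics = [
    (abs(s2 - s1), t2 - t1)  # (distance, duration), stamped at the earlier timestamp
    for (s1, t1), (s2, t2) in zip(positions, positions[1:])
]
assert metrics == [(25, 30), (3, 15)]

# Short fixed window [60, 90) holds only the second metric: 3 / 15 = 0.2.
# Sliding window [0, 90) holds both metrics: (25 + 3) / (30 + 15) ~= 0.62, and its
# combined output lands in the same [60, 90) fixed window after re-windowing.
short_avg, long_avg = 3 / 15, 28 / 45
assert short_avg / long_avg < 0.9  # -> "underperforming", as the tests expect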
# chapter4/sport_tracker_motivation_co_gbk.py
import os
import argparse
import re
import typing
import logging

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.transforms.window import FixedWindows, SlidingWindows
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from sport_tracker_utils import (
    ReadPositionsFromKafka,
    WriteNotificationsToKafka,
    ComputeBoxedMetrics,
    MeanPaceCombineFn,
)


class SportTrackerMotivation(beam.PTransform):
    def __init__(
        self,
        short_duration: int,
        long_duration: int,
        verbose: bool = False,
        label: str | None = None,
    ):
        super().__init__(label)
        self.short_duration = short_duration
        self.long_duration = long_duration
        self.verbose = verbose

    def expand(self, pcoll: pvalue.PCollection):
        def as_motivations(
            element: typing.Tuple[
                str, typing.Tuple[typing.Iterable[float], typing.Iterable[float]]
            ],
        ):
            shorts, longs = element[1]
            short_avg = next(iter(shorts), None)
            long_avg = next(iter(longs), None)
            if long_avg in [None, 0] or short_avg in [None, 0]:
                status = None
            else:
                diff = short_avg / long_avg
                if diff < 0.9:
                    status = "underperforming"
                elif diff < 1.1:
                    status = "pacing"
                else:
                    status = "outperforming"
            if self.verbose and element[0] == "user0":
                logging.info(
                    f"SportTrackerMotivation track {element[0]}, short average {short_avg}, long average {long_avg}, status - {status}"
                )
            if status is None:
                return []
            return [(element[0], status)]

        boxed = pcoll | "ComputeMetrics" >> ComputeBoxedMetrics(verbose=self.verbose)
        short_average = (
            boxed
            | "ShortWindow" >> beam.WindowInto(FixedWindows(self.short_duration))
            | "ShortAverage" >> beam.CombinePerKey(MeanPaceCombineFn())
        )
        long_average = (
            boxed
            | "LongWindow"
            >> beam.WindowInto(SlidingWindows(self.long_duration, self.short_duration))
            | "LongAverage" >> beam.CombinePerKey(MeanPaceCombineFn())
            | "MatchToShortWindow" >> beam.WindowInto(FixedWindows(self.short_duration))
        )
        return (
            (short_average, long_average)
            | beam.CoGroupByKey()
            | beam.FlatMap(as_motivations)
        )


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--bootstrap_servers",
        default="host.docker.internal:29092",
        help="Kafka bootstrap server addresses",
    )
    parser.add_argument("--input_topic", default="input-topic", help="Input topic")
    parser.add_argument(
        "--output_topic",
        default=re.sub("_", "-", re.sub(".py$", "", os.path.basename(__file__))),
        help="Output topic",
    )
    parser.add_argument(
        "--short_duration", default=20, type=int, help="Short window duration in seconds"
    )
    parser.add_argument(
        "--long_duration", default=100, type=int, help="Long window duration in seconds"
    )
    parser.add_argument(
        "--verbose", action="store_true", help="Whether to enable log messages"
    )
    parser.set_defaults(verbose=False)
    parser.add_argument(
        "--deprecated_read",
        action="store_true",
        help="Whether to use a deprecated read. See https://github.com/apache/beam/issues/20979",
    )
    parser.set_defaults(deprecated_read=False)

    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known args - {known_args}")
    print(f"pipeline options - {pipeline_options.display_data()}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadPositions"
            >> ReadPositionsFromKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topics=[known_args.input_topic],
                group_id=f"{known_args.output_topic}-group",
                deprecated_read=known_args.deprecated_read,
            )
            | "SportsTrackerMotivation"
            >> SportTrackerMotivation(
                short_duration=known_args.short_duration,
                long_duration=known_args.long_duration,
                verbose=known_args.verbose,
            )
            | "WriteNotifications"
            >> WriteNotificationsToKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topic=known_args.output_topic,
            )
        )

        logging.getLogger().setLevel(logging.INFO)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()
Pipeline Test
As described in this documentation, we can test a Beam pipeline as follows.
- Create a TestPipeline.
- Create some static, known test input data.
- Create a PCollection of input data using the Create transform (if bounded source) or a TestStream (if unbounded source).
- Apply the transform to the input PCollection and save the resulting output PCollection.
- Use PAssert and its subclasses (or testing utils in Python) to verify that the output PCollection contains the elements that you expect.
We have two test cases, covering bounded and unbounded scenarios. Each includes the sport activities of two users, and the expected outputs can be derived with the reasoning illustrated earlier.
# chapter4/sport_tracker_motivation_co_gbk_test.py
import typing
import unittest
from itertools import chain

from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.testing.test_stream import TestStream
from apache_beam.utils.timestamp import Timestamp
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

from sport_tracker_utils import Position
from sport_tracker_motivation_co_gbk import SportTrackerMotivation


class SportTrackerMotivationTest(unittest.TestCase):
    def test_pipeline_bounded(self):
        options = PipelineOptions()
        with TestPipeline(options=options) as p:
            # now = time.time()
            now = 0
            user0s = [
                ("user0", Position.create(spot=0, timestamp=now + 30)),
                ("user0", Position.create(spot=25, timestamp=now + 60)),
                ("user0", Position.create(spot=22, timestamp=now + 75)),
            ]
            user1s = [
                ("user1", Position.create(spot=0, timestamp=now + 30)),
                ("user1", Position.create(spot=-20, timestamp=now + 60)),
                ("user1", Position.create(spot=80, timestamp=now + 75)),
            ]
            inputs = chain(*zip(user0s, user1s))

            test_stream = TestStream()
            for input in inputs:
                test_stream.add_elements([input], event_timestamp=input[1].timestamp)
            test_stream.advance_watermark_to_infinity()

            output = (
                p
                | test_stream.with_output_types(typing.Tuple[str, Position])
                | SportTrackerMotivation(short_duration=20, long_duration=100)
            )

            EXPECTED_OUTPUT = [
                ("user0", "pacing"),
                ("user1", "pacing"),
                ("user0", "underperforming"),
                ("user1", "outperforming"),
            ]

            assert_that(output, equal_to(EXPECTED_OUTPUT))

    def test_pipeline_unbounded(self):
        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True
        with TestPipeline(options=options) as p:
            # now = time.time()
            now = 0
            user0s = [
                ("user0", Position.create(spot=0, timestamp=now + 30)),
                ("user0", Position.create(spot=25, timestamp=now + 60)),
                ("user0", Position.create(spot=22, timestamp=now + 75)),
            ]
            user1s = [
                ("user1", Position.create(spot=0, timestamp=now + 30)),
                ("user1", Position.create(spot=-20, timestamp=now + 60)),
                ("user1", Position.create(spot=80, timestamp=now + 75)),
            ]
            inputs = chain(*zip(user0s, user1s))
            watermarks = [now + 5, now + 10, now + 15, now + 20, now + 29, now + 30]

            test_stream = TestStream()
            test_stream.advance_watermark_to(Timestamp.of(now))
            for input in inputs:
                test_stream.add_elements([input], event_timestamp=input[1].timestamp)
                if watermarks:
                    test_stream.advance_watermark_to(Timestamp.of(watermarks.pop(0)))
            test_stream.advance_watermark_to_infinity()

            output = (
                p
                | test_stream.with_output_types(typing.Tuple[str, Position])
                | SportTrackerMotivation(short_duration=30, long_duration=90)
            )

            EXPECTED_OUTPUT = [
                ("user0", "pacing"),
                ("user1", "pacing"),
                ("user0", "underperforming"),
                ("user1", "outperforming"),
            ]

            assert_that(output, equal_to(EXPECTED_OUTPUT))


if __name__ == "__main__":
    unittest.main()
We can execute the pipeline test as shown below.
python chapter4/sport_tracker_motivation_co_gbk_test.py
..
----------------------------------------------------------------------
Ran 2 tests in 1.032s

OK
Pipeline Execution
Note that the Kafka bootstrap server is accessible on port 29092 outside the Docker network, and it can be accessed on localhost:29092 from the Docker host machine and on host.docker.internal:29092 from a Docker container that is launched with the host network. We use both types of the bootstrap server address - the former is used by the Kafka producer app and the latter by a Java IO expansion service, which is launched in a Docker container. Note further that, for the latter to work, we have to update the /etc/hosts file by adding an entry for host.docker.internal as shown below.
cat /etc/hosts | grep host.docker.internal
# 127.0.0.1 host.docker.internal
We need to send messages to the input Kafka topic before executing the pipeline. Input messages can be sent by executing the Kafka text producer - python utils/sport_tracker_gen.py.
When executing the pipeline, we specify only a single known argument that enables the legacy read (--deprecated_read), while accepting the default values of the other known arguments (bootstrap_servers, input_topic ...). The remaining arguments are all pipeline arguments. Note that we deploy the pipeline on a local Flink cluster by specifying the flink master argument (--flink_master=localhost:8081). Alternatively, we can use an embedded Flink cluster if we exclude that argument.
## start the beam pipeline
## exclude --flink_master if using an embedded cluster
## add --verbose to check detailed log messages
python chapter4/sport_tracker_motivation_co_gbk.py --deprecated_read \
    --job_name=sport-tracker-motivation --runner FlinkRunner --flink_master=localhost:8081 \
    --streaming --environment_type=LOOPBACK --parallelism=3 --checkpointing_interval=10000
On the Flink UI, we see that the pipeline has multiple tasks. Notably, the tasks that compute the short and long averages are split and executed in parallel, and their outcomes are combined subsequently.
On Kafka UI, we can check that each output message is a dictionary of track (user ID) and notification.
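If we prefer checking the output from a terminal instead of Kafka UI, a small consumer using the same kafka-python package as the producer app works as well. The snippet below is an illustration only; the topic name assumes the default output topic derived from the pipeline file name.

# A minimal output checker (illustration only), using the kafka-python package.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "sport-tracker-motivation-co-gbk",  # default output topic name
    bootstrap_servers="localhost:29092",
    auto_offset_reset="earliest",
    key_deserializer=lambda k: k.decode("utf-8"),
    value_deserializer=lambda v: v.decode("utf-8"),
)
for message in consumer:
    print(message.key, message.value)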