In Part 3, we developed a Beam pipeline that tracks the sport activities of users and outputs their speeds periodically. While reporting such values is useful on its own, we can provide more engaging information if the pipeline also reports the pacing of those activities over time. For example, we can send a message to encourage a user to work harder if he/she has a performance goal and is underperforming over some periods. In this post, we develop a new pipeline that tracks user activities and reports pacing details by comparing short-term metrics to their long-term counterparts.

Development Environment

The development environment has an Apache Flink cluster, an Apache Kafka cluster and a gRPC server. The gRPC server was used in Parts 4 to 6. For Flink, we can use either an embedded cluster or a local cluster, while Docker Compose is used for the rest. See Part 1 for details about how to set up the development environment. The source of this post can be found in this GitHub repository.

Manage Environment

The Flink and Kafka clusters and gRPC server are managed by the following bash scripts.

  • ./setup/start-flink-env.sh
  • ./setup/stop-flink-env.sh

Those scripts accept four flags: -f, -k and -g to start/stop individual resources, or -a to manage all of them. We can add multiple flags to start/stop the relevant resources. Note that the scripts assume Flink 1.18.1 by default, and we can specify a different Flink version via an environment variable, e.g. FLINK_VERSION=1.17.2 ./setup/start-flink-env.sh.

The following shows how to start resources using the start-up script. We need to launch both the Flink and Kafka clusters if we deploy a Beam pipeline on a local Flink cluster. Otherwise, we can start the Kafka cluster only.

 1## start a local flink and kafka cluster
 2./setup/start-flink-env.sh -f -k
 3# start kafka...
 4# [+] Running 6/6
 5#  ⠿ Network app-network      Created                                     0.0s
 6#  ⠿ Volume "zookeeper_data"  Created                                     0.0s
 7#  ⠿ Volume "kafka_0_data"    Created                                     0.0s
 8#  ⠿ Container zookeeper      Started                                     0.3s
 9#  ⠿ Container kafka-0        Started                                     0.5s
10#  ⠿ Container kafka-ui       Started                                     0.8s
11# start flink 1.18.1...
12# Starting cluster.
13# Starting standalonesession daemon on host <hostname>.
14# Starting taskexecutor daemon on host <hostname>.
15
16## start a local kafka cluster only
17./setup/start-flink-env.sh -k
18# start kafka...
19# [+] Running 6/6
20#  ⠿ Network app-network      Created                                     0.0s
21#  ⠿ Volume "zookeeper_data"  Created                                     0.0s
22#  ⠿ Volume "kafka_0_data"    Created                                     0.0s
23#  ⠿ Container zookeeper      Started                                     0.3s
24#  ⠿ Container kafka-0        Started                                     0.5s
25#  ⠿ Container kafka-ui       Started                                     0.8s

Kafka Sport Activity Producer

A Kafka producer application is created to generate the sport tracking activities of users. Those activities are tracked as user positions, which have the following variables.

  • spot - An integer value that indicates where a user is located. Although we may represent a user position using a geographic coordinate, we keep it as an integer for the sake of simplicity in this post.
  • timestamp - A float value that shows the time in seconds since the Epoch when a user is located at the corresponding spot.

A configurable number of user tracks (--num_tracks, default 5) are generated, with a delay of up to two seconds by default between batches (--delay_seconds). The producer sends the activity tracking records as text by concatenating the user ID and position values with tab characters (\t).

 1# utils/sport_tracker_gen.py
 2import time
 3import argparse
 4import random
 5import typing
 6
 7from producer import TextProducer
 8
 9
10class Position(typing.NamedTuple):
11    spot: int
12    timestamp: float
13
14    @classmethod
15    def create(cls, spot: int = random.randint(0, 100), timestamp: float = time.time()):
16        return cls(spot=spot, timestamp=timestamp)
17
18
19class TrackGenerator:
20    def __init__(self, num_tracks: int, delay_seconds: int) -> None:
21        self.num_tracks = num_tracks
22        self.delay_seconds = delay_seconds
23        self.positions = [
24            Position.create(spot=random.randint(0, 110)) for _ in range(self.num_tracks)
25        ]
26
27    def update_positions(self):
28        for ind, position in enumerate(self.positions):
29            self.positions[ind] = self.move(
30                start=position,
31                delta=random.randint(-10, 10),
32                duration=time.time() - position.timestamp,
33            )
34
35    def move(self, start: Position, delta: int, duration: float):
36        spot, timestamp = tuple(start)
37        return Position(spot=spot + delta, timestamp=timestamp + duration)
38
39    def create_tracks(self):
40        tracks = []
41        for ind, position in enumerate(self.positions):
42            track = f"user{ind}\t{position.spot}\t{position.timestamp}"
43            print(track)
44            tracks.append(track)
45        return tracks
46
47
48if __name__ == "__main__":
49    parser = argparse.ArgumentParser(__file__, description="Sport Data Generator")
50    parser.add_argument(
51        "--bootstrap_servers",
52        "-b",
53        type=str,
54        default="localhost:29092",
55        help="Comma separated string of Kafka bootstrap addresses",
56    )
57    parser.add_argument(
58        "--topic_name",
59        "-t",
60        type=str,
61        default="input-topic",
62        help="Kafka topic name",
63    )
64    parser.add_argument(
65        "--num_tracks",
66        "-n",
67        type=int,
68        default=5,
69        help="Number of tracks",
70    )
71    parser.add_argument(
72        "--delay_seconds",
73        "-d",
74        type=float,
75        default=2,
76        help="The amount of time that a record should be delayed.",
77    )
78    args = parser.parse_args()
79
80    producer = TextProducer(args.bootstrap_servers, args.topic_name)
81    track_gen = TrackGenerator(args.num_tracks, args.delay_seconds)
82
83    while True:
84        tracks = track_gen.create_tracks()
85        for track in tracks:
86            producer.send_to_kafka(text=track)
87        track_gen.update_positions()
88        time.sleep(random.uniform(0, args.delay_seconds))

The producer app sends the input messages using the following Kafka producer class.

 1# utils/producer.py
 2from kafka import KafkaProducer
 3
 4
 5class TextProducer:
 6    def __init__(self, bootstrap_servers: list, topic_name: str) -> None:
 7        self.bootstrap_servers = bootstrap_servers
 8        self.topic_name = topic_name
 9        self.kafka_producer = self.create_producer()
10
11    def create_producer(self):
12        """
13        Returns a KafkaProducer instance
14        """
15        return KafkaProducer(
16            bootstrap_servers=self.bootstrap_servers,
17            value_serializer=lambda v: v.encode("utf-8"),
18        )
19
20    def send_to_kafka(self, text: str, timestamp_ms: int = None):
21        """
22        Sends text to a Kafka topic.
23        """
24        try:
25            args = {"topic": self.topic_name, "value": text}
26            if timestamp_ms is not None:
27                args = {**args, **{"timestamp_ms": timestamp_ms}}
28            self.kafka_producer.send(**args)
29            self.kafka_producer.flush()
30        except Exception as e:
31            raise RuntimeError("fails to send a message") from e

We can execute the producer app after starting the Kafka cluster. Once executed, the activity tracking records are printed in the terminal.

 1python utils/sport_tracker_gen.py 
 2user0   78      1731565423.0311885
 3user1   13      1731565423.0311885
 4user2   108     1731565423.0311885
 5user3   64      1731565423.0311885
 6user4   92      1731565423.0311885
 7===========================
 8user0   87      1731565423.1549027
 9user1   22      1731565423.154912
10user2   116     1731565423.1549146
11user3   67      1731565423.1549163
12user4   99      1731565423.1549182
13===========================
14...

Also, we can check the input messages using Kafka UI on localhost:8080.
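
Alternatively, the input messages can be checked from the terminal. The snippet below is a minimal sketch that relies on the same kafka-python package used by the producer; the topic name and bootstrap address simply mirror the producer defaults.

# illustrative only, not part of the repository
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "input-topic",
    bootstrap_servers="localhost:29092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: v.decode("utf-8"),
)
for record in consumer:
    # each value is a tab-separated record, e.g. user0\t78\t1731565423.0311885
    print(record.value)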

Beam Pipeline

We develop a new pipeline that tracks user activities and reports pacing details by comparing short term metrics to their long term counterparts.

Shared Source

  • Position / PositionCoder
    • The input text messages are converted into a custom type (Position). Therefore, we need to create its type definition and register a coder (PositionCoder) that instructs how to encode/decode its values. Note that, without registering the coder, the custom type cannot be processed by a portable runner.
  • ComputeBoxedMetrics
    • This composite transform places the activity tracking elements into a global window and pushes them into the ToMetricFn DoFn.
  • ToMetricFn
    • This is a stateful DoFn that buffers the tracking elements and flushes them periodically. Once flushed, it computes metric records recursively and returns tuples of the key (user ID) and metric together with the associated timestamp. The timestamp of each output tuple is taken from the earlier of the two positions it was computed from.
  • Metric
    • This is a custom data type that keeps user activity metrics.
  • MeanPaceCombineFn
    • This CombineFn accumulates user activity metrics and computes an average speed.
  • ReadPositionsFromKafka
    • It reads messages from a Kafka topic and returns tuple elements of user ID and position. We need to specify the output type hint for a portable runner to recognise the output type correctly. Note that the Kafka read and write transforms have an argument called deprecated_read, which forces the use of the legacy read when it is set to True. We use the legacy read in this post to prevent a problem that is described in this GitHub issue.
  • PreProcessInput
    • This is a composite transform that converts an input text message into a tuple of user ID and position and assigns a timestamp to each element.
  • WriteNotificationsToKafka
    • It sends output messages to a Kafka topic. Each message is a tuple of track (user ID) and notification.
  1# chapter4/sport_tracker_utils.py
  2import re
  3import json
  4import random
  5import time
  6import typing
  7import logging
  8
  9import apache_beam as beam
 10from apache_beam.io import kafka
 11from apache_beam import pvalue
 12from apache_beam.transforms.util import Reify
 13from apache_beam.transforms.window import GlobalWindows, TimestampedValue
 14from apache_beam.transforms.timeutil import TimeDomain
 15from apache_beam.transforms.userstate import (
 16    ReadModifyWriteStateSpec,
 17    BagStateSpec,
 18    TimerSpec,
 19    on_timer,
 20)
 21from apache_beam.utils.timestamp import Timestamp
 22
 23
 24class Position(typing.NamedTuple):
 25    spot: int
 26    timestamp: float
 27
 28    def to_bytes(self):
 29        return json.dumps(self._asdict()).encode("utf-8")
 30
 31    @classmethod
 32    def from_bytes(cls, encoded: bytes):
 33        d = json.loads(encoded.decode("utf-8"))
 34        return cls(**d)
 35
 36    @classmethod
 37    def create(cls, spot: int = random.randint(0, 100), timestamp: float = time.time()):
 38        return cls(spot=spot, timestamp=timestamp)
 39
 40
 41class PositionCoder(beam.coders.Coder):
 42    def encode(self, value: Position):
 43        return value.to_bytes()
 44
 45    def decode(self, encoded: bytes):
 46        return Position.from_bytes(encoded)
 47
 48    def is_deterministic(self) -> bool:
 49        return True
 50
 51
 52beam.coders.registry.register_coder(Position, PositionCoder)
 53
 54
 55class Metric(typing.NamedTuple):
 56    distance: float
 57    duration: int
 58
 59    def to_bytes(self):
 60        return json.dumps(self._asdict()).encode("utf-8")
 61
 62    @classmethod
 63    def from_bytes(cls, encoded: bytes):
 64        d = json.loads(encoded.decode("utf-8"))
 65        return cls(**d)
 66
 67
 68class ToMetricFn(beam.DoFn):
 69    MIN_TIMESTAMP = ReadModifyWriteStateSpec("min_timestamp", beam.coders.FloatCoder())
 70    BUFFER = BagStateSpec("buffer", PositionCoder())
 71    FLUSH_TIMER = TimerSpec("flush", TimeDomain.WATERMARK)
 72
 73    def __init__(self, verbose: bool = False):
 74        self.verbose = verbose
 75
 76    def process(
 77        self,
 78        element: typing.Tuple[str, Position],
 79        timestamp=beam.DoFn.TimestampParam,
 80        buffer=beam.DoFn.StateParam(BUFFER),
 81        min_timestamp=beam.DoFn.StateParam(MIN_TIMESTAMP),
 82        flush_timer=beam.DoFn.TimerParam(FLUSH_TIMER),
 83    ):
 84        min_ts: Timestamp = min_timestamp.read()
 85        if min_ts is None:
 86            if self.verbose and element[0] == "user0":
 87                logging.info(
 88                    f"ToMetricFn set flush timer for {element[0]} at {timestamp}"
 89                )
 90            min_timestamp.write(timestamp)
 91            flush_timer.set(timestamp)
 92        buffer.add(element[1])
 93
 94    @on_timer(FLUSH_TIMER)
 95    def flush(
 96        self,
 97        key=beam.DoFn.KeyParam,
 98        buffer=beam.DoFn.StateParam(BUFFER),
 99        min_timestamp=beam.DoFn.StateParam(MIN_TIMESTAMP),
100    ):
101        items: typing.List[Position] = []
102        for item in buffer.read():
103            items.append(item)
104            if self.verbose and key == "user0":
105                logging.info(
106                    f"ToMetricFn flush track {key}, ts {item.timestamp}, num items {len(items)}"
107                )
108
109        items = list(sorted(items, key=lambda p: p.timestamp))
110        outputs = list(self.flush_metrics(items, key))
111
112        buffer.clear()
113        buffer.add(items[-1])  # keep the last position so the next flush can compute a metric against it
114        min_timestamp.clear()
115        return outputs
116
117    def flush_metrics(self, items: typing.List[Position], key: str):
118        i = 1
119        while i < len(items):
120            last = items[i - 1]
121            next = items[i]
122            distance = abs(next.spot - last.spot)
123            duration = next.timestamp - last.timestamp
124            if duration > 0:
125                yield TimestampedValue(
126                    (key, Metric(distance, duration)),
127                    Timestamp.of(last.timestamp),
128                )
129            i += 1
130
131
132@beam.typehints.with_input_types(typing.Tuple[str, Position])
133class ComputeBoxedMetrics(beam.PTransform):
134    def __init__(self, verbose: bool = False, label: str | None = None):
135        super().__init__(label)
136        self.verbose = verbose
137
138    def expand(self, pcoll: pvalue.PCollection):
139        return (
140            pcoll
141            | beam.WindowInto(GlobalWindows())
142            | beam.ParDo(ToMetricFn(verbose=self.verbose))
143        )
144
145
146class MeanPaceCombineFn(beam.CombineFn):
147    def create_accumulator(self):
148        return Metric(0, 0)
149
150    def add_input(self, mutable_accumulator: Metric, element: Metric):
151        return Metric(*tuple(map(sum, zip(mutable_accumulator, element))))
152
153    def merge_accumulators(self, accumulators: typing.List[Metric]):
154        return Metric(*tuple(map(sum, zip(*accumulators))))
155
156    def extract_output(self, accumulator: Metric):
157        if accumulator.duration == 0:
158            return float("NaN")
159        return accumulator.distance / accumulator.duration
160
161    def get_accumulator_coder(self):
162        return beam.coders.registry.get_coder(Metric)
163
164
165class PreProcessInput(beam.PTransform):
166    def expand(self, pcoll: pvalue.PCollection):
167        def add_timestamp(element: typing.Tuple[str, Position]):
168            return TimestampedValue(element, Timestamp.of(element[1].timestamp))
169
170        def to_positions(input: str):
171            workout, spot, timestamp = tuple(re.sub("\n", "", input).split("\t"))
172            return workout, Position(spot=int(spot), timestamp=float(timestamp))
173
174        return (
175            pcoll
176            | "ToPositions" >> beam.Map(to_positions)
177            | "AddTS" >> beam.Map(add_timestamp)
178        )
179
180
181@beam.typehints.with_output_types(typing.Tuple[str, Position])
182class ReadPositionsFromKafka(beam.PTransform):
183    def __init__(
184        self,
185        bootstrap_servers: str,
186        topics: typing.List[str],
187        group_id: str,
188        deprecated_read: bool,
189        verbose: bool = False,
190        label: str | None = None,
191    ):
192        super().__init__(label)
193        self.boostrap_servers = bootstrap_servers
194        self.topics = topics
195        self.group_id = group_id
196        self.verbose = verbose
197        self.expansion_service = None
198        if deprecated_read:
199            self.expansion_service = kafka.default_io_expansion_service(
200                ["--experiments=use_deprecated_read"]
201            )
202
203    def expand(self, input: pvalue.PBegin):
204        def decode_message(kafka_kv: tuple):
205            if self.verbose:
206                print(kafka_kv)
207            return kafka_kv[1].decode("utf-8")
208
209        return (
210            input
211            | "ReadFromKafka"
212            >> kafka.ReadFromKafka(
213                consumer_config={
214                    "bootstrap.servers": self.boostrap_servers,
215                    "auto.offset.reset": "latest",
216                    # "enable.auto.commit": "true",
217                    "group.id": self.group_id,
218                },
219                topics=self.topics,
220                timestamp_policy=kafka.ReadFromKafka.create_time_policy,
221            )
222            | "DecodeMsg" >> beam.Map(decode_message)
223            | "PreProcessInput" >> PreProcessInput()
224        )
225
226
227class WriteNotificationsToKafka(beam.PTransform):
228    def __init__(
229        self,
230        bootstrap_servers: str,
231        topic: str,
232        label: str | None = None,
233    ):
234        super().__init__(label)
235        self.boostrap_servers = bootstrap_servers
236        self.topic = topic
237
238    def expand(self, pcoll: pvalue.PCollection):
239        def create_message(element: tuple):
240            msg = json.dumps({"track": element[0], "notification": element[1]})
241            print(msg)
242            return element[0].encode("utf-8"), msg.encode("utf-8")
243
244        return (
245            pcoll
246            | "CreateMessage"
247            >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
248            | "WriteToKafka"
249            >> kafka.WriteToKafka(
250                producer_config={"bootstrap.servers": self.boostrap_servers},
251                topic=self.topic,
252            )
253        )

Pipeline Source

The main transform is performed by SportTrackerMotivation. It begins with producing metrics using the ComputeBoxedMetrics transform. Then, those metrics are averaged within a short period (20 seconds by default) as well as across a long period (100 seconds by default). A fixed window is applied to obtain the short averages, while computing the long averages involves more steps, as illustrated below.

  1. A sliding window is applied whose size and period are the long and short durations respectively.
  2. The average within each of the sliding windows is computed.
  3. The long averages are re-windowed into a fixed window of the same size as the short averages.
    • This places the long averages in windows that are comparable to those of the short averages (a small sketch of the window arithmetic is shown after this list).
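
To make the window arithmetic concrete, here is a small illustrative sketch (plain Python, not part of the pipeline source) that computes which fixed and sliding windows an event timestamp belongs to, assuming the default short and long durations of 20 and 100 seconds:

# illustrative window arithmetic only; Beam performs the actual assignment internally
SHORT, LONG = 20, 100

def fixed_window(ts: float):
    # FixedWindows(SHORT): a single window [start, start + SHORT)
    start = (int(ts) // SHORT) * SHORT
    return (start, start + SHORT)

def sliding_windows(ts: float):
    # SlidingWindows(LONG, SHORT): windows start at every multiple of SHORT
    last_start = (int(ts) // SHORT) * SHORT
    return [(s, s + LONG) for s in range(last_start - LONG + SHORT, last_start + SHORT, SHORT)]

print(fixed_window(30))     # (20, 40)
print(sliding_windows(30))  # [(-60, 40), (-40, 60), (-20, 80), (0, 100), (20, 120)]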

After both the short and long averages are obtained, they are joined by the CoGroupByKey transform, followed by generating pacing details of user activities. Note that we can also join the short/long averages using a side input - see this link for details, and the rough sketch below.
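
For reference, the side input alternative could look roughly like the following. This is an assumption of how such a join might be written, not the implementation used in this post; short_average and long_average refer to the windowed per-key averages built in SportTrackerMotivation further down.

# rough sketch of joining via a side input instead of CoGroupByKey
import apache_beam as beam

def compare(element, long_avgs: dict):
    user_id, short_avg = element
    long_avg = long_avgs.get(user_id)
    if not long_avg or not short_avg:
        return []
    ratio = short_avg / long_avg
    status = "underperforming" if ratio < 0.9 else "pacing" if ratio < 1.1 else "outperforming"
    return [(user_id, status)]

notifications = short_average | beam.FlatMap(
    compare, long_avgs=beam.pvalue.AsDict(long_average)
)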

The pipeline can be better illustrated with an example. Let's say we have three positions of a user, from which two metric records are obtained by comparing each position with its previous one. The short averages are computed from the metrics that belong to the [20, 40) and [60, 80) windows. On the other hand, the long averages are obtained across multiple windows, by including all metrics that fall in [Long Avg Window Start, Window End). Note that, as the long averages are re-windowed, joining is based on [Window Start, Window End). We end up with two matching windows, and the notification values are obtained by comparing the short and long averages.
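
To put numbers on this, the calculation below uses user0's positions from the test cases later in the post - (spot 0, t=30), (spot 25, t=60), (spot 22, t=75) - and assumes Beam's default behaviour of emitting combined values at the end of their window:

# worked example only; it mirrors the expected test output for user0 ("pacing" then "underperforming")
metric_1 = (25, 30)  # distance 25, duration 30s, timestamped at t=30 -> fixed window [20, 40)
metric_2 = (3, 15)   # distance 3, duration 15s, timestamped at t=60 -> fixed window [60, 80)

short_avg_1 = 25 / 30              # ~0.83 in [20, 40)
long_avg_1 = 25 / 30               # only metric_1 so far -> ratio 1.0 -> "pacing"

short_avg_2 = 3 / 15               # 0.2 in [60, 80)
long_avg_2 = (25 + 3) / (30 + 15)  # ~0.62 -> ratio ~0.32 -> "underperforming"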

  1# chapter4/sport_tracker_motivation_co_gbk.py
  2import os
  3import argparse
  4import re
  5import typing
  6import logging
  7
  8import apache_beam as beam
  9from apache_beam import pvalue
 10from apache_beam.transforms.window import FixedWindows, SlidingWindows
 11from apache_beam.options.pipeline_options import PipelineOptions
 12from apache_beam.options.pipeline_options import SetupOptions
 13
 14from sport_tracker_utils import (
 15    ReadPositionsFromKafka,
 16    WriteNotificationsToKafka,
 17    ComputeBoxedMetrics,
 18    MeanPaceCombineFn,
 19)
 20
 21
 22class SportTrackerMotivation(beam.PTransform):
 23    def __init__(
 24        self,
 25        short_duration: int,
 26        long_duration: int,
 27        verbose: bool = False,
 28        label: str | None = None,
 29    ):
 30        super().__init__(label)
 31        self.short_duration = short_duration
 32        self.long_duration = long_duration
 33        self.verbose = verbose
 34
 35    def expand(self, pcoll: pvalue.PCollection):
 36        def as_motivations(
 37            element: typing.Tuple[
 38                str, typing.Tuple[typing.Iterable[float], typing.Iterable[float]]
 39            ],
 40        ):
 41            shorts, longs = element[1]
 42            short_avg = next(iter(shorts), None)
 43            long_avg = next(iter(longs), None)
 44            if long_avg in [None, 0] or short_avg in [None, 0]:
 45                status = None
 46            else:
 47                diff = short_avg / long_avg
 48                if diff < 0.9:
 49                    status = "underperforming"
 50                elif diff < 1.1:
 51                    status = "pacing"
 52                else:
 53                    status = "outperforming"
 54            if self.verbose and element[0] == "user0":
 55                logging.info(
 56                    f"SportTrackerMotivation track {element[0]}, short average {short_avg}, long average {long_avg}, status - {status}"
 57                )
 58            if status is None:
 59                return []
 60            return [(element[0], status)]
 61
 62        boxed = pcoll | "ComputeMetrics" >> ComputeBoxedMetrics(verbose=self.verbose)
 63        short_average = (
 64            boxed
 65            | "ShortWindow" >> beam.WindowInto(FixedWindows(self.short_duration))
 66            | "ShortAverage" >> beam.CombinePerKey(MeanPaceCombineFn())
 67        )
 68        long_average = (
 69            boxed
 70            | "LongWindow"
 71            >> beam.WindowInto(SlidingWindows(self.long_duration, self.short_duration))
 72            | "LongAverage" >> beam.CombinePerKey(MeanPaceCombineFn())
 73            | "MatchToShortWindow" >> beam.WindowInto(FixedWindows(self.short_duration))
 74        )
 75        return (
 76            (short_average, long_average)
 77            | beam.CoGroupByKey()
 78            | beam.FlatMap(as_motivations)
 79        )
 80
 81
 82def run(argv=None, save_main_session=True):
 83    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
 84    parser.add_argument(
 85        "--bootstrap_servers",
 86        default="host.docker.internal:29092",
 87        help="Kafka bootstrap server addresses",
 88    )
 89    parser.add_argument("--input_topic", default="input-topic", help="Input topic")
 90    parser.add_argument(
 91        "--output_topic",
 92        default=re.sub("_", "-", re.sub(".py$", "", os.path.basename(__file__))),
 93        help="Output topic",
 94    )
 95    parser.add_argument("--short_duration", default=20, type=int, help="Short window duration in seconds")
 96    parser.add_argument("--long_duration", default=100, type=int, help="Long window duration in seconds")
 97    parser.add_argument(
 98        "--verbose", action="store_true", help="Whether to enable log messages"
 99    )
100    parser.set_defaults(verbose=False)
101    parser.add_argument(
102        "--deprecated_read",
103        action="store_true",
104        help="Whether to use a deprecated read. See https://github.com/apache/beam/issues/20979",
105    )
106    parser.set_defaults(deprecated_read=False)
107
108    known_args, pipeline_args = parser.parse_known_args(argv)
109
110    # # We use the save_main_session option because one or more DoFn's in this
111    # # workflow rely on global context (e.g., a module imported at module level).
112    pipeline_options = PipelineOptions(pipeline_args)
113    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
114    print(f"known args - {known_args}")
115    print(f"pipeline options - {pipeline_options.display_data()}")
116
117    with beam.Pipeline(options=pipeline_options) as p:
118        (
119            p
120            | "ReadPositions"
121            >> ReadPositionsFromKafka(
122                bootstrap_servers=known_args.bootstrap_servers,
123                topics=[known_args.input_topic],
124                group_id=f"{known_args.output_topic}-group",
125                deprecated_read=known_args.deprecated_read,
126            )
127            | "SportsTrackerMotivation"
128            >> SportTrackerMotivation(
129                short_duration=known_args.short_duration,
130                long_duration=known_args.long_duration,
131                verbose=known_args.verbose,
132            )
133            | "WriteNotifications"
134            >> WriteNotificationsToKafka(
135                bootstrap_servers=known_args.bootstrap_servers,
136                topic=known_args.output_topic,
137            )
138        )
139
140        logging.getLogger().setLevel(logging.INFO)
141        logging.info("Building pipeline ...")
142
143
144if __name__ == "__main__":
145    run()

Pipeline Test

As described in this documentation, we can test a Beam pipeline as follows.

  1. Create a TestPipeline.
  2. Create some static, known test input data.
  3. Create a PCollection of input data using the Create transform (if bounded source) or a TestStream (if unbounded source).
  4. Apply the transform to the input PCollection and save the resulting output PCollection.
  5. Use PAssert and its subclasses (or testing utils in Python) to verify that the output PCollection contains the elements that you expect.

We have two test cases covering bounded and unbounded scenarios. Each includes the sport activities of two users, and the expected outputs can be obtained using the reasoning illustrated earlier.

 1# chapter4/sport_tracker_motivation_co_gbk_test.py
 2import typing
 3import unittest
 4from itertools import chain
 5
 6from apache_beam.testing.test_pipeline import TestPipeline
 7from apache_beam.testing.util import assert_that, equal_to
 8from apache_beam.testing.test_stream import TestStream
 9from apache_beam.utils.timestamp import Timestamp
10from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
11
12from sport_tracker_utils import Position
13from sport_tracker_motivation_co_gbk import SportTrackerMotivation
14
15
16class SportTrackerMotivationTest(unittest.TestCase):
17    def test_pipeline_bounded(self):
18        options = PipelineOptions()
19        with TestPipeline(options=options) as p:
20            # now = time.time()
21            now = 0
22            user0s = [
23                ("user0", Position.create(spot=0, timestamp=now + 30)),
24                ("user0", Position.create(spot=25, timestamp=now + 60)),
25                ("user0", Position.create(spot=22, timestamp=now + 75)),
26            ]
27            user1s = [
28                ("user1", Position.create(spot=0, timestamp=now + 30)),
29                ("user1", Position.create(spot=-20, timestamp=now + 60)),
30                ("user1", Position.create(spot=80, timestamp=now + 75)),
31            ]
32            inputs = chain(*zip(user0s, user1s))
33
34            test_stream = TestStream()
35            for input in inputs:
36                test_stream.add_elements([input], event_timestamp=input[1].timestamp)
37            test_stream.advance_watermark_to_infinity()
38
39            output = (
40                p
41                | test_stream.with_output_types(typing.Tuple[str, Position])
42                | SportTrackerMotivation(short_duration=20, long_duration=100)
43            )
44
45            EXPECTED_OUTPUT = [
46                ("user0", "pacing"),
47                ("user1", "pacing"),
48                ("user0", "underperforming"),
49                ("user1", "outperforming"),
50            ]
51
52            assert_that(output, equal_to(EXPECTED_OUTPUT))
53
54    def test_pipeline_unbounded(self):
55        options = PipelineOptions()
56        options.view_as(StandardOptions).streaming = True
57        with TestPipeline(options=options) as p:
58            # now = time.time()
59            now = 0
60            user0s = [
61                ("user0", Position.create(spot=0, timestamp=now + 30)),
62                ("user0", Position.create(spot=25, timestamp=now + 60)),
63                ("user0", Position.create(spot=22, timestamp=now + 75)),
64            ]
65            user1s = [
66                ("user1", Position.create(spot=0, timestamp=now + 30)),
67                ("user1", Position.create(spot=-20, timestamp=now + 60)),
68                ("user1", Position.create(spot=80, timestamp=now + 75)),
69            ]
70            inputs = chain(*zip(user0s, user1s))
71            watermarks = [now + 5, now + 10, now + 15, now + 20, now + 29, now + 30]
72
73            test_stream = TestStream()
74            test_stream.advance_watermark_to(Timestamp.of(now))
75            for input in inputs:
76                test_stream.add_elements([input], event_timestamp=input[1].timestamp)
77                if watermarks:
78                    test_stream.advance_watermark_to(Timestamp.of(watermarks.pop(0)))
79            test_stream.advance_watermark_to_infinity()
80
81            output = (
82                p
83                | test_stream.with_output_types(typing.Tuple[str, Position])
84                | SportTrackerMotivation(short_duration=30, long_duration=90)
85            )
86
87            EXPECTED_OUTPUT = [
88                ("user0", "pacing"),
89                ("user1", "pacing"),
90                ("user0", "underperforming"),
91                ("user1", "outperforming"),
92            ]
93
94            assert_that(output, equal_to(EXPECTED_OUTPUT))
95
96
97if __name__ == "__main__":
98    unittest.main()

We can execute the pipeline test as shown below.

1python chapter4/sport_tracker_motivation_co_gbk_test.py 
2..
3----------------------------------------------------------------------
4Ran 2 tests in 1.032s
5
6OK

Pipeline Execution

Note that the Kafka bootstrap server is accessible on port 29092 outside the Docker network; it can be accessed on localhost:29092 from the Docker host machine and on host.docker.internal:29092 from a Docker container that is launched with the host network. We use both types of bootstrap server address - the former is used by the Kafka producer app and the latter by a Java IO expansion service, which is launched in a Docker container. Note further that, for the latter to work, we have to update the /etc/hosts file by adding an entry for host.docker.internal as shown below.

1cat /etc/hosts | grep host.docker.internal
2# 127.0.0.1       host.docker.internal

We need to send messages into the input Kafka topic before executing the pipeline. Input messages can be sent by executing the Kafka text producer - python utils/sport_tracker_gen.py.

When executing the pipeline, we specify only a single known argument that enables the legacy read (--deprecated_read), while accepting the default values of the other known arguments (bootstrap_servers, input_topic …). The remaining arguments are all pipeline arguments. Note that we deploy the pipeline on a local Flink cluster by specifying the Flink master argument (--flink_master=localhost:8081). Alternatively, we can use an embedded Flink cluster if we exclude that argument.

1## start the beam pipeline
2## exclude --flink_master if using an embedded cluster
3## add --verbose to check detailed log messages
4python chapter4/sport_tracker_motivation_co_gbk.py --deprecated_read \
5	--job_name=sport-tracker-motivation --runner FlinkRunner --flink_master=localhost:8081 \
6	--streaming --environment_type=LOOPBACK --parallelism=3 --checkpointing_interval=10000

On the Flink UI, we see that the pipeline has multiple tasks. Notably, the tasks that compute the short and long averages are split and executed in parallel, and the outcomes are combined subsequently.

On Kafka UI, we can check that each output message value is a JSON object containing the track (user ID) and notification.
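
For example, a notification record for user0 would have the user ID as its key and a value like the following (illustrative values, in the format produced by create_message in WriteNotificationsToKafka):

{"track": "user0", "notification": "pacing"}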