In this post, we develop two Apache Beam pipelines that track users' sport activities and output their speed periodically. The first pipeline uses native transforms, while the second uses Beam SQL. Although Beam SQL can be useful in some situations, its feature set in the Python SDK is not as complete as in the Java SDK, and we are therefore unable to build the tracking pipeline exactly as required with it. We finish by discussing potential improvements to Beam SQL that would make it competitive for building such applications with the Python SDK.

Development Environment

The development environment includes an Apache Flink cluster, an Apache Kafka cluster and a gRPC server. The gRPC server will be used in Parts 4 to 6. For Flink, we can use either an embedded cluster or a local cluster, while Docker Compose is used for the rest. See Part 1 for details on how to set up the development environment. The source code of this post can be found in this GitHub repository.

Manage Environment

The Flink and Kafka clusters and gRPC server are managed by the following bash scripts.

  • ./setup/start-flink-env.sh
  • ./setup/stop-flink-env.sh

Those scripts accept four flags: -f, -k and -g to start/stop individual resources, or -a to manage all of them. Multiple flags can be combined to start/stop the relevant resources together. Note that the scripts assume Flink 1.18.1 by default, and a different Flink version can be specified as an environment variable, e.g. FLINK_VERSION=1.17.2 ./setup/start-flink-env.sh.

The following shows how to start the resources using the start-up script. We need to launch both the Flink and Kafka clusters if we deploy a Beam pipeline on a local Flink cluster. Otherwise, we can start the Kafka cluster only.

 1## start a local flink and kafka cluster
 2./setup/start-flink-env.sh -f -k
 3# start kafka...
 4# [+] Running 6/6
 5#  ⠿ Network app-network      Created                                     0.0s
 6#  ⠿ Volume "zookeeper_data"  Created                                     0.0s
 7#  ⠿ Volume "kafka_0_data"    Created                                     0.0s
 8#  ⠿ Container zookeeper      Started                                     0.3s
 9#  ⠿ Container kafka-0        Started                                     0.5s
10#  ⠿ Container kafka-ui       Started                                     0.8s
11# start flink 1.18.1...
12# Starting cluster.
13# Starting standalonesession daemon on host <hostname>.
14# Starting taskexecutor daemon on host <hostname>.
15
16## start a local kafka cluster only
17./setup/start-flink-env.sh -k
18# start kafka...
19# [+] Running 6/6
20#  ⠿ Network app-network      Created                                     0.0s
21#  ⠿ Volume "zookeeper_data"  Created                                     0.0s
22#  ⠿ Volume "kafka_0_data"    Created                                     0.0s
23#  ⠿ Container zookeeper      Started                                     0.3s
24#  ⠿ Container kafka-0        Started                                     0.5s
25#  ⠿ Container kafka-ui       Started                                     0.8s
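
The same flags work for tearing the resources down with the stop script. The commands below are a minimal example and assume the stop script handles the flags in the same way as the start script.

1## stop the local flink and kafka clusters
2./setup/stop-flink-env.sh -f -k
3
4## stop all resources, including the grpc server
5./setup/stop-flink-env.sh -a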

Kafka Sport Activity Producer

A Kafka producer application is created to generate users' sport tracking activities. Those activities are tracked as user positions, which have the following variables.

  • spot - An integer value that indicates where a user is located. Although we could represent a user position using a geographic coordinate, we keep it as an integer for the sake of simplicity in this post.
  • timestamp - A float value that shows the time in seconds since the Epoch when a user is located at the corresponding spot.

A configurable number of user tracks (--num_tracks, default 5) is generated, by default at intervals of up to two seconds (--delay_seconds). The producer sends each activity tracking record as text by concatenating the user ID and position values with tab characters (\t).

 1# utils/sport_tracker_gen.py
 2import time
 3import argparse
 4import random
 5import typing
 6
 7from producer import TextProducer
 8
 9
10class Position(typing.NamedTuple):
11    spot: int
12    timestamp: float
13
14    @classmethod
15    def create(cls, spot: int = random.randint(0, 100), timestamp: float = time.time()):
16        return cls(spot=spot, timestamp=timestamp)
17
18
19class TrackGenerator:
20    def __init__(self, num_tracks: int, delay_seconds: int) -> None:
21        self.num_tracks = num_tracks
22        self.delay_seconds = delay_seconds
23        self.positions = [
24            Position.create(spot=random.randint(0, 110)) for _ in range(self.num_tracks)
25        ]
26
27    def update_positions(self):
28        for ind, position in enumerate(self.positions):
29            self.positions[ind] = self.move(
30                start=position,
31                delta=random.randint(-10, 10),
32                duration=time.time() - position.timestamp,
33            )
34
35    def move(self, start: Position, delta: int, duration: float):
36        spot, timestamp = tuple(start)
37        return Position(spot=spot + delta, timestamp=timestamp + duration)
38
39    def create_tracks(self):
40        tracks = []
41        for ind, position in enumerate(self.positions):
42            track = f"user{ind}\t{position.spot}\t{position.timestamp}"
43            print(track)
44            tracks.append(track)
45        return tracks
46
47
48if __name__ == "__main__":
49    parser = argparse.ArgumentParser(__file__, description="Sport Data Generator")
50    parser.add_argument(
51        "--bootstrap_servers",
52        "-b",
53        type=str,
54        default="localhost:29092",
55        help="Comma separated string of Kafka bootstrap addresses",
56    )
57    parser.add_argument(
58        "--topic_name",
59        "-t",
60        type=str,
61        default="input-topic",
62        help="Kafka topic name",
63    )
64    parser.add_argument(
65        "--num_tracks",
66        "-n",
67        type=int,
68        default=5,
69        help="Number of tracks",
70    )
71    parser.add_argument(
72        "--delay_seconds",
73        "-d",
74        type=float,
75        default=2,
76        help="The amount of time that a record should be delayed.",
77    )
78    args = parser.parse_args()
79
80    producer = TextProducer(args.bootstrap_servers, args.topic_name)
81    track_gen = TrackGenerator(args.num_tracks, args.delay_seconds)
82
83    while True:
84        tracks = track_gen.create_tracks()
85        for track in tracks:
86            producer.send_to_kafka(text=track)
87        track_gen.update_positions()
88        time.sleep(random.randint(0, args.delay_seconds))

The producer app sends the input messages using the following Kafka producer class.

 1# utils/producer.py
 2from kafka import KafkaProducer
 3
 4
 5class TextProducer:
 6    def __init__(self, bootstrap_servers: list, topic_name: str) -> None:
 7        self.bootstrap_servers = bootstrap_servers
 8        self.topic_name = topic_name
 9        self.kafka_producer = self.create_producer()
10
11    def create_producer(self):
12        """
13        Returns a KafkaProducer instance
14        """
15        return KafkaProducer(
16            bootstrap_servers=self.bootstrap_servers,
17            value_serializer=lambda v: v.encode("utf-8"),
18        )
19
20    def send_to_kafka(self, text: str, timestamp_ms: int = None):
21        """
22        Sends text to a Kafka topic.
23        """
24        try:
25            args = {"topic": self.topic_name, "value": text}
26            if timestamp_ms is not None:
27                args = {**args, **{"timestamp_ms": timestamp_ms}}
28            self.kafka_producer.send(**args)
29            self.kafka_producer.flush()
30        except Exception as e:
31            raise RuntimeError("fails to send a message") from e
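
For a quick ad-hoc check, the TextProducer class can also be used on its own. The snippet below is a minimal sketch that assumes the Kafka cluster is running, the kafka-python package is installed, and the code is executed from the utils folder so that the producer module is importable.

1# send a single tab-delimited record with the TextProducer class above
2from producer import TextProducer
3
4producer = TextProducer(bootstrap_servers="localhost:29092", topic_name="input-topic")
5producer.send_to_kafka(text="user0\t10\t1722127107.0")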

We can execute the producer app after starting the Kafka cluster. Once executed, the activity tracking records are printed in the terminal.

 1python utils/sport_tracker_gen.py 
 2user0   97      1722127107.0654943
 3user1   56      1722127107.0654943
 4user2   55      1722127107.0654943
 5user3   55      1722127107.0654943
 6user4   95      1722127107.0654943
 7===========================
 8user0   88      1722127107.1854753
 9user1   49      1722127107.1854813
10user2   55      1722127107.1854827
11user3   61      1722127107.185484
12user4   88      1722127107.1854854
13===========================
14...

Also, we can check the input messages using Kafka UI on localhost:8080.
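
If you prefer a terminal over Kafka UI, the input messages can also be inspected with the standard Kafka console consumer. This is optional and assumes the Kafka command line tools are installed on the host.

1kafka-console-consumer.sh --bootstrap-server localhost:29092 \
2    --topic input-topic --from-beginning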

Beam Pipelines

We develop two Beam pipelines that track users' sport activities. The first pipeline uses native transforms, while the second uses Beam SQL.

Shared Source

Both pipelines share the same sources, which are refactored into a separate module.

  • Position / PositionCoder
    • The input text messages are converted into a custom type (Position). Therefore, we need to create its type definition and register instructions on how to encode/decode its values using a coder (PositionCoder). Note that, without registering the coder, the custom type cannot be processed by a portable runner.
  • PreProcessInput
    • This is a composite transform that converts an input text message into a tuple of user ID and position and assigns a timestamp value to each element.
  • ReadPositionsFromKafka
    • It reads messages from a Kafka topic and returns tuple elements of user ID and position. We need to specify the output type hint for a portable runner to recognise the output type correctly. Note that the Kafka read and write transforms have an argument called deprecated_read, which forces the legacy read when it is set to True. We use the legacy read in this post to prevent a problem that is described in this GitHub issue.
  • WriteMetricsToKafka
    • It sends output messages to a Kafka topic. Each message is a tuple of user ID and speed. Note that the input type hint is necessary when the inputs are piped from a Beam SQL transform.
  1# chapter2/sport_tracker_utils.py
  2import re
  3import json
  4import random
  5import time
  6import typing
  7
  8import apache_beam as beam
  9from apache_beam.io import kafka
 10from apache_beam import pvalue
 11from apache_beam.transforms.window import TimestampedValue
 12from apache_beam.utils.timestamp import Timestamp
 13
 14
 15class Position(typing.NamedTuple):
 16    spot: int
 17    timestamp: float
 18
 19    def to_bytes(self):
 20        return json.dumps(self._asdict()).encode("utf-8")
 21
 22    @classmethod
 23    def from_bytes(cls, encoded: bytes):
 24        d = json.loads(encoded.decode("utf-8"))
 25        return cls(**d)
 26
 27    @classmethod
 28    def create(cls, spot: int = random.randint(0, 100), timestamp: float = time.time()):
 29        return cls(spot=spot, timestamp=timestamp)
 30
 31
 32class PositionCoder(beam.coders.Coder):
 33    def encode(self, value: Position):
 34        return value.to_bytes()
 35
 36    def decode(self, encoded: bytes):
 37        return Position.from_bytes(encoded)
 38
 39    def is_deterministic(self) -> bool:
 40        return True
 41
 42
 43beam.coders.registry.register_coder(Position, PositionCoder)
 44
 45
 46def add_timestamp(element: typing.Tuple[str, Position]):
 47    return TimestampedValue(element, Timestamp.of(element[1].timestamp))
 48
 49
 50def to_positions(input: str):
 51    workout, spot, timestamp = tuple(re.sub("\n", "", input).split("\t"))
 52    return workout, Position(spot=int(spot), timestamp=float(timestamp))
 53
 54
 55class PreProcessInput(beam.PTransform):
 56    def expand(self, pcoll: pvalue.PCollection):
 57        return (
 58            pcoll
 59            | "ToPositions" >> beam.Map(to_positions)
 60            | "AddTS" >> beam.Map(add_timestamp)
 61        )
 62
 63
 64def decode_message(kafka_kv: tuple):
 65    print(kafka_kv)
 66    return kafka_kv[1].decode("utf-8")
 67
 68
 69@beam.typehints.with_output_types(typing.Tuple[str, Position])
 70class ReadPositionsFromKafka(beam.PTransform):
 71    def __init__(
 72        self,
 73        bootstrap_servers: str,
 74        topics: typing.List[str],
 75        group_id: str,
 76        deprecated_read: bool,
 77        verbose: bool = False,
 78        label: str | None = None,
 79    ):
 80        super().__init__(label)
 81        self.boostrap_servers = bootstrap_servers
 82        self.topics = topics
 83        self.group_id = group_id
 84        self.verbose = verbose
 85        self.expansion_service = None
 86        if deprecated_read:
 87            self.expansion_service = kafka.default_io_expansion_service(
 88                ["--experiments=use_deprecated_read"]
 89            )
 90
 91    def expand(self, input: pvalue.PBegin):
 92        return (
 93            input
 94            | "ReadFromKafka"
 95            >> kafka.ReadFromKafka(
 96                consumer_config={
 97                    "bootstrap.servers": self.boostrap_servers,
 98                    "auto.offset.reset": "earliest",
 99                    # "enable.auto.commit": "true",
100                    "group.id": self.group_id,
101                },
102                topics=self.topics,
103                timestamp_policy=kafka.ReadFromKafka.create_time_policy,
104                expansion_service=self.expansion_service,
105            )
106            | "DecodeMessage" >> beam.Map(decode_message)
107            | "PreProcessInput" >> PreProcessInput()
108        )
109
110
111@beam.typehints.with_input_types(typing.Tuple[str, float])
112class WriteMetricsToKafka(beam.PTransform):
113    def __init__(
114        self,
115        bootstrap_servers: str,
116        topic: str,
117        deprecated_read: bool, # TO DO: remove as it applies only to ReadFromKafka
118        label: str | None = None,
119    ):
120        super().__init__(label)
121        self.boostrap_servers = bootstrap_servers
122        self.topic = topic
123        # TO DO: remove as it applies only to ReadFromKafka
124        self.expansion_service = None        
125        if deprecated_read:
126            self.expansion_service = kafka.default_io_expansion_service(
127                ["--experiments=use_deprecated_read"]
128            )
129
130    def expand(self, pcoll: pvalue.PCollection):
131        def create_message(element: typing.Tuple[str, float]):
132            msg = json.dumps(dict(zip(["user", "speed"], element)))
133            print(msg)
134            return "".encode("utf-8"), msg.encode("utf-8")
135
136        return (
137            pcoll
138            | "CreateMessage"
139            >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
140            | "WriteToKafka"
141            >> kafka.WriteToKafka(
142                producer_config={"bootstrap.servers": self.boostrap_servers},
143                topic=self.topic,
144                expansion_service=self.expansion_service,
145            )
146        )
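
As a quick sanity check of the custom coder, we can verify that encoding and decoding a Position value is lossless. The snippet below is a small sketch that assumes it is run from the chapter2 folder so that sport_tracker_utils is importable.

1# round-trip a Position value through the custom coder
2from sport_tracker_utils import Position, PositionCoder
3
4coder = PositionCoder()
5original = Position(spot=10, timestamp=1722127107.0)
6decoded = coder.decode(coder.encode(original))
7assert decoded == original
8print(decoded)  # Position(spot=10, timestamp=1722127107.0)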

Sport Tracker

The main transforms of this pipeline perform

  1. Windowing: assigns input elements into a global window with the following configuration.
    • Emits (or triggers) a window early once 3 seconds of processing time have passed since data was received (trigger=AfterWatermark(early=AfterProcessingTime(3))).
    • Disallows late data (allowed_lateness=0)
    • Assigns the output timestamp from the latest input timestamp (timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
      • By default, the end-of-window timestamp is taken, but it lies in the distant future for the global window. Therefore, we take the latest timestamp of the input elements instead.
    • Configures ACCUMULATING as the window’s accumulation mode (accumulation_mode=AccumulationMode.ACCUMULATING).
      • The trigger emits the results early but not multiple times, so the choice of accumulation mode doesn’t change the result of this transform and this configuration can be ignored.
  2. ComputeMetrics: (1) groups the input elements by key (user ID), (2) obtains the total distance and duration by looping through the individual elements, (3) calculates the speed of a user, which is the distance divided by the duration, and, finally, (4) returns a metrics record, which is a tuple of user ID and speed (a quick worked example follows the pipeline code below).
  1# chapter2/sport_tracker.py
  2import os
  3import argparse
  4import re
  5import logging
  6import typing
  7
  8import apache_beam as beam
  9from apache_beam import pvalue
 10from apache_beam.transforms.window import GlobalWindows, TimestampCombiner
 11from apache_beam.transforms.trigger import (
 12    AfterWatermark,
 13    AccumulationMode,
 14    AfterProcessingTime,
 15)
 16from apache_beam.options.pipeline_options import PipelineOptions
 17from apache_beam.options.pipeline_options import SetupOptions
 18
 19from sport_tracker_utils import (
 20    Position,
 21    ReadPositionsFromKafka,
 22    WriteMetricsToKafka,
 23)
 24
 25
 26def compute(element: typing.Tuple[str, typing.Iterable[Position]]):
 27    last: Position = None
 28    distance = 0
 29    duration = 0
 30    for p in sorted(element[1], key=lambda p: p.timestamp):
 31        if last is not None:
 32            distance += abs(p.spot - last.spot)
 33            duration += p.timestamp - last.timestamp
 34        last = p
 35    return element[0], distance / duration if duration > 0 else 0
 36
 37
 38class ComputeMetrics(beam.PTransform):
 39    def expand(self, pcoll: pvalue.PCollection):
 40        return (
 41            pcoll | "GroupByKey" >> beam.GroupByKey() | "Compute" >> beam.Map(compute)
 42        )
 43
 44
 45def run(argv=None, save_main_session=True):
 46    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
 47    parser.add_argument(
 48        "--bootstrap_servers",
 49        default="host.docker.internal:29092",
 50        help="Kafka bootstrap server addresses",
 51    )
 52    parser.add_argument("--input_topic", default="input-topic", help="Input topic")
 53    parser.add_argument(
 54        "--output_topic",
 55        default=re.sub("_", "-", re.sub(".py$", "", os.path.basename(__file__))),
 56        help="Output topic",
 57    )
 58    parser.add_argument(
 59        "--deprecated_read",
 60        action="store_true",
 61        help="Whether to use a deprecated read. See https://github.com/apache/beam/issues/20979",
 62    )
 63    parser.set_defaults(deprecated_read=False)
 64
 65    known_args, pipeline_args = parser.parse_known_args(argv)
 66
 67    # # We use the save_main_session option because one or more DoFn's in this
 68    # # workflow rely on global context (e.g., a module imported at module level).
 69    pipeline_options = PipelineOptions(pipeline_args)
 70    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
 71    print(f"known args - {known_args}")
 72    print(f"pipeline options - {pipeline_options.display_data()}")
 73
 74    with beam.Pipeline(options=pipeline_options) as p:
 75        (
 76            p
 77            | "ReadPositions"
 78            >> ReadPositionsFromKafka(
 79                bootstrap_servers=known_args.bootstrap_servers,
 80                topics=[known_args.input_topic],
 81                group_id=f"{known_args.output_topic}-group",
 82                deprecated_read=known_args.deprecated_read,
 83            )
 84            | "Windowing"
 85            >> beam.WindowInto(
 86                GlobalWindows(),
 87                trigger=AfterWatermark(early=AfterProcessingTime(3)),
 88                allowed_lateness=0,
 89                timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST,
 90                accumulation_mode=AccumulationMode.ACCUMULATING,
 91            )
 92            | "ComputeMetrics" >> ComputeMetrics()
 93            | "WriteNotifications"
 94            >> WriteMetricsToKafka(
 95                bootstrap_servers=known_args.bootstrap_servers,
 96                topic=known_args.output_topic,
 97                deprecated_read=known_args.deprecated_read,
 98            )
 99        )
100
101        logging.getLogger().setLevel(logging.WARN)
102        logging.info("Building pipeline ...")
103
104
105if __name__ == "__main__":
106    run()
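
To make the metric calculation concrete, we can call the compute function directly with a small, made-up grouped element. The snippet below is only an illustrative check and assumes it is run from the chapter2 folder.

1# worked example: 10 spots covered in 6 seconds gives a speed of ~1.67
2from sport_tracker_utils import Position
3from sport_tracker import compute
4
5positions = [Position(0, 0.0), Position(5, 4.0), Position(10, 6.0)]
6print(compute(("user0", positions)))  # ('user0', 1.6666666666666667) = (5 + 5) / (4 + 2)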

Pipeline Test

As described in this documentation, we can test a Beam pipeline as follows.

  1. Create a TestPipeline.
  2. Create some static, known test input data.
  3. Create a PCollection of input data using the Create transform (for a bounded source) or a TestStream (for an unbounded source).
  4. Apply the transform to the input PCollection and save the resulting output PCollection.
  5. Use PAssert and its subclasses (or testing utils in Python) to verify that the output PCollection contains the elements that you expect.

We use test sport activities of two users that are stored in a file (input/test-tracker-data.txt). We add them to a test stream and apply the same preprocessing and windowing transforms. Finally, we apply the metric computation transform and compare the outputs with the expected outputs, where the expected speed values are calculated using the same logic.

1# input/test-tracker-data.txt
2user0	109	1713939465.7636628
3user1	40	1713939465.7636628
4user0	108	1713939465.8801599
5user1	50	1713939465.8801658
6user0	115	1713939467.8904564
7user1	58	1713939467.8904696
8...
 1# chapter2/sport_tracker_test.py
 2import os
 3import re
 4import typing
 5import unittest
 6
 7import apache_beam as beam
 8from apache_beam.coders import coders
 9
10from apache_beam.testing.test_pipeline import TestPipeline
11from apache_beam.testing.util import assert_that, equal_to
12from apache_beam.testing.test_stream import TestStream
13from apache_beam.transforms.trigger import (
14    AfterWatermark,
15    AccumulationMode,
16    AfterProcessingTime,
17)
18from apache_beam.transforms.window import GlobalWindows, TimestampCombiner
19from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
20
21from sport_tracker_utils import Position, PreProcessInput
22from sport_tracker import ComputeMetrics
23
24
25def read_file(filename: str, inputpath: str):
26    with open(os.path.join(inputpath, filename), "r") as f:
27        return f.readlines()
28
29
 30def compute_metrics(key: str, positions: typing.List[Position]):
31    last: Position = None
32    distance = 0
33    duration = 0
34    for p in sorted(positions, key=lambda p: p.timestamp):
35        if last is not None:
36            distance += abs(int(p.spot) - int(last.spot))
37            duration += float(p.timestamp) - float(last.timestamp)
38        last = p
39    return key, distance / duration if duration > 0 else 0
40
41
42def compute_expected_metrics(lines: list):
43    ones, twos = [], []
44    for line in lines:
45        workout, spot, timestamp = tuple(re.sub("\n", "", line).split("\t"))
46        position = Position(spot, timestamp)
47        if workout == "user0":
48            ones.append(position)
49        else:
50            twos.append(position)
51    return [
 52        compute_metrics(key="user0", positions=ones),
 53        compute_metrics(key="user1", positions=twos),
54    ]
55
56
57class SportTrackerTest(unittest.TestCase):
58    def test_windowing_behaviour(self):
59        options = PipelineOptions()
60        options.view_as(StandardOptions).streaming = True
61        with TestPipeline(options=options) as p:
62            PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
63            lines = read_file(
64                "test-tracker-data.txt", os.path.join(PARENT_DIR, "inputs")
65            )
66            test_stream = TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)
67            for line in lines:
68                test_stream.add_elements([line])
69            test_stream.advance_watermark_to_infinity()
70
71            output = (
72                p
73                | test_stream
74                | "PreProcessInput" >> PreProcessInput()
75                | "Windowing"
76                >> beam.WindowInto(
77                    GlobalWindows(),
78                    trigger=AfterWatermark(early=AfterProcessingTime(3)),
79                    allowed_lateness=0,
80                    timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST,
81                    accumulation_mode=AccumulationMode.ACCUMULATING,
82                )
83                | "ComputeMetrics" >> ComputeMetrics()
84            )
85
86            EXPECTED_OUTPUT = compute_expected_metrics(lines)
87
88            assert_that(output, equal_to(EXPECTED_OUTPUT))
89
90
91if __name__ == "__main__":
92    unittest.main()

We can execute the pipeline test as shown below.

1python chapter2/sport_tracker_test.py 
2.
3----------------------------------------------------------------------
4Ran 1 test in 1.492s
5
6OK

Pipeline Execution

Note that the Kafka bootstrap server is accessible on port 29092 outside the Docker network: it can be accessed on localhost:29092 from the Docker host machine and on host.docker.internal:29092 from a Docker container that is launched with the host network. We use both forms of the bootstrap server address - the former is used by the Kafka producer app and the latter by a Java IO expansion service, which is launched in a Docker container. Note further that, for the latter to work, we have to update the /etc/hosts file by adding an entry for host.docker.internal as shown below.

1cat /etc/hosts | grep host.docker.internal
2# 127.0.0.1       host.docker.internal
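
If the entry is missing, it can be added with a one-liner such as the following. Note that this modifies /etc/hosts, so adjust it to your environment as needed.

1echo "127.0.0.1       host.docker.internal" | sudo tee -a /etc/hosts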

We specify only a single known argument, which enables the legacy read (--deprecated_read), while accepting the default values of the other known arguments (bootstrap_servers, input_topic …). The remaining arguments are all pipeline arguments. Note that we deploy the pipeline on a local Flink cluster by specifying the flink master argument (--flink_master=localhost:8081). Alternatively, we can use an embedded Flink cluster by excluding that argument.

1## start the beam pipeline
2## exclude --flink_master if using an embedded cluster
3python chapter2/sport_tracker.py --deprecated_read \
4    --job_name=sport_tracker --runner FlinkRunner --flink_master=localhost:8081 \
5	--streaming --environment_type=LOOPBACK --parallelism=3 --checkpointing_interval=10000

On Flink UI, we see the pipeline has two tasks. The first task covers the steps up to windowing the input elements, while the second task covers the remaining steps up to sending the metric records to the output topic.

On Kafka UI, we can check that each output message is a dictionary of user ID and speed.

Sport Tracker SQL

The main transforms of this pipeline perform

  1. Windowing: assigns input elements into fixed time windows of 5 seconds with the following configuration.
    • Disallows late data (allowed_lateness=0)
  2. ComputeMetrics: (1) converts the input elements into a new custom type (Track) so that the key (user ID) is available for grouping, (2) calculates the speed of a user, where the distance and duration are obtained from their max/min values only (!), and, finally, (3) returns a metrics record, which is a tuple of user ID and speed.
  1# chapter2/sport_tracker_sql.py
  2import os
  3import argparse
  4import re
  5import logging
  6import typing
  7
  8import apache_beam as beam
  9from apache_beam import pvalue, coders
 10from apache_beam.transforms.sql import SqlTransform
 11from apache_beam.transforms.window import FixedWindows
 12from apache_beam.options.pipeline_options import PipelineOptions
 13from apache_beam.options.pipeline_options import SetupOptions
 14
 15from sport_tracker_utils import (
 16    Position,
 17    ReadPositionsFromKafka,
 18    WriteMetricsToKafka,
 19)
 20
 21
 22class Track(typing.NamedTuple):
 23    user: str
 24    spot: int
 25    timestamp: float
 26
 27
 28coders.registry.register_coder(Track, coders.RowCoder)
 29
 30
 31def to_track(element: typing.Tuple[str, Position]):
 32    return Track(element[0], element[1].spot, element[1].timestamp)
 33
 34
 35class ComputeMetrics(beam.PTransform):
 36    def expand(self, pcoll: pvalue.PCollection):
 37        return (
 38            pcoll
 39            | "ToTrack" >> beam.Map(to_track).with_output_types(Track)
 40            | "Compute"
 41            >> SqlTransform(
 42                """
 43                WITH cte AS (
 44                    SELECT
 45                        `user`,
 46                        MIN(`spot`) - MAX(`spot`) AS distance,
 47                        MIN(`timestamp`) - MAX(`timestamp`) AS duration
 48                    FROM PCOLLECTION
 49                    GROUP BY `user`
 50                )
 51                SELECT 
 52                    `user`,
 53                    CASE WHEN duration = 0 THEN 0 ELSE distance / duration END AS speed
 54                FROM cte
 55                """
 56            )
 57            | "ToTuple"
 58            >> beam.Map(lambda e: tuple(e)).with_output_types(typing.Tuple[str, float])
 59        )
 60
 61
 62def run(argv=None, save_main_session=True):
 63    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
 64    parser.add_argument(
 65        "--bootstrap_servers",
 66        default="host.docker.internal:29092",
 67        help="Kafka bootstrap server addresses",
 68    )
 69    parser.add_argument("--input_topic", default="input-topic", help="Input topic")
 70    parser.add_argument(
 71        "--output_topic",
 72        default=re.sub("_", "-", re.sub(".py$", "", os.path.basename(__file__))),
 73        help="Output topic",
 74    )
 75    parser.add_argument(
 76        "--deprecated_read",
 77        action="store_true",
 78        help="Whether to use a deprecated read. See https://github.com/apache/beam/issues/20979",
 79    )
 80    parser.set_defaults(deprecated_read=False)
 81
 82    known_args, pipeline_args = parser.parse_known_args(argv)
 83
 84    # # We use the save_main_session option because one or more DoFn's in this
 85    # # workflow rely on global context (e.g., a module imported at module level).
 86    pipeline_options = PipelineOptions(pipeline_args)
 87    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
 88    print(f"known args - {known_args}")
 89    print(f"pipeline options - {pipeline_options.display_data()}")
 90
 91    with beam.Pipeline(options=pipeline_options) as p:
 92        (
 93            p
 94            | "ReadPositions"
 95            >> ReadPositionsFromKafka(
 96                bootstrap_servers=known_args.bootstrap_servers,
 97                topics=[known_args.input_topic],
 98                group_id=f"{known_args.output_topic}-group",
 99                deprecated_read=known_args.deprecated_read,
100            )
101            | "Windowing" >> beam.WindowInto(FixedWindows(5), allowed_lateness=0)
102            | "ComputeMetrics" >> ComputeMetrics()
103            | "WriteNotifications"
104            >> WriteMetricsToKafka(
105                bootstrap_servers=known_args.bootstrap_servers,
106                topic=known_args.output_topic,
107                deprecated_read=known_args.deprecated_read,
108            )
109        )
110
111        logging.getLogger().setLevel(logging.WARN)
112        logging.info("Building pipeline ...")
113
114
115if __name__ == "__main__":
116    run()

Pipeline Test

We have 8 test activity records for two users. After sorting them by user ID and timestamp, we can derive the expected speed value for each user and window.
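
The expected values can also be reproduced with a few lines of plain Python that mimic the SQL logic, i.e. fixed 5-second windows with the distance and duration taken from max/min values only. This is an illustrative sketch using the same eight records that appear in the test code.

 1# reproduce the expected speeds per user and 5-second window using max/min only
 2records = [
 3    ("user0", 0, 0), ("user1", 10, 2), ("user0", 5, 4), ("user1", 3, 3),
 4    ("user0", 10, 6), ("user1", 2, 7), ("user0", 4, 9), ("user1", 10, 9),
 5]
 6
 7windows = {}
 8for user, spot, ts in records:
 9    windows.setdefault((ts // 5, user), []).append((spot, ts))
10
11for (window, user), values in sorted(windows.items()):
12    spots, stamps = [v[0] for v in values], [v[1] for v in values]
13    speed = (max(spots) - min(spots)) / (max(stamps) - min(stamps))
14    print(user, speed)
15# user0 1.25, user1 7.0, user0 2.0, user1 4.0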

Note that the Beam SQL transform cannot be tested with the streaming Python direct runner because it doesn't support cross-language pipelines. Therefore, we use the Flink runner for testing.

 1# chapter2/sport_tracker_sql_test.py
 2import sys
 3import unittest
 4
 5import apache_beam as beam
 6from apache_beam.coders import coders
 7from apache_beam.testing.test_pipeline import TestPipeline
 8from apache_beam.testing.util import assert_that, equal_to
 9from apache_beam.testing.test_stream import TestStream
10from apache_beam.transforms.window import FixedWindows
11from apache_beam.options.pipeline_options import PipelineOptions
12
13from sport_tracker_utils import PreProcessInput
14from sport_tracker_sql import ComputeMetrics
15
16
17def main(out=sys.stderr, verbosity=2):
18    loader = unittest.TestLoader()
19
20    suite = loader.loadTestsFromModule(sys.modules[__name__])
21    unittest.TextTestRunner(out, verbosity=verbosity).run(suite)
22
23
24class SportTrackerTest(unittest.TestCase):
25    def test_windowing_behaviour(self):
26        pipeline_opts = {"runner": "FlinkRunner", "parallelism": 1, "streaming": True}
27        options = PipelineOptions([], **pipeline_opts)
28        with TestPipeline(options=options) as p:
29            lines = [
30                "user0\t0\t0",
31                "user1\t10\t2",
32                "user0\t5\t4",
33                "user1\t3\t3",
34                "user0\t10\t6",
35                "user1\t2\t7",
36                "user0\t4\t9",
37                "user1\t10\t9",
38            ]
39            test_stream = TestStream(coder=coders.StrUtf8Coder()).with_output_types(str)
40            for line in lines:
41                test_stream.add_elements([line])
42            test_stream.advance_watermark_to_infinity()
43
44            output = (
45                p
46                | test_stream
47                | "PreProcessInput" >> PreProcessInput()
48                | "Windowing" >> beam.WindowInto(FixedWindows(5), allowed_lateness=0)
49                | "ComputeMetrics" >> ComputeMetrics()
50            )
51
52            assert_that(
53                output,
54                equal_to(
55                    [("user0", 1.25), ("user1", 7.0), ("user0", 2.0), ("user1", 4.0)]
56                ),
57            )
58
59
60if __name__ == "__main__":
61    main(out=None)

The pipeline can be tested as shown below.

1python chapter2/sport_tracker_sql_test.py 
2ok
3
4----------------------------------------------------------------------
5Ran 1 test in 34.487s
6
7OK

Pipeline Execution

Similar to the previous example, we use the legacy read (--deprecated_read) while accepting the default values of the other known arguments. Note that the Beam SQL transform fails to run on a local Flink cluster, so an embedded Flink cluster is used instead by excluding the flink master argument.

1## start the beam pipeline with an embedded flink cluster
2python chapter2/sport_tracker_sql.py --deprecated_read \
3    --job_name=sport_tracker_sql --runner FlinkRunner \
4	--streaming --environment_type=LOOPBACK --parallelism=3 --checkpointing_interval=10000

On Kafka UI, we can check that each output message is a dictionary of user ID and speed.

Potential Improvements of Beam SQL for Python SDK

As discussed earlier, the first pipeline uses native transforms and takes every individual element into account when calculating the distance and duration. On the other hand, the second pipeline approximates those values using only their max/min values. This kind of approximation can be misleading, and there are two options for overcoming such a limitation.

  • User defined function: The Java SDK supports user defined functions that accept a custom Java scalar or aggregation function. We could use a user defined aggregation function to mimic the first pipeline in Beam SQL, but this is not supported in the Python SDK at the moment.
  • Beam SQL aggregation analytics functionality (BEAM-9198): This ticket aims to implement SQL window analytics functions; if supported, we would be able to access individual elements by using the lead (or lag) function.

I consider that the usage of Beam SQL will remain limited unless one or both of those features are supported in the Python SDK, although it does support interesting features such as External Table and JOIN.