In this post, we develop two Apache Beam pipelines that calculate average word lengths from input texts ingested from a Kafka topic. They compute the statistic from different angles: the first pipeline emits the global average length whenever a new input text arrives, while the second emits the values over a sliding time window.

Development Environment

The development environment requires an Apache Flink cluster, an Apache Kafka cluster and a gRPC server. The gRPC server will be used in Parts 4 to 6. For Flink, we can use either an embedded cluster or a local cluster, while Docker Compose is used for the rest. See Part 1 for details about how to set up the development environment. The source of this post can be found in this GitHub repository.

Manage Environment

The Flink and Kafka clusters and gRPC server are managed by the following bash scripts.

  • ./setup/start-flink-env.sh
  • ./setup/stop-flink-env.sh

Those scripts accept four flags: -f, -k and -g to start/stop individual resources, or -a to manage all of them. Multiple flags can be combined to start/stop the relevant resources together. Note that the scripts assume Flink 1.18.1 by default, and a different Flink version can be specified as an environment variable, e.g. FLINK_VERSION=1.17.2 ./setup/start-flink-env.sh.

Below shows how to start resources using the start-up script. We need to launch both the Flink and Kafka clusters if we deploy a Beam pipeline on a local Flink cluster. Otherwise, we can start the Kafka cluster only.

## start local flink and kafka clusters
./setup/start-flink-env.sh -f -k
# start kafka...
# [+] Running 6/6
#  ⠿ Network app-network      Created                                     0.0s
#  ⠿ Volume "zookeeper_data"  Created                                     0.0s
#  ⠿ Volume "kafka_0_data"    Created                                     0.0s
#  ⠿ Container zookeeper      Started                                     0.3s
#  ⠿ Container kafka-0        Started                                     0.5s
#  ⠿ Container kafka-ui       Started                                     0.8s
# start flink 1.18.1...
# Starting cluster.
# Starting standalonesession daemon on host <hostname>.
# Starting taskexecutor daemon on host <hostname>.

## start a local kafka cluster only
./setup/start-flink-env.sh -k
# start kafka...
# [+] Running 6/6
#  ⠿ Network app-network      Created                                     0.0s
#  ⠿ Volume "zookeeper_data"  Created                                     0.0s
#  ⠿ Volume "kafka_0_data"    Created                                     0.0s
#  ⠿ Container zookeeper      Started                                     0.3s
#  ⠿ Container kafka-0        Started                                     0.5s
#  ⠿ Container kafka-ui       Started                                     0.8s
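When we are done, the same flags can be used with the stop script to tear the resources down, for example:

## stop the local flink and kafka clusters
./setup/stop-flink-env.sh -f -k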

Kafka Text Producer

We create a Kafka producer using the kafka-python package. It generates text messages with the Faker package and sends them to an input topic. We can run the producer simply by executing the producer script. See Part 1 for details about the Kafka producer.
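The snippet below is a minimal sketch of such a producer rather than the actual script in the repository; it assumes the bootstrap address (localhost:29092) and input topic name (input-topic) that are used later in this post.

# a simplified text producer sketch using kafka-python and Faker
import time

from faker import Faker
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:29092",
    value_serializer=lambda v: v.encode("utf-8"),
)
fake = Faker()

while True:
    # send a short random sentence as the message value
    producer.send("input-topic", value=fake.text(max_nb_chars=50))
    producer.flush()
    time.sleep(1)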

Beam Pipelines

We develop two Apache Beam pipelines that calculate average word lengths from input texts ingested from a Kafka topic. The first pipeline emits the global average length whenever a new input text arrives, while the second emits the values over a sliding time window.

Shared Transforms

Both pipelines read text messages from an input Kafka topic and write outputs to an output topic. Therefore, the data source and sink transforms are refactored into a utility module as shown below. Note that the Kafka read and write transforms have an argument called deprecated_read, which forces the legacy read implementation when it is set to True. We use the legacy read in this post to avoid a problem that is described in this GitHub issue.

# chapter2/word_process_utils.py
import re
import typing

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import kafka


def decode_message(kafka_kv: tuple):
    print(kafka_kv)
    return kafka_kv[1].decode("utf-8")


def tokenize(element: str):
    return re.findall(r"[A-Za-z\']+", element)


class ReadWordsFromKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topics: typing.List[str],
        group_id: str,
        deprecated_read: bool,
        verbose: bool = False,
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.boostrap_servers = bootstrap_servers
        self.topics = topics
        self.group_id = group_id
        self.verbose = verbose
        self.expansion_service = None
        if deprecated_read:
            self.expansion_service = kafka.default_io_expansion_service(
                ["--experiments=use_deprecated_read"]
            )

    def expand(self, input: pvalue.PBegin):
        return (
            input
            | "ReadFromKafka"
            >> kafka.ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": self.boostrap_servers,
                    "auto.offset.reset": "latest",
                    # "enable.auto.commit": "true",
                    "group.id": self.group_id,
                },
                topics=self.topics,
                timestamp_policy=kafka.ReadFromKafka.create_time_policy,
                commit_offset_in_finalize=True,
                expansion_service=self.expansion_service,
            )
            | "DecodeMessage" >> beam.Map(decode_message)
        )


class WriteProcessOutputsToKafka(beam.PTransform):
    def __init__(
        self,
        bootstrap_servers: str,
        topic: str,
        deprecated_read: bool,  # TO DO: remove as it applies only to ReadFromKafka
        label: str | None = None,
    ) -> None:
        super().__init__(label)
        self.boostrap_servers = bootstrap_servers
        self.topic = topic
        # TO DO: remove as it applies only to ReadFromKafka
        self.expansion_service = None
        if deprecated_read:
            self.expansion_service = kafka.default_io_expansion_service(
                ["--experiments=use_deprecated_read"]
            )

    def expand(self, pcoll: pvalue.PCollection):
        return pcoll | "WriteToKafka" >> kafka.WriteToKafka(
            producer_config={"bootstrap.servers": self.boostrap_servers},
            topic=self.topic,
            expansion_service=self.expansion_service,
        )
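As a quick illustration, tokenize keeps only alphabetic characters and apostrophes, so punctuation is dropped while contractions survive. A minimal check (assuming it is run from the chapter2 folder):

from word_process_utils import tokenize

print(tokenize("Beam's Kafka IO, for example!"))
# ["Beam's", 'Kafka', 'IO', 'for', 'example']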

Calculate Average Word Length

The main transform of this pipeline (CalculateAverageWordLength) performs the following steps.

  1. Windowing: assigns input text messages into a global window with the following configuration.
    • Emits (or triggers) the result for every new input text (trigger=Repeatedly(AfterCount(1)))
    • Disallows late data (allowed_lateness=0)
    • Assigns the output timestamp from the latest input timestamp (timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
      • By default, the end-of-window timestamp is taken, but it is too far in the future for the global window. Therefore, we take the latest timestamp of the input elements instead.
    • Configures ACCUMULATING for calculating the global average (accumulation_mode=AccumulationMode.ACCUMULATING)
      • The mode is either DISCARDING (reset the result when fired) or ACCUMULATING (keep the previous result).
  2. Tokenize: tokenizes (or converts) text into words.
  3. GetAvgWordLength: calculates the average word length by combining all words using a custom combine function (AverageFn).
    • Using an accumulator, AverageFn obtains the average word length by dividing the sum of lengths by the number of words.

Also, the output timestamp (the latest input timestamp, as configured above) is added when creating output messages (CreateMessags).

# chapter2/average_word_length.py
import os
import json
import argparse
import re
import logging
import typing

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.transforms.window import GlobalWindows, TimestampCombiner
from apache_beam.transforms.trigger import AfterCount, AccumulationMode, Repeatedly
from apache_beam.transforms.util import Reify
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from word_process_utils import tokenize, ReadWordsFromKafka, WriteProcessOutputsToKafka


class AvgAccum(typing.NamedTuple):
    length: int
    count: int


beam.coders.registry.register_coder(AvgAccum, beam.coders.RowCoder)


class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        return AvgAccum(length=0, count=0)

    def add_input(self, mutable_accumulator: AvgAccum, element: str):
        length, count = tuple(mutable_accumulator)
        return AvgAccum(length=length + len(element), count=count + 1)

    def merge_accumulators(self, accumulators: typing.List[AvgAccum]):
        lengths, counts = zip(*accumulators)
        return AvgAccum(length=sum(lengths), count=sum(counts))

    def extract_output(self, accumulator: AvgAccum):
        length, count = tuple(accumulator)
        return length / count if count else float("NaN")

    def get_accumulator_coder(self):
        return beam.coders.registry.get_coder(AvgAccum)


class CalculateAverageWordLength(beam.PTransform):
    def expand(self, pcoll: pvalue.PCollection):
        return (
            pcoll
            | "Windowing"
            >> beam.WindowInto(
                GlobalWindows(),
                trigger=Repeatedly(AfterCount(1)),
                allowed_lateness=0,
                timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST,
                accumulation_mode=AccumulationMode.ACCUMULATING,
                # closing behaviour - EMIT_ALWAYS, on_time_behavior - FIRE_ALWAYS
            )
            | "Tokenize" >> beam.FlatMap(tokenize)
            | "GetAvgWordLength"
            >> beam.CombineGlobally(
                AverageFn()
            ).without_defaults()  # DAG gets complicated if with_default()
        )


def create_message(element: typing.Tuple[str, float]):
    msg = json.dumps(dict(zip(["created_at", "avg_len"], element)))
    print(msg)
    return "".encode("utf-8"), msg.encode("utf-8")


class AddTS(beam.DoFn):
    def process(self, avg_len: float, ts_param=beam.DoFn.TimestampParam):
        yield ts_param.to_rfc3339(), avg_len


class CreateMessags(beam.PTransform):
    def expand(self, pcoll: pvalue.PCollection):
        return (
            pcoll
            | "ReifyTimestamp" >> Reify.Timestamp()
            | "AddTimestamp" >> beam.ParDo(AddTS())
            | "CreateMessages"
            >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
        )


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--bootstrap_servers",
        default="host.docker.internal:29092",
        help="Kafka bootstrap server addresses",
    )
    parser.add_argument("--input_topic", default="input-topic", help="Input topic")
    parser.add_argument(
        "--output_topic",
        default=re.sub("_", "-", re.sub(".py$", "", os.path.basename(__file__))),
        help="Output topic",
    )
    parser.add_argument(
        "--deprecated_read",
        action="store_true",
        help="Whether to use a deprecated read. See https://github.com/apache/beam/issues/20979",
    )
    parser.set_defaults(deprecated_read=False)

    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known args - {known_args}")
    print(f"pipeline options - {pipeline_options.display_data()}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadInputsFromKafka"
            >> ReadWordsFromKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topics=[known_args.input_topic],
                group_id=f"{known_args.output_topic}-group",
                deprecated_read=known_args.deprecated_read,
            )
            | "CalculateAverageWordLength" >> CalculateAverageWordLength()
            | "CreateMessags" >> CreateMessags()
            | "WriteOutputsToKafka"
            >> WriteProcessOutputsToKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topic=known_args.output_topic,
                deprecated_read=known_args.deprecated_read,
            )
        )

        logging.getLogger().setLevel(logging.WARN)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()
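To see how AverageFn arrives at the average, we can drive the CombineFn methods by hand outside a pipeline. This is only an illustrative sketch (assuming it is run from the chapter2 folder); the test in the next section verifies the same behaviour through the transform itself.

from average_word_length import AverageFn

fn = AverageFn()
acc = fn.create_accumulator()      # AvgAccum(length=0, count=0)
for word in ["a", "bb", "ccc"]:
    acc = fn.add_input(acc, word)  # ends up as AvgAccum(length=6, count=3)
print(fn.extract_output(acc))      # 2.0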

Pipeline Test

As described in this documentation, we can test a Beam pipeline as follows.

  1. Create a TestPipeline.
  2. Create some static, known test input data.
  3. Create a PCollection of input data using the Create transform (if bounded source) or a TestStream (if unbounded source).
  4. Apply the transform to the input PCollection and save the resulting output PCollection.
  5. Use PAssert and its subclasses (or testing utils in Python) to verify that the output PCollection contains the elements that you expect.

We have three elements and the pipeline should emit the average word length whenever a new element is delivered. Therefore, the expected average word length values are 1 (1/1), 1.5 (3/2) and 2 (6/3).

# chapter2/average_word_length_test.py
import unittest

from apache_beam.coders import coders
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.testing.test_stream import TestStream
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

from average_word_length import CalculateAverageWordLength


class MaxWordLengthTest(unittest.TestCase):
    def test_windowing_behaviour(self):
        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True
        with TestPipeline(options=options) as p:
            """
            We should put each element separately. The reason we do this is to ensure
            that our trigger will be invoked in between each element. Because the trigger
            invocation is optional, a runner might skip a particular firing. Putting each element
            separately into addElements makes DirectRunner (our testing runner) invoke a trigger
            for each input element.
            """
            test_stream = (
                TestStream(coder=coders.StrUtf8Coder())
                .with_output_types(str)
                .add_elements(["a"])
                .add_elements(["bb"])
                .add_elements(["ccc"])
                .advance_watermark_to_infinity()
            )

            output = (
                p
                | test_stream
                | "CalculateMaxWordLength" >> CalculateAverageWordLength()
            )

            EXPECTED_OUTPUT = [1.0, 1.5, 2.0]

            assert_that(output, equal_to(EXPECTED_OUTPUT))


if __name__ == "__main__":
    unittest.main()

We can execute the pipeline test as shown below.

python chapter2/average_word_length_test.py -v
test_windowing_behaviour (__main__.MaxWordLengthTest) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.343s

OK

Pipeline Execution

Note that the Kafka bootstrap server is accessible on port 29092 outside the Docker network: it can be accessed on localhost:29092 from the Docker host machine and on host.docker.internal:29092 from a Docker container that is launched with the host network. We use both forms of the bootstrap server address - the former is used by the Kafka producer app and the latter by a Java IO expansion service, which is launched in a Docker container. Note further that, for the latter to work, we have to update the /etc/hosts file by adding an entry for host.docker.internal as shown below.

cat /etc/hosts | grep host.docker.internal
# 127.0.0.1       host.docker.internal
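If the entry is missing, it can be added as shown below, which assumes a Linux host and sudo access.

echo "127.0.0.1 host.docker.internal" | sudo tee -a /etc/hosts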

We specify only a single known argument that enables the legacy read (--deprecated_read), accepting the default values of the other known arguments (bootstrap_servers, input_topic …). The remaining arguments are all pipeline arguments. Note that we deploy the pipeline on a local Flink cluster by specifying the Flink master argument (--flink_master=localhost:8081). Alternatively, we can use an embedded Flink cluster if we exclude that argument.

## start the beam pipeline
## exclude --flink_master if using an embedded cluster
python chapter2/average_word_length.py --deprecated_read \
    --job_name=avg-word-length --runner FlinkRunner --flink_master=localhost:8081 \
    --streaming --environment_type=LOOPBACK --parallelism=3 --checkpointing_interval=10000

On the Flink UI, we see that the pipeline has two tasks. The first task covers the steps up to tokenizing the text into words, while the second task covers the remaining steps up to sending the average word length records to the output topic.

On the Kafka UI, we can check that the output messages include an average word length and the record creation timestamp.
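For reference, we can also see the shape of an output record by calling create_message directly; the timestamp and average below are purely illustrative values, not actual pipeline output.

from average_word_length import create_message

key, value = create_message(("2024-01-01T00:00:00Z", 5.5))  # also prints the JSON payload
assert key == b""
assert value == b'{"created_at": "2024-01-01T00:00:00Z", "avg_len": 5.5}'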

Calculate Average Word Length with Fixed Lookback

The main transform of this pipeline (CalculateAverageWordLength) performs the following steps.

  1. Windowing: assigns input text messages into a sliding time window where the size and period default to 10 and 2 seconds respectively. This means it calculates rolling average word lengths over windows of 10 seconds, where a new window starts every 2 seconds (illustrated in the short sketch below).
  2. Tokenize: tokenizes (or converts) text into words.
  3. GetAvgWordLength: calculates the average word length by combining all words using a custom combine function (AverageFn).
    • Using an accumulator, AverageFn obtains the average word length by dividing the sum of lengths by the number of words.

Also, the window start and end timestamps are added when creating output messages (CreateMessags).
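Before looking at the pipeline code, the sketch below illustrates the window assignment with plain Python arithmetic. It mirrors the SlidingWindows semantics (window starts are aligned to multiples of the period) and is not part of the pipeline itself.

# which 10-second windows contain an element with timestamp t=5?
size, period, t = 10, 2, 5

# a window [start, start + size) contains t when start <= t < start + size,
# and window starts fall on multiples of the period
starts = [s for s in range(t - size + 1, t + 1) if s % period == 0]
print([(s, s + size) for s in starts])
# [(-4, 6), (-2, 8), (0, 10), (2, 12), (4, 14)]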

# chapter2/sliding_window_word_length.py
import os
import json
import argparse
import re
import logging
import typing

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.transforms.window import SlidingWindows
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from word_process_utils import tokenize, ReadWordsFromKafka, WriteProcessOutputsToKafka


class AvgAccum(typing.NamedTuple):
    length: int
    count: int


beam.coders.registry.register_coder(AvgAccum, beam.coders.RowCoder)


class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        return AvgAccum(length=0, count=0)

    def add_input(self, mutable_accumulator: AvgAccum, element: str):
        length, count = tuple(mutable_accumulator)
        return AvgAccum(length=length + len(element), count=count + 1)

    def merge_accumulators(self, accumulators: typing.List[AvgAccum]):
        lengths, counts = zip(*accumulators)
        return AvgAccum(length=sum(lengths), count=sum(counts))

    def extract_output(self, accumulator: AvgAccum):
        length, count = tuple(accumulator)
        return length / count if count else float("NaN")

    def get_accumulator_coder(self):
        return beam.coders.registry.get_coder(AvgAccum)


class CalculateAverageWordLength(beam.PTransform):
    def __init__(self, size: int, period: int, label: str | None = None) -> None:
        super().__init__(label)
        self.size = size
        self.period = period

    def expand(self, pcoll: pvalue.PCollection):
        return (
            pcoll
            | "Windowing"
            >> beam.WindowInto(SlidingWindows(size=self.size, period=self.period))
            | "Tokenize" >> beam.FlatMap(tokenize)
            | "GetAvgWordLength" >> beam.CombineGlobally(AverageFn()).without_defaults()
        )


def create_message(element: typing.Tuple[str, str, float]):
    msg = json.dumps(dict(zip(["window_start", "window_end", "avg_len"], element)))
    print(msg)
    return "".encode("utf-8"), msg.encode("utf-8")


class AddWindowTS(beam.DoFn):
    def process(self, avg_len: float, win_param=beam.DoFn.WindowParam):
        yield (
            win_param.start.to_rfc3339(),
            win_param.end.to_rfc3339(),
            avg_len,
        )


class CreateMessags(beam.PTransform):
    def expand(self, pcoll: pvalue.PCollection):
        return (
            pcoll
            | "AddWindowTS" >> beam.ParDo(AddWindowTS())
            | "CreateMessages"
            >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
        )


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--bootstrap_servers",
        default="host.docker.internal:29092",
        help="Kafka bootstrap server addresses",
    )
    parser.add_argument("--input_topic", default="input-topic", help="Input topic")
    parser.add_argument(
        "--output_topic",
        default=re.sub("_", "-", re.sub(".py$", "", os.path.basename(__file__))),
        help="Output topic",
    )
    parser.add_argument("--size", type=int, default=10, help="Window size")
    parser.add_argument("--period", type=int, default=2, help="Window period")
    parser.add_argument(
        "--deprecated_read",
        action="store_true",
        help="Whether to use a deprecated read. See https://github.com/apache/beam/issues/20979",
    )
    parser.set_defaults(deprecated_read=False)

    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known args - {known_args}")
    print(f"pipeline options - {pipeline_options.display_data()}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadInputsFromKafka"
            >> ReadWordsFromKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topics=[known_args.input_topic],
                group_id=f"{known_args.output_topic}-group",
                deprecated_read=known_args.deprecated_read,
            )
            | "CalculateAverageWordLength"
            >> CalculateAverageWordLength(
                size=known_args.size, period=known_args.period
            )
            | "CreateMessags" >> CreateMessags()
            | "WriteOutputsToKafka"
            >> WriteProcessOutputsToKafka(
                bootstrap_servers=known_args.bootstrap_servers,
                topic=known_args.output_topic,
                deprecated_read=known_args.deprecated_read,
            )
        )

        logging.getLogger().setLevel(logging.WARN)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()

Pipeline Test

We add four elements as listed below.

  • a after 0 seconds
  • bb after 1.99 seconds
  • ccc after 5 seconds
  • dddd after 10 seconds

Therefore, we can work out which elements belong to which windows and calculate the expected average word lengths accordingly, as the sketch below illustrates.
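The sketch below reproduces that reasoning in plain Python, independently of Beam. It also shows why the create_dt helper in the test subtracts 8 seconds: the earliest window containing the first element starts 8 seconds before the epoch.

# elements with their timestamps (in seconds) and the 10s/2s sliding windows they fall into
elements = [("a", 0), ("bb", 1.99), ("ccc", 5), ("dddd", 10)]
size, period = 10, 2

# every period-aligned window [start, start + size) that receives at least one element
for start in range(-8, 11, period):
    words = [w for w, ts in elements if start <= ts < start + size]
    print(start, start + size, words, sum(len(w) for w in words) / len(words))
# -8 2 ['a', 'bb'] 1.5
# -6 4 ['a', 'bb'] 1.5
# -4 6 ['a', 'bb', 'ccc'] 2.0
# -2 8 ['a', 'bb', 'ccc'] 2.0
# 0 10 ['a', 'bb', 'ccc'] 2.0
# 2 12 ['ccc', 'dddd'] 3.5
# 4 14 ['ccc', 'dddd'] 3.5
# 6 16 ['dddd'] 4.0
# 8 18 ['dddd'] 4.0
# 10 20 ['dddd'] 4.0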

# chapter2/sliding_window_word_length_test.py
import datetime
import unittest

import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.utils.timestamp import Timestamp
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

from sliding_window_word_length import CalculateAverageWordLength, AddWindowTS


def create_dt(index: int):
    return datetime.datetime.fromtimestamp(index - 8, tz=datetime.timezone.utc).replace(
        tzinfo=None
    )


def to_rfc3339(dt: datetime.datetime):
    return f'{dt.isoformat(sep="T", timespec="seconds")}Z'


class SlidingWindowWordLengthTest(unittest.TestCase):
    def test_windowing_behaviour(self):
        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True
        with TestPipeline(options=options) as p:
            now = 0
            test_stream = (
                TestStream(coder=coders.StrUtf8Coder())
                .with_output_types(str)
                .add_elements([TimestampedValue("a", Timestamp.of(now + 0))])
                .add_elements([TimestampedValue("bb", Timestamp.of(now + 1.99))])
                .add_elements([TimestampedValue("ccc", Timestamp.of(now + 5))])
                .add_elements([TimestampedValue("dddd", Timestamp.of(now + 10))])
                .advance_watermark_to_infinity()
            )

            output = (
                p
                | test_stream
                | "CalculateAverageWordLength"
                >> CalculateAverageWordLength(size=10, period=2)
                | "AddWindowTS" >> beam.ParDo(AddWindowTS())
            )

            EXPECTED_VALUES = [1.5, 1.5, 2.0, 2.0, 2.0, 3.5, 3.5, 4.0, 4.0, 4.0]
            EXPECTED_OUTPUT = [
                (
                    to_rfc3339(create_dt(i * 2)),
                    to_rfc3339(create_dt(i * 2) + datetime.timedelta(seconds=10)),
                    EXPECTED_VALUES[i],
                )
                for i in range(len(EXPECTED_VALUES))
            ]

            assert_that(output, equal_to(EXPECTED_OUTPUT))


if __name__ == "__main__":
    unittest.main()

The pipeline can be tested as shown below.

python chapter2/sliding_window_word_length_test.py -v
test_windowing_behaviour (__main__.SlidingWindowWordLengthTest) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.358s

OK

Pipeline Execution

Similar to the previous example, we use the legacy read (--deprecated_read) while accepting default values of the other known arguments. Also, the pipeline is deployed on a local Flink cluster (--flink_master=localhost:8081). Do not forget to update the /etc/hosts file by adding an entry for host.docker.internal as mentioned earlier.

## start the beam pipeline
## exclude --flink_master if using an embedded cluster
python chapter2/sliding_window_word_length.py --deprecated_read \
    --job_name=sliding-word-length --runner FlinkRunner --flink_master=localhost:8081 \
    --streaming --environment_type=LOOPBACK --parallelism=3 --checkpointing_interval=10000

On Flink UI, we can see two tasks in the job graph as well.

On the Kafka UI, we can check that the output messages include an average word length as well as the window start/end timestamps.