We developed batch and streaming pipelines in Part 2 and Part 4. It is often faster and simpler to identify and fix bugs in pipeline code by running local unit tests. Streaming pipelines benefit in particular, because TestStream lets a unit test advance the watermark or processing time to simulate different scenarios. In this post, we discuss how to unit test the batch and streaming pipelines that we developed earlier.

Batch Pipeline Testing

Pipeline Code

The pipeline begins by reading data from a folder named inputs and parsing the JSON lines. It then creates a key-value pair where the key is the user ID (id) and the value is the file size in bytes (file_size_bytes). After that, the records are grouped by key and aggregated with a ParDo transform to obtain each user's website visit count and traffic consumption distribution (total, maximum and minimum bytes). Finally, the output records are converted into dictionaries and written to a folder named outputs.

# section4/user_traffic.py
import os
import datetime
import argparse
import json
import logging
import typing

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions


class EventLog(typing.NamedTuple):
    ip: str
    id: str
    lat: float
    lng: float
    user_agent: str
    age_bracket: str
    opted_into_marketing: bool
    http_request: str
    http_response: int
    file_size_bytes: int
    event_datetime: str
    event_ts: int


class UserTraffic(typing.NamedTuple):
    id: str
    page_views: int
    total_bytes: int
    max_bytes: int
    min_bytes: int


def parse_json(element: str):
    row = json.loads(element)
    # lat/lng are sometimes empty strings or null
    if not row["lat"] or not row["lng"]:
        row = {**row, **{"lat": -1, "lng": -1}}
    return EventLog(**row)


class Aggregate(beam.DoFn):
    def process(self, element: typing.Tuple[str, typing.Iterable[int]]):
        key, values = element
        # Materialise the grouped values: depending on the runner they may be a
        # lazy iterable, while len() and repeated traversal need a list
        values = list(values)
        yield UserTraffic(
            id=key,
            page_views=len(values),
            total_bytes=sum(values),
            max_bytes=max(values),
            min_bytes=min(values),
        )


def run():
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--inputs",
        default="inputs",
        help="Specify folder name in which event records are saved",
    )
    parser.add_argument(
        "--runner", default="DirectRunner", help="Specify Apache Beam Runner"
    )
    opts = parser.parse_args()
    PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))

    options = PipelineOptions()
    options.view_as(StandardOptions).runner = opts.runner

    p = beam.Pipeline(options=options)
    (
        p
        | "Read from files"
        >> beam.io.ReadFromText(
            file_pattern=os.path.join(PARENT_DIR, opts.inputs, "*.out")
        )
        | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
        | "Form key value pair" >> beam.Map(lambda e: (e.id, e.file_size_bytes))
        | "Group by key" >> beam.GroupByKey()
        | "Aggregate by id" >> beam.ParDo(Aggregate()).with_output_types(UserTraffic)
        | "To dict" >> beam.Map(lambda e: e._asdict())
        | "Write to file"
        >> beam.io.WriteToText(
            file_path_prefix=os.path.join(
                PARENT_DIR,
                "outputs",
                f"{int(datetime.datetime.now().timestamp() * 1000)}",
            ),
            file_name_suffix=".out",
        )
    )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    p.run().wait_until_finish()


if __name__ == "__main__":
    run()
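Stripped of Beam, the group-and-aggregate steps above boil down to the plain-Python computation below. It is only a sketch for intuition, assuming the (id, file_size_bytes) pairs have already been extracted; the helper name aggregate_by_user is hypothetical, and the sample values are the three file sizes used later in AggregateTest.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, NamedTuple, Tuple


class UserTraffic(NamedTuple):
    id: str
    page_views: int
    total_bytes: int
    max_bytes: int
    min_bytes: int


def aggregate_by_user(pairs: Iterable[Tuple[str, int]]) -> List[UserTraffic]:
    # Plays the role of GroupByKey: collect every file size under its user ID
    grouped: Dict[str, List[int]] = defaultdict(list)
    for user_id, size in pairs:
        grouped[user_id].append(size)
    # Plays the role of the Aggregate DoFn: one summary record per user
    return [
        UserTraffic(
            id=user_id,
            page_views=len(sizes),
            total_bytes=sum(sizes),
            max_bytes=max(sizes),
            min_bytes=min(sizes),
        )
        for user_id, sizes in grouped.items()
    ]


pairs = [
    ("462520009613048791", 207),
    ("462520009613048791", 446),
    ("462520009613048791", 318),
]
print(aggregate_by_user(pairs))
```

Note that len, sum, max and min each walk over the grouped values, which is safe here because they are held in a list.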

Test Pipeline

As illustrated in the Beam documentation, we can use the following pattern to test a Beam pipeline.

  • Create a TestPipeline.
  • Create some static, known test input data.
  • Use the Create transform to create a PCollection of your input data.
  • Apply your transform to the input PCollection and save the resulting output PCollection.
  • Use PAssert and its subclasses (in Python, the assert_that function with matchers such as equal_to) to verify that the output PCollection contains the elements that you expect.
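The matcher in the last step matters because a PCollection has no guaranteed element order. equal_to passes when the actual and expected elements match regardless of order, with duplicates counted. The plain-Python function below illustrates that kind of multiset comparison; matches_unordered is a hypothetical helper for intuition, not Beam's implementation.

```python
from typing import Any, Iterable


def matches_unordered(actual: Iterable[Any], expected: Iterable[Any]) -> bool:
    """Return True if both collections hold the same elements, ignoring order
    but respecting duplicates (multiset equality)."""
    remaining = list(expected)
    for element in actual:
        # Consume one matching expected element for each actual element
        for i, candidate in enumerate(remaining):
            if element == candidate:
                del remaining[i]
                break
        else:
            return False  # an actual element with no expected counterpart
    return not remaining  # every expected element must have been consumed


print(matches_unordered([("a", 1), ("a", 2)], [("a", 2), ("a", 1)]))  # True
print(matches_unordered([("a", 1)], [("a", 1), ("a", 1)]))  # False
```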

We have three unit test cases for this batch pipeline. The first two check whether the input elements are parsed into the custom EventLog type as expected, including the case where lat and lng are null, while the last one tests whether the pipeline aggregates the elements by user correctly. In all cases, the pipeline output is compared to the expected output for verification.

# section4/user_traffic_test.py
import sys
import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

from user_traffic import EventLog, UserTraffic, parse_json, Aggregate


def main(out=sys.stderr, verbosity=2):
    loader = unittest.TestLoader()

    suite = loader.loadTestsFromModule(sys.modules[__name__])
    unittest.TextTestRunner(out, verbosity=verbosity).run(suite)


class ParseJsonTest(unittest.TestCase):
    def test_parse_json(self):
        with TestPipeline() as p:
            LINES = [
                '{"ip": "", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET eucharya.html HTTP/1.0", "http_response": 200, "file_size_bytes": 207, "event_datetime": "2024-03-01T05:51:22.083", "event_ts": 1709232682083}',
                '{"ip": "", "id": "5135574965990269004", "lat": 36.7323, "lng": 3.0875, "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_9 rv:2.0; wo-SN) AppleWebKit/531.18.2 (KHTML, like Gecko) Version/4.0.4 Safari/531.18.2", "age_bracket": "26-40", "opted_into_marketing": false, "http_request": "GET coniferophyta.html HTTP/1.0", "http_response": 200, "file_size_bytes": 427, "event_datetime": "2024-03-01T05:48:52.985", "event_ts": 1709232532985}',
            ]

            EXPECTED_OUTPUT = [
                EventLog(
                    ip="",
                    id="462520009613048791",
                    lat=50.4779,
                    lng=12.3713,
                    user_agent="Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7",
                    age_bracket="18-25",
                    opted_into_marketing=False,
                    http_request="GET eucharya.html HTTP/1.0",
                    http_response=200,
                    file_size_bytes=207,
                    event_datetime="2024-03-01T05:51:22.083",
                    event_ts=1709232682083,
                ),
                EventLog(
                    ip="",
                    id="5135574965990269004",
                    lat=36.7323,
                    lng=3.0875,
                    user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_9 rv:2.0; wo-SN) AppleWebKit/531.18.2 (KHTML, like Gecko) Version/4.0.4 Safari/531.18.2",
                    age_bracket="26-40",
                    opted_into_marketing=False,
                    http_request="GET coniferophyta.html HTTP/1.0",
                    http_response=200,
                    file_size_bytes=427,
                    event_datetime="2024-03-01T05:48:52.985",
                    event_ts=1709232532985,
                ),
            ]

            output = (
                p
                | beam.Create(LINES)
                | beam.Map(parse_json).with_output_types(EventLog)
            )

            assert_that(output, equal_to(EXPECTED_OUTPUT))

    def test_parse_null_lat_lng(self):
        with TestPipeline() as p:
            LINES = [
                '{"ip": "", "id": "462520009613048791", "lat": null, "lng": null, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET eucharya.html HTTP/1.0", "http_response": 200, "file_size_bytes": 207, "event_datetime": "2024-03-01T05:51:22.083", "event_ts": 1709232682083}',
            ]

            EXPECTED_OUTPUT = [
                EventLog(
                    ip="",
                    id="462520009613048791",
                    lat=-1,
                    lng=-1,
                    user_agent="Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7",
                    age_bracket="18-25",
                    opted_into_marketing=False,
                    http_request="GET eucharya.html HTTP/1.0",
                    http_response=200,
                    file_size_bytes=207,
                    event_datetime="2024-03-01T05:51:22.083",
                    event_ts=1709232682083,
                ),
            ]

            output = (
                p
                | beam.Create(LINES)
                | beam.Map(parse_json).with_output_types(EventLog)
            )

            assert_that(output, equal_to(EXPECTED_OUTPUT))


class AggregateTest(unittest.TestCase):
    def test_aggregate(self):
        with TestPipeline() as p:
            LINES = [
                '{"ip": "", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET eucharya.html HTTP/1.0", "http_response": 200, "file_size_bytes": 207, "event_datetime": "2024-03-01T05:51:22.083", "event_ts": 1709232682083}',
                '{"ip": "", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET blastocladiomycota.html HTTP/1.0", "http_response": 200, "file_size_bytes": 446, "event_datetime": "2024-03-01T05:51:48.719", "event_ts": 1709232708719}',
                '{"ip": "", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET home.html HTTP/1.0", "http_response": 200, "file_size_bytes": 318, "event_datetime": "2024-03-01T05:51:35.181", "event_ts": 1709232695181}',
            ]

            EXPECTED_OUTPUT = [
                UserTraffic(
                    id="462520009613048791",
                    page_views=3,
                    total_bytes=971,
                    max_bytes=446,
                    min_bytes=207,
                )
            ]

            output = (
                p
                | beam.Create(LINES)
                | beam.Map(parse_json).with_output_types(EventLog)
                | beam.Map(lambda e: (e.id, e.file_size_bytes))
                | beam.GroupByKey()
                | beam.ParDo(Aggregate()).with_output_types(UserTraffic)
            )

            assert_that(output, equal_to(EXPECTED_OUTPUT))


if __name__ == "__main__":
    main(out=None)

We can unit test the batch pipeline simply by executing the test script.

$ python section4/user_traffic_test.py
test_aggregate (__main__.AggregateTest) ... ok
test_parse_json (__main__.ParseJsonTest) ... ok
test_parse_null_lat_lng (__main__.ParseJsonTest) ... ok
Ran 3 tests in 1.278s
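Since parse_json is a plain function, its edge cases can also be checked directly with unittest, without building a pipeline. The sketch below assumes EventLog and parse_json are copied from user_traffic.py so the block runs standalone; make_line and ParseJsonDirectTest are hypothetical names, and the record uses placeholder values for the fields the checks do not care about. The last case highlights a side effect of the falsy check in parse_json: a latitude of exactly 0.0 is also treated as missing, because `not 0.0` is True.

```python
import json
import typing
import unittest


class EventLog(typing.NamedTuple):
    ip: str
    id: str
    lat: float
    lng: float
    user_agent: str
    age_bracket: str
    opted_into_marketing: bool
    http_request: str
    http_response: int
    file_size_bytes: int
    event_datetime: str
    event_ts: int


def parse_json(element: str):
    # Copied from user_traffic.py
    row = json.loads(element)
    if not row["lat"] or not row["lng"]:
        row = {**row, **{"lat": -1, "lng": -1}}
    return EventLog(**row)


def make_line(lat, lng) -> str:
    # Hypothetical helper: a JSON line with placeholder values for other fields
    return json.dumps({
        "ip": "", "id": "u1", "lat": lat, "lng": lng, "user_agent": "ua",
        "age_bracket": "18-25", "opted_into_marketing": False,
        "http_request": "GET home.html HTTP/1.0", "http_response": 200,
        "file_size_bytes": 207, "event_datetime": "2024-03-01T05:51:22.083",
        "event_ts": 1709232682083,
    })


class ParseJsonDirectTest(unittest.TestCase):
    def test_valid_coordinates_are_kept(self):
        log = parse_json(make_line(50.4779, 12.3713))
        self.assertEqual((log.lat, log.lng), (50.4779, 12.3713))

    def test_missing_coordinates_become_minus_one(self):
        # null, empty string, or only one coordinate missing
        for lat, lng in [(None, None), ("", ""), (50.4779, None)]:
            log = parse_json(make_line(lat, lng))
            self.assertEqual((log.lat, log.lng), (-1, -1))

    def test_zero_latitude_is_treated_as_missing(self):
        # 0.0 is falsy, so a point on the equator is replaced as well
        log = parse_json(make_line(0.0, 12.3713))
        self.assertEqual((log.lat, log.lng), (-1, -1))
```

If the block is saved as, say, parse_json_direct_test.py (a hypothetical file name), it can be run with `python -m unittest parse_json_direct_test`.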

Streaming Pipeline Testing

Pipeline Code

The pipeline begins by reading and decoding messages from a Kafka topic named website-visit, followed by parsing the decoded JSON strings into a custom type named EventLog. After that, each element's timestamp is re-assigned based on its event_datetime attribute, and the element is converted into a key-value pair where the user ID is the key and 1 is the value. Finally, the tuple elements are aggregated in fixed time windows of 20 seconds and written to a Kafka topic named traffic-agg. Note that the output messages include two additional attributes (window_start and window_end) to clarify which window they belong to.

# section4/traffic_agg.py
import os
import datetime
import argparse
import json
import logging
import typing

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class EventLog(typing.NamedTuple):
    ip: str
    id: str
    lat: float
    lng: float
    user_agent: str
    age_bracket: str
    opted_into_marketing: bool
    http_request: str
    http_response: int
    file_size_bytes: int
    event_datetime: str
    event_ts: int


def decode_message(kafka_kv: tuple):
    return kafka_kv[1].decode("utf-8")


def create_message(element: dict):
    key = {"event_id": element["id"], "window_start": element["window_start"]}
    print(element)
    return json.dumps(key).encode("utf-8"), json.dumps(element).encode("utf-8")


def parse_json(element: str):
    row = json.loads(element)
    # lat/lng are sometimes empty strings or null
    if not row["lat"] or not row["lng"]:
        row = {**row, **{"lat": -1, "lng": -1}}
    return EventLog(**row)


def assign_timestamp(element: EventLog):
    ts = datetime.datetime.strptime(
        element.event_datetime, "%Y-%m-%dT%H:%M:%S.%f"
    ).timestamp()
    return beam.window.TimestampedValue(element, ts)


class AddWindowTS(beam.DoFn):
    def process(self, element: tuple, window=beam.DoFn.WindowParam):
        window_start = window.start.to_utc_datetime().isoformat(timespec="seconds")
        window_end = window.end.to_utc_datetime().isoformat(timespec="seconds")
        output = {
            "id": element[0],
            "window_start": window_start,
            "window_end": window_end,
            "page_views": element[1],
        }
        yield output


def run():
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
    )
    parser.add_argument(
        "--use_own",
        action="store_true",
        help="Flag to indicate whether to use an own local cluster",
    )
    opts = parser.parse_args()

    pipeline_opts = {
        "runner": opts.runner,
        "job_name": "traffic-agg",
        "environment_type": "LOOPBACK",
        "streaming": True,
        "parallelism": 3,
        "experiments": [
            "use_deprecated_read"
        ],  ## https://github.com/apache/beam/issues/20979
        "checkpointing_interval": "60000",
    }
    if opts.use_own:
        pipeline_opts = {**pipeline_opts, **{"flink_master": "localhost:8081"}}
    print(pipeline_opts)
    options = PipelineOptions([], **pipeline_opts)
    # Required, else it will complain when importing worker functions
    options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=options)
    (
        p
        | "Read from Kafka"
        >> kafka.ReadFromKafka(
            consumer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                "auto.offset.reset": "earliest",
                # "enable.auto.commit": "true",
                "group.id": "traffic-agg",
            },
            topics=["website-visit"],
        )
        | "Decode messages" >> beam.Map(decode_message)
        | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
        | "Assign timestamp" >> beam.Map(assign_timestamp)
        | "Form key value pair" >> beam.Map(lambda e: (e.id, 1))
        | "Tumble window per 20 seconds" >> beam.WindowInto(beam.window.FixedWindows(20))
        | "Sum by key" >> beam.CombinePerKey(sum)
        | "Add window timestamp" >> beam.ParDo(AddWindowTS())
        | "Create messages"
        >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
        | "Write to Kafka"
        >> kafka.WriteToKafka(
            producer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                )
            },
            topic="traffic-agg",
        )
    )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    p.run().wait_until_finish()


if __name__ == "__main__":
    run()
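To make the Kafka output format concrete, the sketch below mirrors in plain Python what AddWindowTS and create_message produce for a single aggregated element. It assumes window bounds are rendered as naive UTC datetimes, which is our understanding of what Beam's `Timestamp.to_utc_datetime()` returns by default. window_record and to_naive_utc are hypothetical stand-ins, and the window bounds (in epoch seconds) and count are illustrative.

```python
import datetime
import json
from typing import Tuple

EPOCH = datetime.datetime(1970, 1, 1)  # naive UTC epoch


def to_naive_utc(epoch_seconds: int) -> datetime.datetime:
    # Naive datetime in UTC, assumed to match to_utc_datetime()'s default output
    return EPOCH + datetime.timedelta(seconds=epoch_seconds)


def window_record(key: str, count: int, start: int, end: int) -> dict:
    # Hypothetical stand-in for AddWindowTS.process
    return {
        "id": key,
        "window_start": to_naive_utc(start).isoformat(timespec="seconds"),
        "window_end": to_naive_utc(end).isoformat(timespec="seconds"),
        "page_views": count,
    }


def create_message(element: dict) -> Tuple[bytes, bytes]:
    # Same logic as create_message in traffic_agg.py, minus the debug print
    key = {"event_id": element["id"], "window_start": element["window_start"]}
    return json.dumps(key).encode("utf-8"), json.dumps(element).encode("utf-8")


record = window_record("462520009613048791", 2, 1709232680, 1709232700)
key_bytes, value_bytes = create_message(record)
print(key_bytes)
print(value_bytes)
```

Because window_start is part of the message key, each user gets a distinct key per window. The bounds read 18:51 on 29 February because the sample event_ts 1709232682083 is 2024-02-29T18:51:22.083 in UTC, 11 hours earlier than its event_datetime, which suggests event_datetime is recorded in local time at UTC+11.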

Test Pipeline

For testing the streaming pipeline, we use TestStream, which generates events on an unbounded PCollection. The stream has three elements from a single user with the following event_datetime values.

  • 2024-03-01T05:51:22.083
  • 2024-03-01T05:51:32.083
  • 2024-03-01T05:51:52.083

Fixed windows of 20 seconds are aligned to the epoch, so these timestamps fall into the windows [05:51:20, 05:51:40) and [05:51:40, 05:52:00). We therefore expect the first two elements to be grouped together and the last element to be grouped on its own. The expected output is created accordingly and compared to the pipeline output for verification.
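The window arithmetic can be checked by hand. The sketch below is a plain-Python approximation of fixed-window assignment with zero offset: each window starts at the largest multiple of the window size that does not exceed the timestamp. It parses event_datetime as UTC for reproducibility; this does not change the grouping, because current UTC offsets are multiples of 15 minutes and hence of 20 seconds. The function name window_start is hypothetical.

```python
import datetime
from collections import Counter

WINDOW_MS = 20_000  # FixedWindows(20), expressed in milliseconds
EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def window_start(event_datetime: str) -> str:
    # Parse the timestamp as UTC and convert it to integer epoch milliseconds
    ts = datetime.datetime.strptime(event_datetime, "%Y-%m-%dT%H:%M:%S.%f")
    delta = ts.replace(tzinfo=datetime.timezone.utc) - EPOCH
    ms = delta // datetime.timedelta(milliseconds=1)
    # Round down to the nearest multiple of the window size
    start_ms = ms - ms % WINDOW_MS
    start = EPOCH + datetime.timedelta(milliseconds=start_ms)
    return start.strftime("%H:%M:%S")


events = [
    "2024-03-01T05:51:22.083",
    "2024-03-01T05:51:32.083",
    "2024-03-01T05:51:52.083",
]
counts = Counter(window_start(e) for e in events)
print(counts)  # Counter({'05:51:20': 2, '05:51:40': 1})
```

The two counts correspond to the two tuples in EXPECTED_OUTPUT in the test below.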

# section4/traffic_agg_test.py
import datetime
import sys
import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.testing.test_stream import TestStream
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

from traffic_agg import EventLog, parse_json, assign_timestamp


def main(out=sys.stderr, verbosity=2):
    loader = unittest.TestLoader()

    suite = loader.loadTestsFromModule(sys.modules[__name__])
    unittest.TextTestRunner(out, verbosity=verbosity).run(suite)


class TrafficWindowingTest(unittest.TestCase):
    def test_windowing_behaviour(self):
        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True
        with TestPipeline(options=options) as p:
            EVENTS = [
                '{"ip": "", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET eucharya.html HTTP/1.0", "http_response": 200, "file_size_bytes": 207, "event_datetime": "2024-03-01T05:51:22.083", "event_ts": 1709232682083}',
                '{"ip": "", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET eucharya.html HTTP/1.0", "http_response": 200, "file_size_bytes": 207, "event_datetime": "2024-03-01T05:51:32.083", "event_ts": 1709232682083}',
                '{"ip": "", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET eucharya.html HTTP/1.0", "http_response": 200, "file_size_bytes": 207, "event_datetime": "2024-03-01T05:51:52.083", "event_ts": 1709232682083}',
            ]

            test_stream = (
                TestStream()
                .advance_watermark_to(0)
                .add_elements([EVENTS[0], EVENTS[1], EVENTS[2]])
                .advance_watermark_to_infinity()
            )

            output = (
                p
                | test_stream
                | beam.Map(parse_json).with_output_types(EventLog)
                | beam.Map(assign_timestamp)
                | beam.Map(lambda e: (e.id, 1))
                | beam.WindowInto(beam.window.FixedWindows(20))
                | beam.CombinePerKey(sum)
            )

            EXPECTED_OUTPUT = [("462520009613048791", 2), ("462520009613048791", 1)]

            assert_that(output, equal_to(EXPECTED_OUTPUT))


if __name__ == "__main__":
    main(out=None)

As with the batch pipeline, we can execute the test script to unit test the streaming pipeline.

$ python section4/traffic_agg_test.py
test_windowing_behaviour (__main__.TrafficWindowingTest) ... ok
Ran 1 test in 0.527s


In this series of posts, we discussed local development of Apache Beam pipelines using Python. In Part 1, a basic Beam pipeline was introduced, followed by a demonstration of how to utilise Jupyter notebooks for interactive development. Several notebook examples were covered, including Beam SQL and Beam DataFrames. Batch pipelines were developed in Part 2, where we used pipelines from GCP Python DataFlow Quest and modified them to access local resources only. Each batch pipeline has two versions, with and without SQL. In Part 3, we discussed how to set up local Flink and Kafka clusters for deploying streaming pipelines on the Flink Runner. A streaming pipeline with and without Beam SQL was built in Part 4, and this series concludes by illustrating unit testing of those pipelines in this post.