We developed batch and streaming pipelines in Part 2 and Part 4. It is often faster and simpler to identify and fix bugs in pipeline code by performing local unit testing. Moreover, especially when developing a streaming pipeline, unit test cases can facilitate development further thanks to TestStream, which allows us to advance watermarks or processing time to simulate different scenarios, as previewed just below the series list. In this post, we discuss how to perform unit testing of the batch and streaming pipelines that we developed earlier.
- Part 1 Pipeline, Notebook, SQL and DataFrame
- Part 2 Batch Pipelines
- Part 3 Flink Runner
- Part 4 Streaming Pipelines
- Part 5 Testing Pipelines (this post)
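Before getting started, here is a minimal preview of the TestStream API that we rely on later in this post; the element values are placeholders rather than actual event records.

# A minimal TestStream sketch; the element values are placeholders.
from apache_beam.testing.test_stream import TestStream

test_stream = (
    TestStream()
    .advance_watermark_to(0)  # set the initial watermark
    .add_elements(["event-1", "event-2"])  # emit elements at the current watermark
    .advance_processing_time(10)  # advance processing time by 10 seconds
    .advance_watermark_to_infinity()  # advance the watermark to the end of time
)
# the stream is then applied to a pipeline, e.g. as `p | test_stream`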
Batch Pipeline Testing
Pipeline Code
The pipeline begins with reading data from a folder named inputs and parsing the JSON lines. Then it creates a key-value pair where the key is the user ID (id) and the value is the file size in bytes (file_size_bytes). After that, the records are grouped by key and aggregated to obtain the website visit count and traffic consumption distribution using a ParDo transform. Finally, the output records are converted into dictionaries and written to a folder named outputs.
# section4/user_traffic.py
import os
import datetime
import argparse
import json
import logging
import typing

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions


class EventLog(typing.NamedTuple):
    ip: str
    id: str
    lat: float
    lng: float
    user_agent: str
    age_bracket: str
    opted_into_marketing: bool
    http_request: str
    http_response: int
    file_size_bytes: int
    event_datetime: str
    event_ts: int


class UserTraffic(typing.NamedTuple):
    id: str
    page_views: int
    total_bytes: int
    max_bytes: int
    min_bytes: int


def parse_json(element: str):
    row = json.loads(element)
    # lat/lng sometimes empty string
    if not row["lat"] or not row["lng"]:
        row = {**row, **{"lat": -1, "lng": -1}}
    return EventLog(**row)


class Aggregate(beam.DoFn):
    def process(self, element: typing.Tuple[str, typing.Iterable[int]]):
        key, values = element
        yield UserTraffic(
            id=key,
            page_views=len(values),
            total_bytes=sum(values),
            max_bytes=max(values),
            min_bytes=min(values),
        )


def run():
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--inputs",
        default="inputs",
        help="Specify folder name that event records are saved",
    )
    parser.add_argument(
        "--runner", default="DirectRunner", help="Specify Apache Beam Runner"
    )
    opts = parser.parse_args()
    PARENT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))

    options = PipelineOptions()
    options.view_as(StandardOptions).runner = opts.runner

    p = beam.Pipeline(options=options)
    (
        p
        | "Read from files"
        >> beam.io.ReadFromText(
            file_pattern=os.path.join(PARENT_DIR, opts.inputs, "*.out")
        )
        | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
        | "Form key value pair" >> beam.Map(lambda e: (e.id, e.file_size_bytes))
        | "Group by key" >> beam.GroupByKey()
        | "Aggregate by id" >> beam.ParDo(Aggregate()).with_output_types(UserTraffic)
        | "To dict" >> beam.Map(lambda e: e._asdict())
        | "Write to file"
        >> beam.io.WriteToText(
            file_path_prefix=os.path.join(
                PARENT_DIR,
                "outputs",
                f"{int(datetime.datetime.now().timestamp() * 1000)}",
            ),
            file_name_suffix=".out",
        )
    )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    p.run().wait_until_finish()


if __name__ == "__main__":
    run()
Test Pipeline
As illustrated in the Beam documentation, we can use the following pattern to test a Beam pipeline.
- Create a TestPipeline.
- Create some static, known test input data.
- Use the Create transform to create a PCollection of your input data.
- Apply your transform to the input PCollection and save the resulting output PCollection.
- Use PAssert and its subclasses (or testing functions in Python) to verify that the output PCollection contains the elements that you expect.
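As a minimal illustration of this pattern, the following sketch tests a hypothetical doubling transform rather than any of the pipeline code of this post.

# A minimal sketch of the testing pattern with a hypothetical Map transform.
import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class DoubleTest(unittest.TestCase):
    def test_double(self):
        # the TestPipeline runs on exiting the context manager
        with TestPipeline() as p:
            # static, known test input data
            inputs = [1, 2, 3]
            # create a PCollection and apply the transform under test
            output = p | beam.Create(inputs) | beam.Map(lambda x: x * 2)
            # verify the output; equal_to compares elements regardless of order
            assert_that(output, equal_to([2, 4, 6]))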
We have three unit test cases for this batch pipeline. The first two check whether input elements are parsed into the custom EventLog type as expected, while the last one tests whether the pipeline aggregates the elements by user correctly. In all cases, the pipeline output is compared to the expected output for verification.
# section4/user_traffic_test.py
import sys
import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

from user_traffic import EventLog, UserTraffic, parse_json, Aggregate


def main(out=sys.stderr, verbosity=2):
    loader = unittest.TestLoader()

    suite = loader.loadTestsFromModule(sys.modules[__name__])
    unittest.TextTestRunner(out, verbosity=verbosity).run(suite)


class ParseJsonTest(unittest.TestCase):
    def test_parse_json(self):
        with TestPipeline() as p:
            LINES = [
                '{"ip": "138.201.212.70", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET eucharya.html HTTP/1.0", "http_response": 200, "file_size_bytes": 207, "event_datetime": "2024-03-01T05:51:22.083", "event_ts": 1709232682083}',
                '{"ip": "105.100.237.193", "id": "5135574965990269004", "lat": 36.7323, "lng": 3.0875, "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_9 rv:2.0; wo-SN) AppleWebKit/531.18.2 (KHTML, like Gecko) Version/4.0.4 Safari/531.18.2", "age_bracket": "26-40", "opted_into_marketing": false, "http_request": "GET coniferophyta.html HTTP/1.0", "http_response": 200, "file_size_bytes": 427, "event_datetime": "2024-03-01T05:48:52.985", "event_ts": 1709232532985}',
            ]

            EXPECTED_OUTPUT = [
                EventLog(
                    ip="138.201.212.70",
                    id="462520009613048791",
                    lat=50.4779,
                    lng=12.3713,
                    user_agent="Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7",
                    age_bracket="18-25",
                    opted_into_marketing=False,
                    http_request="GET eucharya.html HTTP/1.0",
                    http_response=200,
                    file_size_bytes=207,
                    event_datetime="2024-03-01T05:51:22.083",
                    event_ts=1709232682083,
                ),
                EventLog(
                    ip="105.100.237.193",
                    id="5135574965990269004",
                    lat=36.7323,
                    lng=3.0875,
                    user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_9 rv:2.0; wo-SN) AppleWebKit/531.18.2 (KHTML, like Gecko) Version/4.0.4 Safari/531.18.2",
                    age_bracket="26-40",
                    opted_into_marketing=False,
                    http_request="GET coniferophyta.html HTTP/1.0",
                    http_response=200,
                    file_size_bytes=427,
                    event_datetime="2024-03-01T05:48:52.985",
                    event_ts=1709232532985,
                ),
            ]

            output = (
                p
                | beam.Create(LINES)
                | beam.Map(parse_json).with_output_types(EventLog)
            )

            assert_that(output, equal_to(EXPECTED_OUTPUT))

    def test_parse_null_lat_lng(self):
        with TestPipeline() as p:
            LINES = [
                '{"ip": "138.201.212.70", "id": "462520009613048791", "lat": null, "lng": null, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET eucharya.html HTTP/1.0", "http_response": 200, "file_size_bytes": 207, "event_datetime": "2024-03-01T05:51:22.083", "event_ts": 1709232682083}',
            ]

            EXPECTED_OUTPUT = [
                EventLog(
                    ip="138.201.212.70",
                    id="462520009613048791",
                    lat=-1,
                    lng=-1,
                    user_agent="Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7",
                    age_bracket="18-25",
                    opted_into_marketing=False,
                    http_request="GET eucharya.html HTTP/1.0",
                    http_response=200,
                    file_size_bytes=207,
                    event_datetime="2024-03-01T05:51:22.083",
                    event_ts=1709232682083,
                ),
            ]

            output = (
                p
                | beam.Create(LINES)
                | beam.Map(parse_json).with_output_types(EventLog)
            )

            assert_that(output, equal_to(EXPECTED_OUTPUT))


class AggregateTest(unittest.TestCase):
    def test_aggregate(self):
        with TestPipeline() as p:
            LINES = [
                '{"ip": "138.201.212.70", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET eucharya.html HTTP/1.0", "http_response": 200, "file_size_bytes": 207, "event_datetime": "2024-03-01T05:51:22.083", "event_ts": 1709232682083}',
                '{"ip": "138.201.212.70", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET blastocladiomycota.html HTTP/1.0", "http_response": 200, "file_size_bytes": 446, "event_datetime": "2024-03-01T05:51:48.719", "event_ts": 1709232708719}',
                '{"ip": "138.201.212.70", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET home.html HTTP/1.0", "http_response": 200, "file_size_bytes": 318, "event_datetime": "2024-03-01T05:51:35.181", "event_ts": 1709232695181}',
            ]

            EXPECTED_OUTPUT = [
                UserTraffic(
                    id="462520009613048791",
                    page_views=3,
                    total_bytes=971,
                    max_bytes=446,
                    min_bytes=207,
                )
            ]

            output = (
                p
                | beam.Create(LINES)
                | beam.Map(parse_json).with_output_types(EventLog)
                | beam.Map(lambda e: (e.id, e.file_size_bytes))
                | beam.GroupByKey()
                | beam.ParDo(Aggregate()).with_output_types(UserTraffic)
            )

            assert_that(output, equal_to(EXPECTED_OUTPUT))


if __name__ == "__main__":
    main(out=None)
We can perform unit testing of the batch pipeline simply by executing the test script.
$ python section4/user_traffic_test.py
test_aggregate (__main__.AggregateTest) ... ok
test_parse_json (__main__.ParseJsonTest) ... ok
test_parse_null_lat_lng (__main__.ParseJsonTest) ... ok

----------------------------------------------------------------------
Ran 3 tests in 1.278s

OK
Streaming Pipeline Testing
Pipeline Code
The pipeline begins with reading and decoding messages from a Kafka topic named website-visit, followed by parsing the decoded JSON strings into a custom type named EventLog. After that, the timestamp is re-assigned based on the event_datetime attribute, and each element is converted into a key-value pair where the user ID is taken as the key and 1 is given as the value. Finally, the tuple elements are aggregated in a fixed time window of 20 seconds and written to a Kafka topic named traffic-agg. Note that the output messages include two additional attributes (window_start and window_end) to indicate which window they belong to.
# section4/traffic_agg.py
import os
import datetime
import argparse
import json
import logging
import typing

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class EventLog(typing.NamedTuple):
    ip: str
    id: str
    lat: float
    lng: float
    user_agent: str
    age_bracket: str
    opted_into_marketing: bool
    http_request: str
    http_response: int
    file_size_bytes: int
    event_datetime: str
    event_ts: int


def decode_message(kafka_kv: tuple):
    return kafka_kv[1].decode("utf-8")


def create_message(element: dict):
    key = {"event_id": element["id"], "window_start": element["window_start"]}
    print(element)
    return json.dumps(key).encode("utf-8"), json.dumps(element).encode("utf-8")


def parse_json(element: str):
    row = json.loads(element)
    # lat/lng sometimes empty string
    if not row["lat"] or not row["lng"]:
        row = {**row, **{"lat": -1, "lng": -1}}
    return EventLog(**row)


def assign_timestamp(element: EventLog):
    ts = datetime.datetime.strptime(
        element.event_datetime, "%Y-%m-%dT%H:%M:%S.%f"
    ).timestamp()
    return beam.window.TimestampedValue(element, ts)


class AddWindowTS(beam.DoFn):
    def process(self, element: tuple, window=beam.DoFn.WindowParam):
        window_start = window.start.to_utc_datetime().isoformat(timespec="seconds")
        window_end = window.end.to_utc_datetime().isoformat(timespec="seconds")
        output = {
            "id": element[0],
            "window_start": window_start,
            "window_end": window_end,
            "page_views": element[1],
        }
        yield output


def run():
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
    )
    parser.add_argument(
        "--use_own",
        action="store_true",
        help="Flag to indicate whether to use a local Flink cluster",
    )
    opts = parser.parse_args()

    pipeline_opts = {
        "runner": opts.runner,
        "job_name": "traffic-agg",
        "environment_type": "LOOPBACK",
        "streaming": True,
        "parallelism": 3,
        "experiments": [
            "use_deprecated_read"
        ],  ## https://github.com/apache/beam/issues/20979
        "checkpointing_interval": "60000",
    }
    if opts.use_own is True:
        pipeline_opts = {**pipeline_opts, **{"flink_master": "localhost:8081"}}
    print(pipeline_opts)
    options = PipelineOptions([], **pipeline_opts)
    # Required, or it will complain when importing worker functions
    options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=options)
    (
        p
        | "Read from Kafka"
        >> kafka.ReadFromKafka(
            consumer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                "auto.offset.reset": "earliest",
                # "enable.auto.commit": "true",
                "group.id": "traffic-agg",
            },
            topics=["website-visit"],
        )
        | "Decode messages" >> beam.Map(decode_message)
        | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
        | "Assign timestamp" >> beam.Map(assign_timestamp)
        | "Form key value pair" >> beam.Map(lambda e: (e.id, 1))
118 | "Tumble window per minute" >> beam.WindowInto(beam.window.FixedWindows(20))
119 | "Sum by key" >> beam.CombinePerKey(sum)
120 | "Add window timestamp" >> beam.ParDo(AddWindowTS())
121 | "Create messages"
122 >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
123 | "Write to Kafka"
124 >> kafka.WriteToKafka(
125 producer_config={
126 "bootstrap.servers": os.getenv(
127 "BOOTSTRAP_SERVERS",
128 "host.docker.internal:29092",
129 )
130 },
131 topic="traffic-agg",
132 )
133 )
134
135 logging.getLogger().setLevel(logging.INFO)
136 logging.info("Building pipeline ...")
137
138 p.run().wait_until_finish()
139
140
141if __name__ == "__main__":
142 run()
Test Pipeline
For testing the streaming pipeline, we use a TestStream, which generates an unbounded PCollection of elements. The stream has three elements from a single user with the following timestamp (event_datetime) values.
- 2024-03-01T05:51:22.083
- 2024-03-01T05:51:32.083
- 2024-03-01T05:51:52.083
Therefore, if we aggregate the elements in a fixed time window of 20 seconds, we expect the first two elements to be grouped together while the last element forms a group of its own. The expected output can be created accordingly and compared to the pipeline output for verification.
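We can verify this expectation by hand because fixed windows are aligned to multiples of the window size. The following standalone sketch (not part of the test file; timestamps are interpreted as UTC for simplicity) shows that the first two events fall into the window starting at 05:51:20 while the third falls into the one starting at 05:51:40.

# Standalone sketch: compute the start of the 20-second fixed window that
# each event timestamp falls into, assuming the timestamps are UTC.
import datetime

for dt in [
    "2024-03-01T05:51:22.083",
    "2024-03-01T05:51:32.083",
    "2024-03-01T05:51:52.083",
]:
    ts = (
        datetime.datetime.strptime(dt, "%Y-%m-%dT%H:%M:%S.%f")
        .replace(tzinfo=datetime.timezone.utc)
        .timestamp()
    )
    start = ts - (ts % 20)  # fixed windows start at multiples of 20 seconds
    print(dt, "->", datetime.datetime.fromtimestamp(start, tz=datetime.timezone.utc).time())
# 2024-03-01T05:51:22.083 -> 05:51:20
# 2024-03-01T05:51:32.083 -> 05:51:20
# 2024-03-01T05:51:52.083 -> 05:51:40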
# section4/traffic_agg_test.py
import datetime
import sys
import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.testing.test_stream import TestStream
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

from traffic_agg import EventLog, parse_json, assign_timestamp


def main(out=sys.stderr, verbosity=2):
    loader = unittest.TestLoader()

    suite = loader.loadTestsFromModule(sys.modules[__name__])
    unittest.TextTestRunner(out, verbosity=verbosity).run(suite)


class TrafficWindowingTest(unittest.TestCase):
    def test_windowing_behaviour(self):
        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True
        with TestPipeline(options=options) as p:
            EVENTS = [
                '{"ip": "138.201.212.70", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET eucharya.html HTTP/1.0", "http_response": 200, "file_size_bytes": 207, "event_datetime": "2024-03-01T05:51:22.083", "event_ts": 1709232682083}',
                '{"ip": "138.201.212.70", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET eucharya.html HTTP/1.0", "http_response": 200, "file_size_bytes": 207, "event_datetime": "2024-03-01T05:51:32.083", "event_ts": 1709232682083}',
                '{"ip": "138.201.212.70", "id": "462520009613048791", "lat": 50.4779, "lng": 12.3713, "user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 3_1 like Mac OS X; ks-IN) AppleWebKit/532.30.7 (KHTML, like Gecko) Version/3.0.5 Mobile/8B115 Safari/6532.30.7", "age_bracket": "18-25", "opted_into_marketing": false, "http_request": "GET eucharya.html HTTP/1.0", "http_response": 200, "file_size_bytes": 207, "event_datetime": "2024-03-01T05:51:52.083", "event_ts": 1709232682083}',
            ]

            test_stream = (
                TestStream()
                .advance_watermark_to(0)
                .add_elements([EVENTS[0], EVENTS[1], EVENTS[2]])
                .advance_watermark_to_infinity()
            )

            output = (
                p
                | test_stream
                | beam.Map(parse_json).with_output_types(EventLog)
                | beam.Map(assign_timestamp)
                | beam.Map(lambda e: (e.id, 1))
                | beam.WindowInto(beam.window.FixedWindows(20))
                | beam.CombinePerKey(sum)
            )

            EXPECTED_OUTPUT = [("462520009613048791", 2), ("462520009613048791", 1)]

            assert_that(output, equal_to(EXPECTED_OUTPUT))


if __name__ == "__main__":
    main(out=None)
As with the batch pipeline, we can perform unit testing of the streaming pipeline by executing the test script.
$ python section4/traffic_agg_test.py
test_windowing_behaviour (__main__.TrafficWindowingTest) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.527s

OK
Summary
In this series of posts, we discussed local development of Apache Beam pipelines using Python. In Part 1, a basic Beam pipeline was introduced, followed by a demonstration of how to utilise Jupyter notebooks for interactive development. Several notebook examples were covered, including Beam SQL and Beam DataFrames. Batch pipelines were developed in Part 2, where we took pipelines from the GCP Python DataFlow Quest and modified them to access local resources only. Each batch pipeline has two versions, with and without SQL. In Part 3, we discussed how to set up local Flink and Kafka clusters for deploying streaming pipelines on the Flink Runner. A streaming pipeline, again with and without Beam SQL, was built in Part 4, and the series concludes with the unit testing of those pipelines in this post.