In Part 3, we discussed the portability layer of Apache Beam, which helps explain (1) how Python pipelines run on the Flink Runner and (2) how multiple SDKs can be used in a single pipeline, and then demonstrated how to create local Flink and Kafka clusters for developing streaming pipelines. In this post, we build a streaming pipeline that aggregates page visits by user in a fixed time window of 20 seconds. Two versions of the pipeline are created, one with and one without Beam SQL.
- Part 1 Pipeline, Notebook, SQL and DataFrame
- Part 2 Batch Pipelines
- Part 3 Flink Runner
- Part 4 Streaming Pipelines (this post)
- Part 5 Testing Pipelines
Streaming Pipeline
The streaming pipeline we discuss in this post aggregates website visits by user ID in a fixed time window of 20 seconds. Two versions of the pipeline are created, one with and one without Beam SQL, and both run on a Flink cluster at the end. The source of this post can be found in this GitHub repository.
Traffic Aggregation
It begins with reading and decoding messages from a Kafka topic named website-visit, followed by parsing the decoded JSON string into a custom type named EventLog. Note that the coder for this custom type is registered, although it is not strictly required here because no cross-language transform deals with it. The coder does have to be registered for the SQL version, however, because it is used by the SQL transform, which is performed using the Java SDK.
After that, the element timestamp is re-assigned based on the event_datetime attribute, and the element is converted into a key-value pair where the user ID is the key and 1 is the value. By default, the Kafka reader assigns the processing time (wall clock) as the element timestamp. If the record timestamp differs from the wall clock, re-assigning the timestamp from the record gives more relevant results.
The tuple elements are aggregated in a fixed time window of 20 seconds and written to a Kafka topic named traffic-agg. The output messages include two additional attributes (window_start and window_end) to indicate which window they belong to.
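Before looking at the full source, the windowing logic can be illustrated in isolation. The sketch below is not part of the pipeline source; it runs on the DirectRunner with hard-coded element timestamps instead of the Kafka source and sink, and only shows how keyed elements are summed within 20-second tumbling windows.

# Standalone sketch of the windowed aggregation (not part of traffic_agg.py).
# Elements are (user_id, event_time_in_seconds); Kafka source/sink omitted.
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, TimestampedValue

with beam.Pipeline() as p:  # DirectRunner by default
    (
        p
        | beam.Create([("user-1", 0), ("user-1", 5), ("user-2", 25)])
        | beam.Map(lambda e: TimestampedValue((e[0], 1), e[1]))  # assign event time
        | beam.WindowInto(FixedWindows(20))  # tumbling windows [0, 20) and [20, 40)
        | beam.CombinePerKey(sum)  # page views per user per window
        | beam.Map(print)  # ('user-1', 2) in [0, 20) and ('user-2', 1) in [20, 40)
    )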
# section3/traffic_agg.py
import os
import datetime
import argparse
import json
import logging
import typing

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class EventLog(typing.NamedTuple):
    ip: str
    id: str
    lat: float
    lng: float
    user_agent: str
    age_bracket: str
    opted_into_marketing: bool
    http_request: str
    http_response: int
    file_size_bytes: int
    event_datetime: str
    event_ts: int


beam.coders.registry.register_coder(EventLog, beam.coders.RowCoder)


def decode_message(kafka_kv: tuple):
    return kafka_kv[1].decode("utf-8")


def create_message(element: dict):
    key = {"event_id": element["id"], "window_start": element["window_start"]}
    print(element)
    return json.dumps(key).encode("utf-8"), json.dumps(element).encode("utf-8")


def parse_json(element: str):
    row = json.loads(element)
    # lat/lng sometimes empty string
    if not row["lat"] or not row["lng"]:
        row = {**row, **{"lat": -1, "lng": -1}}
    return EventLog(**row)


def assign_timestamp(element: EventLog):
    ts = datetime.datetime.strptime(
        element.event_datetime, "%Y-%m-%dT%H:%M:%S.%f"
    ).timestamp()
    return beam.window.TimestampedValue(element, ts)


class AddWindowTS(beam.DoFn):
    def process(self, element: tuple, window=beam.DoFn.WindowParam):
        window_start = window.start.to_utc_datetime().isoformat(timespec="seconds")
        window_end = window.end.to_utc_datetime().isoformat(timespec="seconds")
        output = {
            "id": element[0],
            "window_start": window_start,
            "window_end": window_end,
            "page_views": element[1],
        }
        yield output


def run():
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
    )
    parser.add_argument(
        "--use_own",
        action="store_true",
        default=False,
        help="Flag to indicate whether to use an own local cluster",
    )
    opts = parser.parse_args()

    pipeline_opts = {
        "runner": opts.runner,
        "job_name": "traffic-agg",
        "environment_type": "LOOPBACK",
        "streaming": True,
        "parallelism": 3,
        "experiments": [
            "use_deprecated_read"
        ],  ## https://github.com/apache/beam/issues/20979
        "checkpointing_interval": "60000",
    }
    if opts.use_own is True:
        pipeline_opts = {**pipeline_opts, **{"flink_master": "localhost:8081"}}
    print(pipeline_opts)
    options = PipelineOptions([], **pipeline_opts)
    # Required, else it will complain that it cannot import worker functions
    options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=options)
    (
        p
        | "Read from Kafka"
        >> kafka.ReadFromKafka(
            consumer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                "auto.offset.reset": "earliest",
                # "enable.auto.commit": "true",
                "group.id": "traffic-agg",
            },
            topics=["website-visit"],
        )
        | "Decode messages" >> beam.Map(decode_message)
        | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
        | "Assign timestamp" >> beam.Map(assign_timestamp)
        | "Form key value pair" >> beam.Map(lambda e: (e.id, 1))
        | "Tumble window per minute" >> beam.WindowInto(beam.window.FixedWindows(20))
        | "Sum by key" >> beam.CombinePerKey(sum)
        | "Add window timestamp" >> beam.ParDo(AddWindowTS())
        | "Create messages"
        >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
        | "Write to Kafka"
        >> kafka.WriteToKafka(
            producer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                )
            },
            topic="traffic-agg",
        )
    )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    p.run().wait_until_finish()


if __name__ == "__main__":
    run()
SQL Traffic Aggregation
The main difference in this version is that multiple transforms are replaced by a single SQL transform, which aggregates the number of page views by user in a fixed time window of 20 seconds. The SQL transform is performed in a separate Docker container using the Java SDK, so the output type has to be specified before it. Otherwise, an error is thrown because the Java SDK doesn't know how to encode/decode the elements.
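The essential pattern - registering a RowCoder for the row type, fixing the output type of the preceding transform, and then applying SqlTransform - can be sketched on its own before reading the full source. The snippet below is a minimal illustration, not part of the pipeline source; PageView is a hypothetical row type, and running it still needs Java available locally because SqlTransform expands via the Java SDK.

# Minimal sketch of applying SqlTransform to a schema-aware PCollection.
# PageView is a hypothetical row type used only for illustration.
import typing
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform


class PageView(typing.NamedTuple):
    id: str
    page: str


# the Java SDK needs a registered coder to encode/decode the rows
beam.coders.registry.register_coder(PageView, beam.coders.RowCoder)

with beam.Pipeline() as p:
    (
        p
        | beam.Create(
            [PageView("u1", "a.html"), PageView("u1", "b.html"), PageView("u2", "a.html")]
        ).with_output_types(PageView)  # output type must be known before SqlTransform
        | SqlTransform("SELECT id, COUNT(*) AS page_view FROM PCOLLECTION GROUP BY id")
        | beam.Map(print)  # rows with fields id and page_view, e.g. (id='u1', page_view=2)
    )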
# section3/traffic_agg_sql.py
import os
import datetime
import argparse
import json
import logging
import typing

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam.transforms.sql import SqlTransform
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class EventLog(typing.NamedTuple):
    ip: str
    id: str
    lat: float
    lng: float
    user_agent: str
    age_bracket: str
    opted_into_marketing: bool
    http_request: str
    http_response: int
    file_size_bytes: int
    event_datetime: str
    event_ts: int


beam.coders.registry.register_coder(EventLog, beam.coders.RowCoder)


def decode_message(kafka_kv: tuple):
    return kafka_kv[1].decode("utf-8")


def create_message(element: dict):
    key = {"event_id": element["event_id"], "window_start": element["window_start"]}
    print(element)
    return json.dumps(key).encode("utf-8"), json.dumps(element).encode("utf-8")


def parse_json(element: str):
    row = json.loads(element)
    # lat/lng sometimes empty string
    if not row["lat"] or not row["lng"]:
        row = {**row, **{"lat": -1, "lng": -1}}
    return EventLog(**row)


def format_timestamp(element: EventLog):
    event_ts = datetime.datetime.fromisoformat(element.event_datetime)
    temp_dict = element._asdict()
    temp_dict["event_datetime"] = datetime.datetime.strftime(
        event_ts, "%Y-%m-%d %H:%M:%S"
    )
    return EventLog(**temp_dict)


def run():
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
    )
    parser.add_argument(
        "--use_own",
        action="store_true",
        default=False,
        help="Flag to indicate whether to use an own local cluster",
    )
    opts = parser.parse_args()

    pipeline_opts = {
        "runner": opts.runner,
        "job_name": "traffic-agg-sql",
        "environment_type": "LOOPBACK",
        "streaming": True,
        "parallelism": 3,
        "experiments": [
            "use_deprecated_read"
        ],  ## https://github.com/apache/beam/issues/20979
        "checkpointing_interval": "60000",
    }
    if opts.use_own is True:
        pipeline_opts = {**pipeline_opts, **{"flink_master": "localhost:8081"}}
    print(pipeline_opts)
    options = PipelineOptions([], **pipeline_opts)
    # Required, else it will complain that it cannot import worker functions
    options.view_as(SetupOptions).save_main_session = True

    query = """
    WITH cte AS (
        SELECT
            id,
            CAST(event_datetime AS TIMESTAMP) AS ts
        FROM PCOLLECTION
    )
    SELECT
        id AS event_id,
        CAST(TUMBLE_START(ts, INTERVAL '20' SECOND) AS VARCHAR) AS window_start,
        CAST(TUMBLE_END(ts, INTERVAL '20' SECOND) AS VARCHAR) AS window_end,
        COUNT(*) AS page_view
    FROM cte
    GROUP BY
        TUMBLE(ts, INTERVAL '20' SECOND), id
    """

    p = beam.Pipeline(options=options)
    (
        p
        | "Read from Kafka"
        >> kafka.ReadFromKafka(
            consumer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                "auto.offset.reset": "earliest",
                # "enable.auto.commit": "true",
                "group.id": "traffic-agg-sql",
            },
            topics=["website-visit"],
        )
        | "Decode messages" >> beam.Map(decode_message)
        | "Parse elements" >> beam.Map(parse_json)
        | "Format timestamp" >> beam.Map(format_timestamp).with_output_types(EventLog)
        | "Count per minute" >> SqlTransform(query)
        | "To dictionary" >> beam.Map(lambda e: e._asdict())
        | "Create messages"
        >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
        | "Write to Kafka"
        >> kafka.WriteToKafka(
            producer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                )
            },
            topic="traffic-agg-sql",
        )
    )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    p.run().wait_until_finish()


if __name__ == "__main__":
    run()
Run Pipeline
We can use local Flink and Kafka clusters as discussed in Part 3. The Flink cluster is optional as Beam runs a pipeline on an embedded Flink cluster if we do not specify a cluster URL.
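As a rough sketch of the difference, the two cases only differ by the flink_master option, as used in the pipeline scripts above.

# Sketch: without flink_master, Beam spins up an embedded Flink cluster;
# with flink_master, the job is submitted to the running session cluster.
from apache_beam.options.pipeline_options import PipelineOptions

embedded_opts = PipelineOptions([], runner="FlinkRunner", streaming=True)
session_opts = PipelineOptions(
    [], runner="FlinkRunner", streaming=True, flink_master="localhost:8081"
)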
Start Flink/Kafka Clusters
As shown later, I ran into an issue running the SQL version of the pipeline on a local cluster, so it has to be deployed on an embedded cluster instead. With the -a option, the script deploys local Flink and Kafka clusters, which are used for the pipeline without SQL; with the -k option, only a local Kafka cluster is launched for the SQL version.
# start both flink and kafka cluster for traffic aggregation
$ ./setup/start-flink-env.sh -a

# start only kafka cluster for sql traffic aggregation
$ ./setup/start-flink-env.sh -k
Data Generation
For streaming data generation, we can use the website visit log generator that was introduced in Part 1. We can execute the script with the source argument set to streaming. Below is an example of generating Kafka messages for the streaming pipeline.
$ python datagen/generate_data.py --source streaming --num_users 5 --delay_seconds 0.5
...
10 events created so far...
{'ip': '151.21.93.137', 'id': '2142139324490406578', 'lat': 45.5253, 'lng': 9.333, 'user_agent': 'Mozilla/5.0 (iPad; CPU iPad OS 14_2_1 like Mac OS X) AppleWebKit/536.0 (KHTML, like Gecko) FxiOS/16.3w0588.0 Mobile/66I206 Safari/536.0', 'age_bracket': '26-40', 'opted_into_marketing': True, 'http_request': 'GET amoebozoa.html HTTP/1.0', 'http_response': 200, 'file_size_bytes': 453, 'event_datetime': '2024-04-28T23:12:50.484', 'event_ts': 1714309970484}
20 events created so far...
{'ip': '146.13.4.138', 'id': '5642783739616136718', 'lat': 39.0437, 'lng': -77.4875, 'user_agent': 'Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_7_5 rv:4.0; bg-BG) AppleWebKit/532.16.6 (KHTML, like Gecko) Version/4.0.5 Safari/532.16.6', 'age_bracket': '41-55', 'opted_into_marketing': False, 'http_request': 'GET archaea.html HTTP/1.0', 'http_response': 200, 'file_size_bytes': 207, 'event_datetime': '2024-04-28T23:12:55.526', 'event_ts': 1714309975526}
30 events created so far...
{'ip': '36.255.131.188', 'id': '676397447776623774', 'lat': 31.2222, 'lng': 121.4581, 'user_agent': 'Mozilla/5.0 (compatible; MSIE 7.0; Windows 98; Win 9x 4.90; Trident/4.0)', 'age_bracket': '26-40', 'opted_into_marketing': False, 'http_request': 'GET fungi.html HTTP/1.0', 'http_response': 200, 'file_size_bytes': 440, 'event_datetime': '2024-04-28T23:13:00.564', 'event_ts': 1714309980564}
Execute Pipeline Script
Traffic Aggregation
The traffic aggregation pipeline can be executed using the local Flink cluster by specifying the use_own argument.
$ python section3/traffic_agg.py --use_own
After a while, we can check both the input and output topics in the Topics section of kafka-ui. It can be accessed on localhost:8080.
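If you prefer checking from a script rather than kafka-ui, a quick way is to consume a few records from the output topic. The snippet below is a hypothetical check using the kafka-python package, which is not used elsewhere in this project, and it assumes the broker advertises a listener reachable from the host on localhost:29092.

# Hypothetical check of the output topic with kafka-python (pip install kafka-python).
# Assumes the broker is reachable from the host on localhost:29092.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "traffic-agg",
    bootstrap_servers="localhost:29092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # stop iterating when no new records arrive
)
for msg in consumer:
    print(msg.key.decode("utf-8"), msg.value.decode("utf-8"))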
We can use the Flink web UI to monitor the pipeline as a Flink job. When we click the traffic-agg job in the Running Jobs section, we see four operations linked in the Overview tab. The first two operations are related to polling and reading the Kafka source description. All the transforms up to windowing the keyed elements are performed in the third operation, and the elements are aggregated and written to the Kafka output topic in the last operation.
SQL Traffic Aggregation
I see the following error when I execute the SQL version of the pipeline with the use_own option. It seems that the Java SDK container for the SQL transform fails to download its expansion service and does not complete its initialisation steps - see Part 3 for details about how multiple SDKs can be used in a single pipeline. As a result, the Flink job fails to access the SDK container, and it keeps recreating new containers.
We can see that lots of containers have stopped and been recreated.
$ docker ps -a --format "table {{.ID}}\t{{.Image}}\t{{.Status}}" | grep apache/beam_java11_sdk
46c51d89e966 apache/beam_java11_sdk:2.53.0 Up 7 seconds
2ad755fc66df apache/beam_java11_sdk:2.53.0 Up 7 seconds
cf023d9bf39f apache/beam_java11_sdk:2.53.0 Exited (1) 13 seconds ago
a549729318e3 apache/beam_java11_sdk:2.53.0 Exited (1) 38 seconds ago
95626f645252 apache/beam_java11_sdk:2.53.0 Exited (1) 57 seconds ago
38b56216e29a apache/beam_java11_sdk:2.53.0 Exited (1) About a minute ago
3aee486b472f apache/beam_java11_sdk:2.53.0 Exited (1) About a minute ago
Instead, we can run the pipeline on an embedded Flink cluster by omitting the use_own option. Note that we need to stop the existing clusters, start only a Kafka cluster with the -k option, and re-generate data before executing this pipeline script.
$ python section3/traffic_agg_sql.py
Similar to the earlier version, we can check the input and output topics on localhost:8080 as well.
Summary
In this post, we developed a streaming pipeline that aggregates website visits by user in a fixed time window of 20 seconds. Two versions of the pipeline were created, one with and one without Beam SQL. The version that doesn't rely on SQL was deployed on a local Flink cluster, and we checked how it runs as a Flink job on the Flink web UI. The SQL version, however, had an issue when deployed on a local Flink cluster and was deployed on an embedded Flink cluster instead.