I recently contributed to Apache Beam by adding a common pipeline pattern - Cache data using a shared object. The pattern covers both batch and streaming pipelines, and they utilise the Shared class of the Python SDK to enrich PCollection elements. This pattern can be more memory-efficient than side inputs, simpler than a stateful DoFn, and more performant than calling an external service, because it does not have to access an external service for every element or bundle of elements. In this post, we discuss this pattern in more detail with batch and streaming use cases. For the latter, we configure the cache to be refreshed periodically.
Create a cache on a batch pipeline
Two data sets are used in the pipelines: order and customer. Each order record includes a customer ID, which is used to look up the matching customer record and add its attributes to the order. In this batch example, the customer cache is loaded as a dictionary in the setup method of the EnrichOrderFn. The cache is used to add customer attributes to the order records. Note that we need to create a wrapper class (WeakRefDict) because the Python dictionary doesn't support weak references, while a Shared object encapsulates a weak reference to a singleton instance of the shared resource.
1import argparse
2import random
3import string
4import logging
5import time
6from datetime import datetime
7
8import apache_beam as beam
9from apache_beam.utils import shared
10from apache_beam.options.pipeline_options import PipelineOptions
11
12
def gen_customers(version: int, num_cust: int = 1000) -> dict:
    """Build an in-memory customer lookup table.

    Args:
        version: Value stored under the ``"version"`` key of every customer.
        num_cust: Number of customers to create; IDs run from 0 to
            ``num_cust - 1``.

    Returns:
        A dictionary mapping each customer ID to ``{"version": version}``.
        Every customer gets its own attribute dictionary.
    """
    return {cust_id: {"version": version} for cust_id in range(num_cust)}
18
19
def gen_orders(ts: float, num_ord: int = 5, num_cust: int = 1000):
    """Yield a batch of randomly generated order records.

    Args:
        ts: Timestamp of the batch; it is truncated to an integer and stored
            under ``"timestamp"`` in every record.
        num_ord: Number of order records to generate.
        num_cust: Exclusive upper bound for the customer ID. IDs are drawn
            with ``random.randrange(1, num_cust)``, i.e. from 1 to
            ``num_cust - 1``.

    Yields:
        Dictionaries with the keys ``"order_id"`` (five random lowercase
        letters), ``"customer_id"`` and ``"timestamp"``.
    """
    # The whole batch is built first so that all records share the same
    # timestamp value, then handed out one by one.
    orders = [
        {
            "order_id": "".join(random.choices(string.ascii_lowercase, k=5)),
            "customer_id": random.randrange(1, num_cust),
            "timestamp": int(ts),
        }
        for _ in range(num_ord)
    ]
    yield from orders
31
32
class WeakRefDict(dict):
    """Dictionary subclass whose instances can be weakly referenced.

    The wrapper class is needed for a dictionary, because a plain ``dict``
    does not support weak references. It adds no behaviour of its own.
    """
36
37
class EnrichOrderFn(beam.DoFn):
    """Enrich order records with customer attributes from a shared cache.

    The customer lookup table is obtained through the shared handle so that
    it is loaded once and reused, instead of being rebuilt by every DoFn
    instance.
    """

    def __init__(self, shared_handle):
        """Store the handle used to acquire the shared customer lookup.

        Args:
            shared_handle: Object exposing ``acquire(constructor_fn)``; the
                pipeline passes a ``shared.Shared`` instance.
        """
        self._version = 1
        self._shared_handle = shared_handle

    def setup(self):
        """Acquire the customer lookup, loading it if it does not exist yet."""
        self._customer_lookup = self._shared_handle.acquire(self.load_customers)

    def load_customers(self):
        """Build the customer lookup that is handed to the shared handle.

        Returns:
            A ``WeakRefDict`` mapping customer IDs to their attributes.
        """
        # NOTE(review): the sleep presumably simulates a slow load from an
        # external source - confirm before reusing outside the demo.
        time.sleep(2)
        # Return the table directly; keeping a second copy on the instance
        # would hold a duplicate of the data that is never read.
        return WeakRefDict(gen_customers(version=self._version))

    def process(self, element):
        """Yield the order merged with the attributes of its customer.

        Args:
            element: Order record containing a ``"customer_id"`` key.

        Yields:
            A new dictionary with the order fields plus the customer
            attributes; the order is passed through unchanged when the
            customer ID is not in the lookup.
        """
        attr = self._customer_lookup.get(element["customer_id"], {})
        yield {**element, **attr}
55
56
def run(argv=None):
    """Build and run the batch pipeline that enriches orders.

    Args:
        argv: Command-line arguments; ``None`` lets ``argparse`` read
            ``sys.argv``. Arguments not known to the parser are forwarded to
            ``PipelineOptions``.
    """
    # Configure logging first so that the messages emitted while the
    # pipeline is being built and run are visible.
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser(
        description="Shared class demo with a bounded PCollection"
    )
    _, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    logging.info("Building pipeline ...")
    # The pipeline is executed when the ``with`` block exits.
    with beam.Pipeline(options=pipeline_options) as p:
        shared_handle = shared.Shared()
        (
            p
            | beam.Create(gen_orders(ts=datetime.now().timestamp()))
            | beam.ParDo(EnrichOrderFn(shared_handle))
            | beam.Map(print)
        )
75
76
# Run the pipeline only when the file is executed as a script.
if __name__ == "__main__":
    run()
When we execute the pipeline, we see that the customer attribute (version) is added to the order records.
1INFO:root:Building pipeline ...
2INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
3{'order_id': 'lgkar', 'customer_id': 175, 'timestamp': 1724099730, 'version': 1}
4{'order_id': 'ylxcf', 'customer_id': 785, 'timestamp': 1724099730, 'version': 1}
5{'order_id': 'ypsof', 'customer_id': 446, 'timestamp': 1724099730, 'version': 1}
6{'order_id': 'aowzu', 'customer_id': 41, 'timestamp': 1724099730, 'version': 1}
7{'order_id': 'bbssb', 'customer_id': 194, 'timestamp': 1724099730, 'version': 1}
Create a cache and update it regularly on a streaming pipeline
Because the customer records are assumed to change over time, the cache needs to be refreshed periodically. To reload the shared object, change the tag argument of the acquire method. In this example, the refresh is implemented in the start_bundle method, where it compares the current tag value to the value that is associated with the existing shared object. The set_tag method returns a tag value that stays the same within the maximum seconds of staleness. Therefore, if the current tag value is greater than the existing tag value, it triggers a refresh of the customer cache.
1import argparse
2import random
3import string
4import logging
5import time
6from datetime import datetime
7
8import apache_beam as beam
9from apache_beam.utils import shared
10from apache_beam.transforms.periodicsequence import PeriodicImpulse
11from apache_beam.options.pipeline_options import PipelineOptions
12
13
def gen_customers(version: int, tag: float, num_cust: int = 1000) -> dict:
    """Build an in-memory customer lookup table labelled with a cache tag.

    Args:
        version: Value stored under the ``"version"`` key of every customer.
        tag: Cache tag recorded in the table under the ``"tag"`` key.
        num_cust: Number of customers to create; IDs run from 0 to
            ``num_cust - 1``.

    Returns:
        A dictionary mapping each customer ID to ``{"version": version}``,
        plus the extra string key ``"tag"`` holding ``tag``.
    """
    customers = {cust_id: {"version": version} for cust_id in range(num_cust)}
    # The tag lives next to the integer customer IDs; being a string key it
    # cannot collide with them.
    customers["tag"] = tag
    return customers
20
21
def gen_orders(ts: float, num_ord: int = 5, num_cust: int = 1000):
    """Yield a batch of randomly generated order records.

    Args:
        ts: Timestamp of the batch; it is truncated to an integer and stored
            under ``"timestamp"`` in every record.
        num_ord: Number of order records to generate.
        num_cust: Exclusive upper bound for the customer ID. IDs are drawn
            with ``random.randrange(1, num_cust)``, i.e. from 1 to
            ``num_cust - 1``.

    Yields:
        Dictionaries with the keys ``"order_id"`` (five random lowercase
        letters), ``"customer_id"`` and ``"timestamp"``.
    """
    # The whole batch is built first so that all records share the same
    # timestamp value, then handed out one by one.
    orders = [
        {
            "order_id": "".join(random.choices(string.ascii_lowercase, k=5)),
            "customer_id": random.randrange(1, num_cust),
            "timestamp": int(ts),
        }
        for _ in range(num_ord)
    ]
    yield from orders
33
34
class WeakRefDict(dict):
    """Dictionary subclass whose instances can be weakly referenced.

    The wrapper class is needed for a dictionary, since a plain ``dict``
    does not support weak references. It adds no behaviour of its own.
    """
38
39
class EnrichOrderFn(beam.DoFn):
    """Enrich order records from a shared cache that is refreshed over time.

    The customer lookup is acquired through the shared handle together with
    a tag. The tag changes once per ``max_stale_sec`` window, and acquiring
    with a new tag reloads the lookup.
    """

    def __init__(self, shared_handle, max_stale_sec):
        """Store the shared handle and the staleness window.

        Args:
            shared_handle: Object exposing ``acquire(constructor_fn, tag)``;
                the pipeline passes a ``shared.Shared`` instance.
            max_stale_sec: Length of the window, in seconds, during which the
                tag (and therefore the cached lookup) stays the same.

        Raises:
            ValueError: If ``max_stale_sec`` is not positive. Zero would
                otherwise fail later with a division by zero in ``set_tag``.
        """
        if max_stale_sec <= 0:
            raise ValueError("max_stale_sec must be a positive number of seconds")
        self._max_stale_sec = max_stale_sec
        self._version = 1
        self._shared_handle = shared_handle

    def setup(self):
        """Acquire the customer lookup for the current tag."""
        self._acquire_customers(self.set_tag())

    def set_tag(self):
        """Return the current time rounded down to the staleness window.

        Returns:
            The current timestamp minus its remainder modulo
            ``max_stale_sec``, so all calls within one window get the same
            value.
        """
        current_ts = datetime.now().timestamp()
        return current_ts - (current_ts % self._max_stale_sec)

    def load_customers(self, tag=None):
        """Build the customer lookup that is handed to the shared handle.

        Args:
            tag: Tag to record inside the lookup. Defaults to the tag of the
                current window when omitted.

        Returns:
            A ``WeakRefDict`` mapping customer IDs to their attributes, with
            the tag stored under the ``"tag"`` key.
        """
        # Resolve the tag before sleeping so that the recorded value is not
        # pushed into the next window by the load time.
        if tag is None:
            tag = self.set_tag()
        # NOTE(review): the sleep presumably simulates a slow load from an
        # external source - confirm before reusing outside the demo.
        time.sleep(2)
        return WeakRefDict(gen_customers(version=self._version, tag=tag))

    def _acquire_customers(self, tag):
        """Acquire the lookup for ``tag``, recording that same tag in it."""
        self._customer_lookup = self._shared_handle.acquire(
            lambda: self.load_customers(tag), tag
        )

    def start_bundle(self):
        """Refresh the customer lookup when the staleness window has moved on."""
        # Compute the tag once so that the comparison, the log message and
        # the acquire call all agree.
        current_tag = self.set_tag()
        # Read the tag from the lookup itself: it is present even when the
        # object was loaded by another instance sharing the same handle.
        existing_tag = self._customer_lookup["tag"]
        if current_tag > existing_tag:
            logging.info(
                "refresh customer cache, current tag %s, existing tag %s...",
                current_tag,
                existing_tag,
            )
            self._version += 1
            self._acquire_customers(current_tag)

    def process(self, element):
        """Yield the order merged with the attributes of its customer.

        Args:
            element: Order record containing a ``"customer_id"`` key.

        Yields:
            A new dictionary with the order fields plus the customer
            attributes; the order is passed through unchanged when the
            customer ID is not in the lookup.
        """
        attr = self._customer_lookup.get(element["customer_id"], {})
        yield {**element, **attr}
74
75
def run(argv=None):
    """Build and run the streaming pipeline that enriches orders.

    Args:
        argv: Command-line arguments; ``None`` lets ``argparse`` read
            ``sys.argv``. ``--fire_interval`` and ``--max_stale_sec`` are
            consumed here, and the remaining arguments are forwarded to
            ``PipelineOptions``.
    """
    # Configure logging first so that the messages emitted while the
    # pipeline is being built and run are visible.
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser(
        description="Shared class demo with an unbounded PCollection"
    )
    parser.add_argument(
        "--fire_interval",
        "-f",
        type=int,
        default=2,
        help="Interval at which to output elements.",
    )
    parser.add_argument(
        "--max_stale_sec",
        "-m",
        type=int,
        default=5,
        help="Maximum second of staleness.",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    logging.info("Building pipeline ...")
    # The pipeline is executed when the ``with`` block exits.
    with beam.Pipeline(options=pipeline_options) as p:
        shared_handle = shared.Shared()
        (
            p
            | PeriodicImpulse(
                fire_interval=known_args.fire_interval, apply_windowing=False
            )
            # Each impulse element is passed to gen_orders as its ``ts``.
            | beam.FlatMap(gen_orders)
            | beam.ParDo(EnrichOrderFn(shared_handle, known_args.max_stale_sec))
            | beam.Map(print)
        )
111
112
# Run the pipeline only when the file is executed as a script.
if __name__ == "__main__":
    run()
The pipeline output shows that the order records are enriched with updated customer attributes (version). By default, the pipeline creates 5 order records every 2 seconds, and the maximum staleness is 5 seconds. Therefore, each version covers 2.5 sets of order records on average, which means that 10 to 15 records are mapped to a single version.
1INFO:root:Building pipeline ...
2INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
3{'order_id': 'rqicm', 'customer_id': 841, 'timestamp': 1724099759, 'version': 1}
4{'order_id': 'xjjnj', 'customer_id': 506, 'timestamp': 1724099759, 'version': 1}
5{'order_id': 'iuiua', 'customer_id': 612, 'timestamp': 1724099759, 'version': 1}
6{'order_id': 'yiwtq', 'customer_id': 486, 'timestamp': 1724099759, 'version': 1}
7{'order_id': 'aolsp', 'customer_id': 839, 'timestamp': 1724099759, 'version': 1}
8{'order_id': 'kkldd', 'customer_id': 223, 'timestamp': 1724099761, 'version': 1}
9{'order_id': 'pxbyr', 'customer_id': 262, 'timestamp': 1724099761, 'version': 1}
10{'order_id': 'yhsrx', 'customer_id': 899, 'timestamp': 1724099761, 'version': 1}
11{'order_id': 'elcqj', 'customer_id': 726, 'timestamp': 1724099761, 'version': 1}
12{'order_id': 'echap', 'customer_id': 32, 'timestamp': 1724099761, 'version': 1}
13{'order_id': 'swhgo', 'customer_id': 98, 'timestamp': 1724099763, 'version': 1}
14{'order_id': 'flcoq', 'customer_id': 6, 'timestamp': 1724099763, 'version': 1}
15{'order_id': 'erhed', 'customer_id': 116, 'timestamp': 1724099763, 'version': 1}
16{'order_id': 'mcupo', 'customer_id': 589, 'timestamp': 1724099763, 'version': 1}
17{'order_id': 'vbvgu', 'customer_id': 870, 'timestamp': 1724099763, 'version': 1}
18INFO:root:refresh customer cache, current tag 1724099765.0, existing tag 1724099760.0...
19{'order_id': 'solvl', 'customer_id': 186, 'timestamp': 1724099765, 'version': 2}
20{'order_id': 'ingmb', 'customer_id': 483, 'timestamp': 1724099765, 'version': 2}
21{'order_id': 'gckfl', 'customer_id': 194, 'timestamp': 1724099765, 'version': 2}
22{'order_id': 'azatw', 'customer_id': 995, 'timestamp': 1724099765, 'version': 2}
23{'order_id': 'ngivi', 'customer_id': 62, 'timestamp': 1724099765, 'version': 2}
24{'order_id': 'czzdh', 'customer_id': 100, 'timestamp': 1724099767, 'version': 2}
25{'order_id': 'xdadk', 'customer_id': 485, 'timestamp': 1724099767, 'version': 2}
26{'order_id': 'sytfd', 'customer_id': 845, 'timestamp': 1724099767, 'version': 2}
27{'order_id': 'ckkvc', 'customer_id': 278, 'timestamp': 1724099767, 'version': 2}
28{'order_id': 'vchzr', 'customer_id': 535, 'timestamp': 1724099767, 'version': 2}
29{'order_id': 'fxhvd', 'customer_id': 37, 'timestamp': 1724099769, 'version': 2}
30{'order_id': 'lbncv', 'customer_id': 774, 'timestamp': 1724099769, 'version': 2}
31{'order_id': 'ljdrc', 'customer_id': 823, 'timestamp': 1724099769, 'version': 2}
32{'order_id': 'jsuvb', 'customer_id': 943, 'timestamp': 1724099769, 'version': 2}
33{'order_id': 'htuxz', 'customer_id': 287, 'timestamp': 1724099769, 'version': 2}
34INFO:root:refresh customer cache, current tag 1724099770.0, existing tag 1724099765.0...
35{'order_id': 'pxhxp', 'customer_id': 309, 'timestamp': 1724099771, 'version': 3}
36{'order_id': 'qawyw', 'customer_id': 551, 'timestamp': 1724099771, 'version': 3}
37{'order_id': 'obcxg', 'customer_id': 995, 'timestamp': 1724099771, 'version': 3}
38{'order_id': 'ymfbz', 'customer_id': 614, 'timestamp': 1724099771, 'version': 3}
39{'order_id': 'bauwp', 'customer_id': 420, 'timestamp': 1724099771, 'version': 3}
40{'order_id': 'iidmf', 'customer_id': 570, 'timestamp': 1724099773, 'version': 3}
41{'order_id': 'lijfn', 'customer_id': 86, 'timestamp': 1724099773, 'version': 3}
42{'order_id': 'fuqkc', 'customer_id': 206, 'timestamp': 1724099773, 'version': 3}
43{'order_id': 'wywat', 'customer_id': 501, 'timestamp': 1724099773, 'version': 3}
44{'order_id': 'umude', 'customer_id': 986, 'timestamp': 1724099773, 'version': 3}
45INFO:root:refresh customer cache, current tag 1724099775.0, existing tag 1724099770.0...
46{'order_id': 'jssia', 'customer_id': 757, 'timestamp': 1724099775, 'version': 4}
47{'order_id': 'igjsb', 'customer_id': 129, 'timestamp': 1724099775, 'version': 4}
48{'order_id': 'mzhnc', 'customer_id': 589, 'timestamp': 1724099775, 'version': 4}
49{'order_id': 'hgldm', 'customer_id': 22, 'timestamp': 1724099775, 'version': 4}
50{'order_id': 'srpus', 'customer_id': 275, 'timestamp': 1724099775, 'version': 4}
51{'order_id': 'vdxuk', 'customer_id': 985, 'timestamp': 1724099777, 'version': 4}
52{'order_id': 'zykon', 'customer_id': 309, 'timestamp': 1724099777, 'version': 4}
53{'order_id': 'utulz', 'customer_id': 930, 'timestamp': 1724099777, 'version': 4}
54{'order_id': 'dngqv', 'customer_id': 806, 'timestamp': 1724099777, 'version': 4}
55{'order_id': 'bvsmi', 'customer_id': 248, 'timestamp': 1724099777, 'version': 4}
56{'order_id': 'nljnh', 'customer_id': 680, 'timestamp': 1724099779, 'version': 4}
57{'order_id': 'qlkui', 'customer_id': 481, 'timestamp': 1724099779, 'version': 4}
58{'order_id': 'rmaci', 'customer_id': 301, 'timestamp': 1724099779, 'version': 4}
59{'order_id': 'ndlyp', 'customer_id': 351, 'timestamp': 1724099779, 'version': 4}
60{'order_id': 'znvlx', 'customer_id': 546, 'timestamp': 1724099779, 'version': 4}
61INFO:root:refresh customer cache, current tag 1724099780.0, existing tag 1724099775.0...
62{'order_id': 'nyvjt', 'customer_id': 527, 'timestamp': 1724099781, 'version': 5}
63{'order_id': 'wfsls', 'customer_id': 495, 'timestamp': 1724099781, 'version': 5}
64{'order_id': 'pkbuq', 'customer_id': 315, 'timestamp': 1724099781, 'version': 5}
65{'order_id': 'gpmqi', 'customer_id': 9, 'timestamp': 1724099781, 'version': 5}
66{'order_id': 'sqone', 'customer_id': 943, 'timestamp': 1724099781, 'version': 5}
67{'order_id': 'lbvyd', 'customer_id': 311, 'timestamp': 1724099783, 'version': 5}
68{'order_id': 'gmmnk', 'customer_id': 839, 'timestamp': 1724099783, 'version': 5}
69{'order_id': 'vibyr', 'customer_id': 462, 'timestamp': 1724099783, 'version': 5}
70{'order_id': 'bqlia', 'customer_id': 320, 'timestamp': 1724099783, 'version': 5}
71{'order_id': 'uqywb', 'customer_id': 926, 'timestamp': 1724099783, 'version': 5}
Comments