I recently contributed to Apache Beam by adding a common pipeline pattern: Cache data using a shared object. The pattern covers both batch and streaming pipelines, and both utilise the Shared class of the Python SDK to enrich PCollection elements. This pattern can be more memory-efficient than side inputs and simpler than a stateful DoFn. It can also be more performant than calling an external service, because the pipeline does not have to access that service for every element or bundle of elements. In this post, we discuss the pattern in more detail with batch and streaming use cases. For the latter, we configure the cache to be refreshed periodically.

Create a cache on a batch pipeline

Two data sets are used in the pipelines: orders and customers. Each order record includes a customer ID, which is used to look up the matching customer record and add its attributes to the order. In this batch example, the customer cache is loaded as a dictionary in the setup method of EnrichOrderFn, and the process method uses it to add customer attributes to the order records. Note that we need a wrapper class (WeakRefDict): a Shared object holds only a weak reference to a singleton instance of the shared resource, and a built-in Python dictionary doesn't support weak references, whereas an instance of a dict subclass does.
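
The short standard-library snippet below (no Beam required) illustrates why the wrapper is needed. It is only a sketch: it shows that a plain dict cannot be the target of a weak reference while a trivial subclass such as WeakRefDict can, and that a weak reference on its own does not keep its target alive.

```python
import weakref


# Same trivial subclass as in the pipeline: subclassing dict adds weak-reference support.
class WeakRefDict(dict):
    pass


try:
    weakref.ref({1: {"version": 1}})
except TypeError as err:
    # Built-in dict instances cannot be weakly referenced.
    print("plain dict:", err)

customers = WeakRefDict({1: {"version": 1}})
ref = weakref.ref(customers)  # works for the subclass
print("WeakRefDict:", ref() is customers)  # prints "WeakRefDict: True"

# A weak reference does not keep its target alive. In CPython the object is
# freed as soon as the last strong reference goes away, so ref() returns None.
del customers
print("after del:", ref())
```

This is also why EnrichOrderFn stores the acquired object in self._customer_lookup: that attribute is a strong reference, so the cache is not reclaimed while the DoFn instance uses it.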

```python
import argparse
import random
import string
import logging
import time
from datetime import datetime

import apache_beam as beam
from apache_beam.utils import shared
from apache_beam.options.pipeline_options import PipelineOptions


def gen_customers(version: int, num_cust: int = 1000):
    # Customer records keyed by customer ID, each carrying the cache version.
    d = dict()
    for r in range(num_cust):
        d[r] = {"version": version}
    return d


def gen_orders(ts: float, num_ord: int = 5, num_cust: int = 1000):
    # Random orders that reference existing customer IDs and share one timestamp.
    orders = [
        {
            "order_id": "".join(random.choices(string.ascii_lowercase, k=5)),
            "customer_id": random.randrange(1, num_cust),
            "timestamp": int(ts),
        }
        for _ in range(num_ord)
    ]
    for o in orders:
        yield o


# The wrapper class is needed for a dictionary, because it does not support weak references.
class WeakRefDict(dict):
    pass


class EnrichOrderFn(beam.DoFn):
    def __init__(self, shared_handle):
        self._version = 1
        self._customers = {}
        self._shared_handle = shared_handle

    def setup(self):
        # Build the cache only if it does not exist yet in this worker process,
        # and keep a strong reference to it so that it stays alive.
        self._customer_lookup = self._shared_handle.acquire(self.load_customers)

    def load_customers(self):
        time.sleep(2)  # simulate a slow load
        self._customers = gen_customers(version=self._version)
        return WeakRefDict(self._customers)

    def process(self, element):
        # Add the customer attributes to the order; unknown IDs add nothing.
        attr = self._customer_lookup.get(element["customer_id"], {})
        yield {**element, **attr}


def run(argv=None):
    parser = argparse.ArgumentParser(
        description="Shared class demo with a bounded PCollection"
    )
    _, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        shared_handle = shared.Shared()
        (
            p
            | beam.Create(gen_orders(ts=datetime.now().timestamp()))
            | beam.ParDo(EnrichOrderFn(shared_handle))
            | beam.Map(print)
        )

        logging.getLogger().setLevel(logging.INFO)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()
```

When we execute the pipeline, we see that the customer attribute (version) is added to the order records.

```
INFO:root:Building pipeline ...
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
{'order_id': 'lgkar', 'customer_id': 175, 'timestamp': 1724099730, 'version': 1}
{'order_id': 'ylxcf', 'customer_id': 785, 'timestamp': 1724099730, 'version': 1}
{'order_id': 'ypsof', 'customer_id': 446, 'timestamp': 1724099730, 'version': 1}
{'order_id': 'aowzu', 'customer_id': 41, 'timestamp': 1724099730, 'version': 1}
{'order_id': 'bbssb', 'customer_id': 194, 'timestamp': 1724099730, 'version': 1}
```

Create a cache and update it regularly on a streaming pipeline

Because the customer records are assumed to change over time, the cache needs to be refreshed periodically. To reload the shared object, we change the tag argument of the acquire method. In this example, the refresh is implemented in the start_bundle method, which compares the current tag value with the tag of the existing shared object. The set_tag method floors the current timestamp to a multiple of the maximum seconds of staleness (max_stale_sec), so it returns the same value throughout each window of that length. Therefore, when the current tag is greater than the existing tag, a new staleness window has started and the customer cache is reloaded.
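
The snippet below models this tag mechanism with the standard library only, so it runs without Beam. MiniShared is a hypothetical, heavily simplified stand-in written for this illustration, not Beam's Shared class, and this set_tag takes an explicit timestamp instead of reading the clock. The timestamps are taken from the streaming output in this post.

```python
import weakref
from typing import Any, Callable, Optional


class MiniShared:
    """Hypothetical, simplified stand-in for apache_beam.utils.shared.Shared.

    It keeps only a weak reference to the cached object and rebuilds it when
    the object has been garbage collected or a different tag is passed.
    Locking and other details of Beam's real class are left out.
    """

    def __init__(self) -> None:
        self._ref: Optional[weakref.ref] = None
        self._tag: Any = None

    def acquire(self, constructor_fn: Callable[[], Any], tag: Any = None) -> Any:
        obj = self._ref() if self._ref is not None else None
        if obj is None or tag != self._tag:
            obj = constructor_fn()        # (re)load the shared resource
            self._ref = weakref.ref(obj)  # requires a weak-referenceable object
            self._tag = tag
        return obj


class WeakRefDict(dict):
    pass


def set_tag(ts: float, max_stale_sec: int = 5) -> float:
    # Same flooring as EnrichOrderFn.set_tag(), but with an explicit timestamp.
    return ts - (ts % max_stale_sec)


load_count = 0


def load_customers() -> WeakRefDict:
    global load_count
    load_count += 1  # each call is one (re)load of the cache
    return WeakRefDict({"version": load_count})


handle = MiniShared()
for ts in (1724099763.0, 1724099764.0, 1724099765.0):
    cache = handle.acquire(load_customers, set_tag(ts))
    print(ts, set_tag(ts), cache["version"])
# 1724099763.0 1724099760.0 1
# 1724099764.0 1724099760.0 1
# 1724099765.0 1724099765.0 2
```

The first two timestamps fall in the same 5-second window, so they produce the same tag and the second acquire call returns the cached object without reloading. The third timestamp starts a new window, and its larger tag triggers a reload.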

```python
import argparse
import random
import string
import logging
import time
from datetime import datetime

import apache_beam as beam
from apache_beam.utils import shared
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.options.pipeline_options import PipelineOptions


def gen_customers(version: int, tag: float, num_cust: int = 1000):
    # Customer records plus the tag of the staleness window they were loaded in.
    d = dict()
    for r in range(num_cust):
        d[r] = {"version": version}
    d["tag"] = tag
    return d


def gen_orders(ts: float, num_ord: int = 5, num_cust: int = 1000):
    # Random orders that reference existing customer IDs and share one timestamp.
    orders = [
        {
            "order_id": "".join(random.choices(string.ascii_lowercase, k=5)),
            "customer_id": random.randrange(1, num_cust),
            "timestamp": int(ts),
        }
        for _ in range(num_ord)
    ]
    for o in orders:
        yield o


# The wrapper class is needed for a dictionary, because it does not support weak references.
class WeakRefDict(dict):
    pass


class EnrichOrderFn(beam.DoFn):
    def __init__(self, shared_handle, max_stale_sec):
        self._max_stale_sec = max_stale_sec
        self._version = 1
        self._customers = {}
        self._shared_handle = shared_handle

    def setup(self):
        self._customer_lookup = self._shared_handle.acquire(
            self.load_customers, self.set_tag()
        )

    def set_tag(self):
        # Floor the current time to a multiple of max_stale_sec, so the tag
        # only changes once per staleness window.
        current_ts = datetime.now().timestamp()
        return current_ts - (current_ts % self._max_stale_sec)

    def load_customers(self):
        time.sleep(2)  # simulate a slow load
        # The tag is computed after the delay, so it can belong to a later
        # window than the tag passed to acquire().
        self._customers = gen_customers(version=self._version, tag=self.set_tag())
        return WeakRefDict(self._customers)

    def start_bundle(self):
        # Read the existing tag from the shared cache itself: self._customers is
        # only populated on the DoFn instance whose load_customers() built it.
        current_tag = self.set_tag()
        existing_tag = self._customer_lookup["tag"]
        if current_tag > existing_tag:
            logging.info(
                f"refresh customer cache, current tag {current_tag}, existing tag {existing_tag}..."
            )
            self._version += 1
            # A tag that differs from the stored one makes acquire() reload the cache.
            self._customer_lookup = self._shared_handle.acquire(
                self.load_customers, current_tag
            )

    def process(self, element):
        # Add the customer attributes to the order; unknown IDs add nothing.
        attr = self._customer_lookup.get(element["customer_id"], {})
        yield {**element, **attr}


def run(argv=None):
    parser = argparse.ArgumentParser(
        description="Shared class demo with an unbounded PCollection"
    )
    parser.add_argument(
        "--fire_interval",
        "-f",
        type=int,
        default=2,
        help="Interval at which to output elements.",
    )
    parser.add_argument(
        "--max_stale_sec",
        "-m",
        type=int,
        default=5,
        help="Maximum seconds of staleness.",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:
        shared_handle = shared.Shared()
        (
            p
            # Emit a timestamp every fire_interval seconds; each becomes a set of orders.
            | PeriodicImpulse(
                fire_interval=known_args.fire_interval, apply_windowing=False
            )
            | beam.FlatMap(gen_orders)
            | beam.ParDo(EnrichOrderFn(shared_handle, known_args.max_stale_sec))
            | beam.Map(print)
        )

        logging.getLogger().setLevel(logging.INFO)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()
```

The pipeline output shows that the order records are enriched with updated customer attributes (version). By default, the pipeline creates 5 order records every 2 seconds, and the maximum staleness is 5 seconds. Therefore, each version covers 2.5 sets of order records on average (2 or 3 sets in practice), so 10 or 15 records are mapped to a single version.

```
INFO:root:Building pipeline ...
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
{'order_id': 'rqicm', 'customer_id': 841, 'timestamp': 1724099759, 'version': 1}
{'order_id': 'xjjnj', 'customer_id': 506, 'timestamp': 1724099759, 'version': 1}
{'order_id': 'iuiua', 'customer_id': 612, 'timestamp': 1724099759, 'version': 1}
{'order_id': 'yiwtq', 'customer_id': 486, 'timestamp': 1724099759, 'version': 1}
{'order_id': 'aolsp', 'customer_id': 839, 'timestamp': 1724099759, 'version': 1}
{'order_id': 'kkldd', 'customer_id': 223, 'timestamp': 1724099761, 'version': 1}
{'order_id': 'pxbyr', 'customer_id': 262, 'timestamp': 1724099761, 'version': 1}
{'order_id': 'yhsrx', 'customer_id': 899, 'timestamp': 1724099761, 'version': 1}
{'order_id': 'elcqj', 'customer_id': 726, 'timestamp': 1724099761, 'version': 1}
{'order_id': 'echap', 'customer_id': 32, 'timestamp': 1724099761, 'version': 1}
{'order_id': 'swhgo', 'customer_id': 98, 'timestamp': 1724099763, 'version': 1}
{'order_id': 'flcoq', 'customer_id': 6, 'timestamp': 1724099763, 'version': 1}
{'order_id': 'erhed', 'customer_id': 116, 'timestamp': 1724099763, 'version': 1}
{'order_id': 'mcupo', 'customer_id': 589, 'timestamp': 1724099763, 'version': 1}
{'order_id': 'vbvgu', 'customer_id': 870, 'timestamp': 1724099763, 'version': 1}
INFO:root:refresh customer cache, current tag 1724099765.0, existing tag 1724099760.0...
{'order_id': 'solvl', 'customer_id': 186, 'timestamp': 1724099765, 'version': 2}
{'order_id': 'ingmb', 'customer_id': 483, 'timestamp': 1724099765, 'version': 2}
{'order_id': 'gckfl', 'customer_id': 194, 'timestamp': 1724099765, 'version': 2}
{'order_id': 'azatw', 'customer_id': 995, 'timestamp': 1724099765, 'version': 2}
{'order_id': 'ngivi', 'customer_id': 62, 'timestamp': 1724099765, 'version': 2}
{'order_id': 'czzdh', 'customer_id': 100, 'timestamp': 1724099767, 'version': 2}
{'order_id': 'xdadk', 'customer_id': 485, 'timestamp': 1724099767, 'version': 2}
{'order_id': 'sytfd', 'customer_id': 845, 'timestamp': 1724099767, 'version': 2}
{'order_id': 'ckkvc', 'customer_id': 278, 'timestamp': 1724099767, 'version': 2}
{'order_id': 'vchzr', 'customer_id': 535, 'timestamp': 1724099767, 'version': 2}
{'order_id': 'fxhvd', 'customer_id': 37, 'timestamp': 1724099769, 'version': 2}
{'order_id': 'lbncv', 'customer_id': 774, 'timestamp': 1724099769, 'version': 2}
{'order_id': 'ljdrc', 'customer_id': 823, 'timestamp': 1724099769, 'version': 2}
{'order_id': 'jsuvb', 'customer_id': 943, 'timestamp': 1724099769, 'version': 2}
{'order_id': 'htuxz', 'customer_id': 287, 'timestamp': 1724099769, 'version': 2}
INFO:root:refresh customer cache, current tag 1724099770.0, existing tag 1724099765.0...
{'order_id': 'pxhxp', 'customer_id': 309, 'timestamp': 1724099771, 'version': 3}
{'order_id': 'qawyw', 'customer_id': 551, 'timestamp': 1724099771, 'version': 3}
{'order_id': 'obcxg', 'customer_id': 995, 'timestamp': 1724099771, 'version': 3}
{'order_id': 'ymfbz', 'customer_id': 614, 'timestamp': 1724099771, 'version': 3}
{'order_id': 'bauwp', 'customer_id': 420, 'timestamp': 1724099771, 'version': 3}
{'order_id': 'iidmf', 'customer_id': 570, 'timestamp': 1724099773, 'version': 3}
{'order_id': 'lijfn', 'customer_id': 86, 'timestamp': 1724099773, 'version': 3}
{'order_id': 'fuqkc', 'customer_id': 206, 'timestamp': 1724099773, 'version': 3}
{'order_id': 'wywat', 'customer_id': 501, 'timestamp': 1724099773, 'version': 3}
{'order_id': 'umude', 'customer_id': 986, 'timestamp': 1724099773, 'version': 3}
INFO:root:refresh customer cache, current tag 1724099775.0, existing tag 1724099770.0...
{'order_id': 'jssia', 'customer_id': 757, 'timestamp': 1724099775, 'version': 4}
{'order_id': 'igjsb', 'customer_id': 129, 'timestamp': 1724099775, 'version': 4}
{'order_id': 'mzhnc', 'customer_id': 589, 'timestamp': 1724099775, 'version': 4}
{'order_id': 'hgldm', 'customer_id': 22, 'timestamp': 1724099775, 'version': 4}
{'order_id': 'srpus', 'customer_id': 275, 'timestamp': 1724099775, 'version': 4}
{'order_id': 'vdxuk', 'customer_id': 985, 'timestamp': 1724099777, 'version': 4}
{'order_id': 'zykon', 'customer_id': 309, 'timestamp': 1724099777, 'version': 4}
{'order_id': 'utulz', 'customer_id': 930, 'timestamp': 1724099777, 'version': 4}
{'order_id': 'dngqv', 'customer_id': 806, 'timestamp': 1724099777, 'version': 4}
{'order_id': 'bvsmi', 'customer_id': 248, 'timestamp': 1724099777, 'version': 4}
{'order_id': 'nljnh', 'customer_id': 680, 'timestamp': 1724099779, 'version': 4}
{'order_id': 'qlkui', 'customer_id': 481, 'timestamp': 1724099779, 'version': 4}
{'order_id': 'rmaci', 'customer_id': 301, 'timestamp': 1724099779, 'version': 4}
{'order_id': 'ndlyp', 'customer_id': 351, 'timestamp': 1724099779, 'version': 4}
{'order_id': 'znvlx', 'customer_id': 546, 'timestamp': 1724099779, 'version': 4}
INFO:root:refresh customer cache, current tag 1724099780.0, existing tag 1724099775.0...
{'order_id': 'nyvjt', 'customer_id': 527, 'timestamp': 1724099781, 'version': 5}
{'order_id': 'wfsls', 'customer_id': 495, 'timestamp': 1724099781, 'version': 5}
{'order_id': 'pkbuq', 'customer_id': 315, 'timestamp': 1724099781, 'version': 5}
{'order_id': 'gpmqi', 'customer_id': 9, 'timestamp': 1724099781, 'version': 5}
{'order_id': 'sqone', 'customer_id': 943, 'timestamp': 1724099781, 'version': 5}
{'order_id': 'lbvyd', 'customer_id': 311, 'timestamp': 1724099783, 'version': 5}
{'order_id': 'gmmnk', 'customer_id': 839, 'timestamp': 1724099783, 'version': 5}
{'order_id': 'vibyr', 'customer_id': 462, 'timestamp': 1724099783, 'version': 5}
{'order_id': 'bqlia', 'customer_id': 320, 'timestamp': 1724099783, 'version': 5}
{'order_id': 'uqywb', 'customer_id': 926, 'timestamp': 1724099783, 'version': 5}
```
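
The 10-or-15 pattern follows from simple arithmetic. The sketch below is an idealised model rather than part of the pipeline: it assumes impulses fire exactly every fire_interval seconds and that a new version starts exactly at each staleness-window boundary, ignoring the load delay. records_per_version is a hypothetical helper that applies the same flooring as set_tag.

```python
from collections import Counter


def records_per_version(start_ts: int, num_impulses: int, fire_interval: int = 2,
                        num_ord: int = 5, max_stale_sec: int = 5) -> Counter:
    """Hypothetical helper: count orders per staleness window (cache version).

    Idealised model: impulses fire exactly every fire_interval seconds and a
    new version starts exactly at each window boundary (load delay ignored).
    """
    counts = Counter()
    for i in range(num_impulses):
        ts = start_ts + i * fire_interval  # timestamp of the i-th impulse
        tag = ts - ts % max_stale_sec      # same flooring as set_tag()
        counts[tag] += num_ord             # each impulse yields num_ord orders
    return counts


# Start at the first refresh in the output above (1724099765) and model 10 impulses.
counts = records_per_version(start_ts=1724099765, num_impulses=10)
for tag, n in sorted(counts.items()):
    print(tag, n)
# 1724099765 15
# 1724099770 10
# 1724099775 15
# 1724099780 10

# Over a longer run, windows alternate between 3 and 2 impulses: 2.5 on average.
long_run = records_per_version(start_ts=1724099765, num_impulses=20)
print(sum(long_run.values()) / len(long_run))  # 12.5
```

The modelled counts match versions 2 to 5 in the output above. Because 5 is not a multiple of 2, a 5-second window contains either 2 or 3 impulses, which gives 10 or 15 records per version and 12.5 on average.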