A Splittable DoFn (SDF) is a generalization of a DoFn that enables Apache Beam developers to create modular and composable I/O components. It can also be applied to advanced non-I/O scenarios such as Monte Carlo simulation. In this post, we develop two Apache Beam pipelines. The first is an I/O connector that reads the list of files in a folder and then processes each file object in parallel. The second estimates the value of $\pi$ by performing a Monte Carlo simulation.

Splittable DoFn

A Splittable DoFn (SDF) is a generalization of a DoFn that lets Apache Beam developers build modular and composable I/O components. It can also be used in advanced non-I/O scenarios such as Monte Carlo simulation.

As described in the Apache Beam Programming Guide, an SDF is responsible for processing element and restriction pairs. A restriction represents a subset of the work that would otherwise have to be done when processing the element as a whole.

Executing an SDF proceeds in the following steps:

  1. Each element is paired with a restriction (e.g. filename is paired with offset range representing the whole file).
  2. Each element and restriction pair is split (e.g. offset ranges are broken up into smaller pieces).
  3. The runner redistributes the element and restriction pairs to several workers.
  4. Element and restriction pairs are processed in parallel (e.g. the file is read). Within this last step, the element and restriction pair can pause its own processing and/or be split into further element and restriction pairs.

A basic SDF is composed of three parts: a restriction, a restriction provider, and a restriction tracker.

  • restriction
    • It represents a subset of work for a given element.
    • No specific class needs to be implemented to represent a restriction.
  • restriction provider
    • It lets developers override default implementations used to generate and manipulate restrictions.
    • It extends from the RestrictionProvider base class.
  • restriction tracker
    • It tracks which parts of the restriction have been completed during processing.
    • It extends from the RestrictionTracker base class.

The Python SDK has a built-in restriction (OffsetRange) and restriction tracker (OffsetRestrictionTracker), and we will use them in this post.
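
To see how these pieces fit together, below is a minimal, illustrative sketch of a splittable DoFn built on the built-in classes. The class name and the restriction bounds are placeholders, not part of the pipelines developed later in this post.

# a minimal, illustrative SDF skeleton (not part of the example pipelines)
import typing

import apache_beam as beam
from apache_beam import RestrictionProvider
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker


class MySplittableDoFn(beam.DoFn, RestrictionProvider):
    # restriction provider hooks
    def initial_restriction(self, element) -> OffsetRange:
        # pair the element with a restriction that covers all of its work
        return OffsetRange(start=0, stop=10)

    def create_tracker(self, restriction: OffsetRange) -> OffsetRestrictionTracker:
        # wrap the restriction in a tracker that records claimed positions
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction: OffsetRange) -> int:
        # used by the runner to estimate and rebalance the remaining work
        return restriction.size()

    # the tracker is injected into process via RestrictionParam
    def process(
        self,
        element,
        tracker: OffsetRestrictionTracker = beam.DoFn.RestrictionParam(),
    ) -> typing.Iterable[str]:
        restriction = tracker.current_restriction()
        for position in range(restriction.start, restriction.stop + 1):
            if tracker.try_claim(position):
                yield f"{element}-{position}"
            else:
                return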

An advanced SDF has the following components for watermark estimation, and we will use them in the next post.

  • watermark state
    • It is a user-defined object. In its simplest form it could just be a timestamp.
  • watermark estimator
    • It tracks the watermark state when the processing of an element and restriction pair is in progress.
    • It extends from the WatermarkEstimator base class.
  • watermark estimator provider
    • It lets developers define how to initialize the watermark state and create a watermark estimator.
    • It extends from the WatermarkEstimatorProvider base class.

For more details, visit Splittable DoFns in Python: a hands-on workshop.

Batch File Reader

This Beam pipeline reads the list of files in a folder and then processes each file object in parallel. It is slightly modified from an example introduced in a workshop at Beam Summit 2022. The source of this post can be found in this GitHub repository.

File Generator

The batch pipeline requires input files to process, and those files can be created with the file generator (faker_file_gen.py). It creates a configurable number of files (default 10) in a specified folder (default fake_files). Each file includes zero to two hundred lines of text generated by the Faker package.

# utils/faker_file_gen.py
import os
import time
import shutil
import argparse
import logging

from faker import Faker


def create_folder(file_path: str):
    shutil.rmtree(file_path, ignore_errors=True)
    os.mkdir(file_path)


def write_to_file(fake: Faker, file_path: str, file_name: str):
    with open(os.path.join(file_path, file_name), "w") as f:
        f.writelines(fake.texts(nb_texts=fake.random_int(min=0, max=200)))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(__file__, description="Fake Text File Generator")
    parser.add_argument(
        "-p",
        "--file_path",
        default=os.path.join(
            os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "fake_files"
        ),
        help="File path",
    )
    parser.add_argument(
        "-m",
        "--max_files",
        type=int,
        default=10,
        help="The maximum number of files to create.",
    )
    parser.add_argument(
        "-d",
        "--delay_seconds",
        type=float,
        default=0.5,
        help="The amount of time that a record should be delayed.",
    )

    args = parser.parse_args()

    logging.getLogger().setLevel(logging.INFO)
    logging.info(
        f"Create files: max files {args.max_files}, delay seconds {args.delay_seconds}..."
    )

    fake = Faker()
    Faker.seed(1237)

    create_folder(args.file_path)
    current = 0
    while True:
        write_to_file(
            fake,
            args.file_path,
            f"{''.join(fake.random_letters(length=10)).lower()}.txt",
        )
        current += 1
        if current % 5 == 0:
            logging.info(f"Created {current} files so far...")
        if current == args.max_files:
            break
        time.sleep(args.delay_seconds)

Once executed, we can check that the input files are created in the specified folder.

$ python utils/faker_file_gen.py --file_path fake_files --max_files 10 --delay_seconds 0.5
INFO:root:Create files: max files 10, delay seconds 0.5...
INFO:root:Created 5 files so far...
INFO:root:Created 10 files so far...

$ tree fake_files/
fake_files/
├── dhmfhxwbqd.txt
├── dzpjwroqdd.txt
├── humlastzob.txt
├── jfdlxdnsre.txt
├── qgtyykdzdk.txt
├── uxgrwaisgj.txt
├── vcqlxrcqnx.txt
├── ydlceysnkc.txt
├── ylgvjqjhrb.txt
└── zpjinqrdyw.txt

1 directory, 10 files

SDF for File Processing

The pipeline has two DoFn objects.

  • GenerateFilesFn
    • It reads each of the files in the input folder (element) and yields a custom object (MyFile). The custom object keeps the details of a file object, including the file name and start/end positions. As we use the default OffsetRestrictionTracker in the subsequent DoFn, it is necessary to keep the start and end positions because they are used to set up the initial offset range.
  • ProcessFilesFn
    • It begins by recovering the current restriction from the tracker, which is injected via a parameter of type RestrictionParam. Then, it tries to claim/lock each position. Once a position is claimed, it processes the associated element and restriction pair, which here simply yields a text string containing the file name and the current position.

# chapter7/file_read.py
import os
import typing
import logging

import apache_beam as beam
from apache_beam import RestrictionProvider
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker


class MyFile(typing.NamedTuple):
    name: str
    start: int
    end: int


class GenerateFilesFn(beam.DoFn):
    # lists the files in the input folder and yields one MyFile per file,
    # where start/end mark the line range to be read
    def process(self, element: str) -> typing.Iterable[MyFile]:
        for file in os.listdir(element):
            if os.path.isfile(os.path.join(element, file)):
                with open(os.path.join(element, file)) as f:
                    num_lines = sum(1 for _ in f)
                new_file = MyFile(file, 0, num_lines)
                yield new_file


class ProcessFilesFn(beam.DoFn, RestrictionProvider):
    def process(
        self,
        element: MyFile,
        tracker: OffsetRestrictionTracker = beam.DoFn.RestrictionParam(),
    ):
        restriction = tracker.current_restriction()
        for current_position in range(restriction.start, restriction.stop + 1):
            # try_claim returns False when the position falls outside the
            # restriction, e.g. after the remaining work has been split off
            if tracker.try_claim(current_position):
                m = f"file: {element.name}, position: {current_position}"
                logging.info(m)
                yield m
            else:
                return

    def create_tracker(self, restriction: OffsetRange) -> OffsetRestrictionTracker:
        return OffsetRestrictionTracker(restriction)

    def initial_restriction(self, element: MyFile) -> OffsetRange:
        return OffsetRange(start=element.start, stop=element.end)

    def restriction_size(self, element: MyFile, restriction: OffsetRange) -> int:
        return restriction.size()
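
As a quick sanity check (not part of the original pipelines), the SDF can also be exercised with Beam's TestPipeline. The sketch below assumes a temporary folder with two three-line files, so ProcessFilesFn should emit six records in total.

# illustrative test sketch, not included in the repository
import os
import tempfile

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

from file_read import GenerateFilesFn, ProcessFilesFn

with tempfile.TemporaryDirectory() as tmp_dir:
    # two files with three lines each -> six element/position records
    for name in ["a.txt", "b.txt"]:
        with open(os.path.join(tmp_dir, name), "w") as f:
            f.write("one\ntwo\nthree\n")

    with TestPipeline() as p:
        outputs = (
            p
            | beam.Create([tmp_dir])
            | beam.ParDo(GenerateFilesFn())
            | beam.ParDo(ProcessFilesFn())
            | beam.combiners.Count.Globally()
        )
        assert_that(outputs, equal_to([6]))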

Beam Pipeline

The pipeline begins by passing the input folder name to the GenerateFilesFn DoFn, which yields the custom MyFile objects while reading the list of files in the input folder. Finally, the custom objects are processed by the ProcessFilesFn DoFn.

# chapter7/batch_file_read.py
import os
import logging
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from file_read import GenerateFilesFn, ProcessFilesFn


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "-p",
        "--file_path",
        default=os.path.join(
            os.path.dirname(os.path.dirname(os.path.realpath(__file__))), "fake_files"
        ),
        help="File path",
    )

    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known args - {known_args}")
    print(f"pipeline options - {pipeline_options.display_data()}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | beam.Create([known_args.file_path])
            | beam.ParDo(GenerateFilesFn())
            | beam.ParDo(ProcessFilesFn())
        )

        logging.getLogger().setLevel(logging.INFO)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()

We can create input files and run the pipeline using the Direct Runner as shown below.

python utils/faker_file_gen.py -m 1000 -d 0
python chapter7/batch_file_read.py \
    --direct_num_workers=3 --direct_running_mode=multi_threading

PI Sampler

This pipeline performs a Monte Carlo simulation to estimate $\pi$. Let's say we have a square centered at the origin of a two-dimensional space. If the length of its side is two, its area is four. Now we draw a circle of radius one, also centered at the origin; its area is $\pi$. In this setting, if we place a dot at a uniformly random position within the square, the probability that the dot falls within the circle is $\pi$/4. To estimate this probability empirically, we can place many dots in the square. If we label a case as positive when the dot falls within the circle and as negative otherwise, we have the following empirical relationship.

  • $\pi$/4 $\approx$ # positive / # total

Therefore, we can estimate the value of $\pi$ with one of the following formulas.

  • $\pi$ $\approx$ 4 x (# positive / # total), or equivalently, 4 x (1 - # negative / # total)
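
To make the estimator concrete before wiring it into Beam, here is a minimal plain-Python sketch of the same idea (the sample count is arbitrary). By symmetry, it only samples the first quadrant, where the quarter circle of area $\pi$/4 sits inside a unit square.

import random


def estimate_pi(num_samples: int) -> float:
    # count the dots that fall outside the quarter circle (the negative cases)
    negatives = 0
    for _ in range(num_samples):
        x, y = random.random(), random.random()
        if x * x + y * y > 1:
            negatives += 1
    # second formula: pi ~ 4 * (1 - #negative / #total)
    return 4 * (1 - negatives / num_samples)


print(estimate_pi(1_000_000))  # typically around 3.14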

The pipeline has two DoFn objects as shown below.

  • GenerateExperiments
    • It yields the number of samples per experiment (num_samples) once for each of the parallelism experiments. Therefore, the pipeline ends up running num_samples * parallelism simulations in total.
  • PiSamplerDoFn
    • Similar to the earlier pipeline, it begins by recovering the current restriction from the tracker, which is injected via a parameter of type RestrictionParam. Then, it tries to claim/lock each position. Once a position is claimed, it generates two random numbers between zero and one and yields 1 if the sum of their squares is greater than one, i.e. it tracks the negative cases.

After completing all the simulations, the resulting values are summed, which gives the total number of negative cases. Finally, the value of $\pi$ is estimated by applying the second formula mentioned above.

# chapter7/pi_sampler.py
import random
import logging
import argparse

import apache_beam as beam
from apache_beam import RestrictionProvider
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class GenerateExperiments(beam.DoFn):
    def __init__(self, parallelism: int, num_samples: int):
        self.parallelism = parallelism
        self.num_samples = num_samples

    def process(self, ignored_element):
        for i in range(self.parallelism):
            if (i + 1) % 10 == 0:
                logging.info(f"sending {i + 1}th experiment")
            yield self.num_samples


class PiSamplerDoFn(beam.DoFn, RestrictionProvider):
    def process(
        self,
        element: int,
        tracker: OffsetRestrictionTracker = beam.DoFn.RestrictionParam(),
    ):
        restriction = tracker.current_restriction()
        for current_position in range(restriction.start, restriction.stop + 1):
            if tracker.try_claim(current_position):
                # yield 1 for a negative case, i.e. the dot falls outside the circle
                x, y = random.random(), random.random()
                if x * x + y * y > 1:
                    yield 1
            else:
                return

    def create_tracker(self, restriction: OffsetRange) -> OffsetRestrictionTracker:
        return OffsetRestrictionTracker(restriction)

    def initial_restriction(self, element: int) -> OffsetRange:
        return OffsetRange(start=0, stop=element)

    def restriction_size(self, element: int, restriction: OffsetRange) -> int:
        return restriction.size()


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser(description="Beam pipeline arguments")
    parser.add_argument(
        "-p", "--parallelism", type=int, default=100, help="Number of parallel experiments"
    )
    parser.add_argument(
        "-n", "--num_samples", type=int, default=10000, help="Number of samples"
    )

    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    print(f"known args - {known_args}")
    print(f"pipeline options - {pipeline_options.display_data()}")

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | beam.Create([0])
            | beam.ParDo(
                GenerateExperiments(
                    parallelism=known_args.parallelism,
                    num_samples=known_args.num_samples,
                )
            )
            | beam.ParDo(PiSamplerDoFn())
            | beam.CombineGlobally(sum)
            | beam.Map(
                lambda e: 4
                * (1 - e / (known_args.num_samples * known_args.parallelism))
            )
            | beam.Map(print)
        )

        logging.getLogger().setLevel(logging.INFO)
        logging.info("Building pipeline ...")


if __name__ == "__main__":
    run()

In the following example, the value of $\pi$ is estimated from 6,000,000 simulations (parallelism: 200, num_samples: 30,000). We see that the pipeline estimates $\pi$ correctly to three decimal places.

python chapter7/pi_sampler.py -p 200 -n 30000 \
    --direct_num_workers=3 --direct_running_mode=multi_threading
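
The three-decimal-place accuracy is roughly what a back-of-the-envelope error estimate suggests (this check is not from the original pipeline output): the fraction of positive cases is a binomial proportion with p = $\pi$/4, so the standard error of the estimate is about 4 x $\sqrt{p(1-p)/N}$ $\approx$ 0.00067 for N = 6,000,000, i.e. deviations from the true value of $\pi$ are typically on the order of $10^{-3}$.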