Skip to content

Historical Data & Forecasting

While dynamic-des is designed for real-time digital twins, it is equally powerful as a synchronized forecasting engine. By manipulating the environment's time factor and initial state, you can run simulations to generate vast amounts of historical data or instantly predict future states.

This is particularly useful for generating synthetic datasets to train Machine Learning models, backfilling Data Lakes, or simulating 8 hours into the future to see if your current production schedule will cause a bottleneck.

Key Concepts

  • Time Dilation (factor=0.0): Setting the environment's factor to 0 tells the engine to execute events as fast as the CPU allows, completely detaching logical simulation time from real-world wall-clock time.
  • State Snapshots: To forecast, you can query your live database or Kafka cluster for the current system state, feed it into the SimParameter registry, and start a fast-forwarded run from that precise moment.
  • Storage Egress & Cloud Object Storage: Instead of streaming to a live message broker like Kafka, batch runs typically write directly to file-based storage (AWS S3, Google Cloud Storage, or the local filesystem) using the ParquetStorageEgress or JsonlStorageEgress connectors, which are built on PyArrow's filesystem interface.
  • Dynamic Routing (Multiplexing): You can provide a path_router function to dynamically split, route, or drop records on the fly before they hit storage.

Execution

This example requires the parquet optional dependency to enable PyArrow's high-performance columnar writing capabilities. You can run the built-in demo directly from your terminal:

# Install with parquet support
pip install dynamic-des[parquet]

# Run the example (writes locally by default)
ddes-history-example

# Run the example configured for S3
USE_S3=true S3_ENDPOINT="localhost:8333" S3_ACCESS_KEY="user" S3_SECRET_KEY="password" ddes-history-example

Code

This script simulates a manufacturing line over a 1-week period. It demonstrates how to route lifecycle events to a single Parquet dataset (on the local filesystem by default, or on S3-compatible object storage when USE_S3=true), drop telemetry and other meaningless real-time metrics (like system lag), and flatten nested event payloads so the data is ready for a Data Lake.

import logging
import os
from datetime import datetime, timedelta

import numpy as np

from dynamic_des import (
    CapacityConfig,
    DistributionConfig,
    DynamicRealtimeEnvironment,
    DynamicResource,
    ParquetStorageEgress,
    Sampler,
    SimParameter,
)
from dynamic_des.utils import time_to_seconds

# Root logging: INFO and above, formatted as "LEVEL [timestamp] message".
logging.basicConfig(
    format="%(levelname)s [%(asctime)s] %(message)s",
    level=logging.INFO,
)
# Named logger used by every message this example emits.
logger = logging.getLogger("history_example")


def create_history_router(base_path: str):
    """
    Router Factory: Generates a router function injected with the correct
    base path, and flattens nested event payloads for Parquet.
    """

    def history_router(data: dict) -> str | None:
        if data.get("path_id") == "system.simulation.lag_seconds":
            return None

        stream_type = data.get("stream_type")

        if stream_type == "telemetry":
            return None

        # FLATTEN EVENT FOR PARQUET
        # If it's an event and has a nested 'value' dictionary, flatten it
        if (
            stream_type == "event"
            and "value" in data
            and isinstance(data["value"], dict)
        ):
            # Extract and remove the nested 'value' object
            nested_value = data.pop("value")
            # Merge the nested keys (path_id, status) directly into the root dict
            data.update(nested_value)

        return f"{base_path}/events.parquet"

    return history_router


def run() -> None:
    """
    Generate one week of synthetic history for ``Line_A`` in fast-forward mode.

    Storage is selected through environment variables:

    * ``USE_S3`` -- ``"true"`` (case-insensitive) writes through a PyArrow
      ``S3FileSystem``; any other value writes to the local filesystem.
    * ``DEST_PATH`` -- destination root; defaults to ``dml-dev/history`` when
      using S3 and ``data`` otherwise.
    * ``S3_ACCESS_KEY`` / ``S3_SECRET_KEY`` / ``S3_ENDPOINT`` -- S3 connection
      settings, defaulting to ``user`` / ``password`` / ``localhost:8333``.

    The environment is always torn down, even when the run raises; the
    completion message is logged only after a successful run and teardown.
    """
    # ---------------------------------------------------------
    # 1. DUAL-MODE STORAGE CONFIGURATION
    # ---------------------------------------------------------
    use_s3 = os.getenv("USE_S3", "false").lower() == "true"
    base_path = os.getenv("DEST_PATH", "dml-dev/history" if use_s3 else "data")
    # None is passed to the egress as-is in local mode.
    filesystem = None

    if use_s3:
        # Lazy import PyArrow so the script doesn't crash if running purely local
        # without the [parquet] extra installed (though it is needed for ParquetEgress)
        from pyarrow import fs

        logger.info(f"Configuring S3 Egress. Target Bucket: '{base_path}'")
        filesystem = fs.S3FileSystem(
            access_key=os.getenv("S3_ACCESS_KEY", "user"),
            secret_key=os.getenv("S3_SECRET_KEY", "password"),
            endpoint_override=os.getenv("S3_ENDPOINT", "localhost:8333"),
            scheme="http",
        )
        # Ensure S3 Bucket exists
        filesystem.create_dir(base_path)
    else:
        logger.info(f"Configuring Local Egress. Target Folder: '{base_path}'")
        # Ensure local directory exists
        os.makedirs(base_path, exist_ok=True)

    # ---------------------------------------------------------
    # 2. SIMULATION SETUP
    # ---------------------------------------------------------
    line_a_params = SimParameter(
        sim_id="Line_A",
        arrival={"standard": DistributionConfig(dist="exponential", rate=2.0)},
        service={"milling": DistributionConfig(dist="normal", mean=2.0, std=0.2)},
        resources={"lathe": CapacityConfig(current_cap=4, max_cap=10)},
    )

    # The logical clock starts seven days in the past, matching the
    # "1 week" run duration used below.
    start_time = datetime.now() - timedelta(days=7)
    env = DynamicRealtimeEnvironment(factor=0.0, logical_start_time=start_time)
    env.registry.register_sim_parameter(line_a_params)

    # Initialize Egress with the dynamic router and conditional filesystem
    router = create_history_router(base_path)
    egress = ParquetStorageEgress(path_router=router, filesystem=filesystem)

    # batch_size=5000 for compression, flush_interval=86400 (1 day in sim time)
    env.setup_egress([egress], batch_size=5000, flush_interval=86400)

    res = DynamicResource(env, "Line_A", "lathe")
    # Fixed seed so the sampler draws from a reproducible random stream.
    sampler = Sampler(rng=np.random.default_rng(42))

    def arrival_process(env: DynamicRealtimeEnvironment, res: DynamicResource):
        """Start a new ``work_task`` after each sampled inter-arrival delay, forever."""
        arrival_cfg = env.registry.get_config("Line_A.arrival.standard")
        service_path = "Line_A.service.milling"
        task_id = 0

        while True:
            yield env.timeout(sampler.sample(arrival_cfg))
            env.process(work_task(env, task_id, res, service_path))
            task_id += 1

    def work_task(
        env: DynamicRealtimeEnvironment,
        task_id: int,
        res: DynamicResource,
        path_id: str,
    ):
        """Publish queued / started / finished events for one task on ``res``."""
        task_key = f"task-{task_id}"
        env.publish_event(task_key, {"path_id": path_id, "status": "queued"})

        with res.request() as req:
            yield req
            # Looked up once the resource is granted rather than cached up
            # front (presumably to pick up live parameter changes -- confirm).
            current_service_cfg = env.registry.get_config(path_id)
            env.publish_event(task_key, {"path_id": path_id, "status": "started"})
            yield env.timeout(sampler.sample(current_service_cfg))
            env.publish_event(task_key, {"path_id": path_id, "status": "finished"})

    def telemetry_monitor(env: DynamicRealtimeEnvironment, res: DynamicResource):
        """
        Publish lathe capacity, usage, queue length and utilization every
        60.0 units of simulation time.

        Note: ``history_router`` returns ``None`` for telemetry records.
        """
        while True:
            env.publish_telemetry("Line_A.lathe.capacity", res.capacity)
            env.publish_telemetry("Line_A.lathe.in_use", res.in_use)
            env.publish_telemetry("Line_A.lathe.queue_length", len(res.queue.items))

            # Guard against division by zero when capacity is 0.
            util = (res.in_use / res.capacity) * 100 if res.capacity > 0 else 0
            env.publish_telemetry("Line_A.lathe.utilization", util)

            yield env.timeout(60.0)

    env.process(arrival_process(env, res))
    env.process(telemetry_monitor(env, res))

    run_duration_str = "1 week"
    run_duration_sec = time_to_seconds(run_duration_str)

    logger.info(
        f"Generating historical data from {start_time.strftime('%Y-%m-%d %H:%M:%S')}"
    )
    logger.info("Fast-forwarding (factor=0.0)...")

    try:
        env.run(until=run_duration_sec)
    finally:
        # Always tear the environment down, even if the run fails.
        env.teardown()

    # Reached only when both the run and the teardown succeeded, so a failed
    # run is no longer reported as complete.
    logger.info(f"Data generation complete. Check '{base_path}/' for chunks.")


# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    run()