Skip to content

Local Simulation

This example demonstrates how to build a dynamic simulation using Local Connectors.

Local connectors do not require Docker, Kafka, or any external data stores. They are perfect for testing, benchmarking, or scenarios where parameter changes need to occur at specific wall-clock intervals deterministically.

Execution

Local connectors are dependency-free and do not require Docker or Kafka. You can run the built-in demo directly from your terminal:

# Core library is enough for local examples
pip install dynamic-des

# Run the example
ddes-local-example

Code

This script initializes a production line, schedules a capacity update to happen 10 seconds into the future, and streams telemetry directly to your terminal.

import logging
import numpy as np

from dynamic_des import (
    CapacityConfig,
    ConsoleEgress,
    DistributionConfig,
    DynamicRealtimeEnvironment,
    DynamicResource,
    LocalIngress,
    Sampler,
    SimParameter,
)

logging.basicConfig(level=logging.INFO, format="%(levelname)s [%(asctime)s] %(message)s")

def run():
    # 1. Define the initial system schema
    line_a_params = SimParameter(
        sim_id="Line_A",
        arrival={"standard": DistributionConfig(dist="exponential", rate=1.0)},
        service={"milling": DistributionConfig(dist="normal", mean=3.0, std=0.5)},
        resources={"lathe": CapacityConfig(current_cap=1, max_cap=5)},
    )

    # 2. Setup Environment with Local Connectors
    # Schedule capacity to jump to 3 at t=10s, then drop to 2 at t=20s
    ingress = LocalIngress(
        schedule=[
            (10.0, "Line_A.resources.lathe.current_cap", 3),
            (20.0, "Line_A.resources.lathe.current_cap", 2),
        ]
    )
    # Egress simply prints to the console
    egress = ConsoleEgress()

    env = DynamicRealtimeEnvironment(factor=1.0)
    env.registry.register_sim_parameter(line_a_params)
    env.setup_ingress([ingress])
    env.setup_egress([egress])

    # 3. Initialize Resources and Sampler
    res = DynamicResource(env, "Line_A", "lathe")
    sampler = Sampler(rng=np.random.default_rng(42))

    # 4. Define Simulation Logic
    def arrival_process(env, res):
        arrival_cfg = env.registry.get_config("Line_A.arrival.standard")
        task_id = 0
        while True:
            yield env.timeout(sampler.sample(arrival_cfg))
            env.process(work_task(env, task_id, res, "Line_A.service.milling"))
            task_id += 1

    def work_task(env, task_id, res, path_id):
        task_key = f"task-{task_id}"
        env.publish_event(task_key, {"path_id": path_id, "status": "queued"})

        with res.request() as req:
            yield req
            current_service_cfg = env.registry.get_config(path_id)
            env.publish_event(task_key, {"path_id": path_id, "status": "started"})
            yield env.timeout(sampler.sample(current_service_cfg))
            env.publish_event(task_key, {"path_id": path_id, "status": "finished"})

    def telemetry_monitor(env, res):
        """Streams system health metrics every 2 seconds."""
        while True:
            env.publish_telemetry("Line_A.lathe.capacity", res.capacity)
            env.publish_telemetry("Line_A.lathe.in_use", res.in_use)
            env.publish_telemetry("Line_A.lathe.queue_length", len(res.queue.items))
            yield env.timeout(2.0)

    # 5. Run the Simulation
    env.process(arrival_process(env, res))
    env.process(telemetry_monitor(env, res))

    print("Simulation started. Watch capacity change at t=10.0s and 20.0s...")
    try:
        env.run(until=30)
    finally:
        env.teardown()

if __name__ == "__main__":
    run()