API Reference

Environment

DynamicRealtimeEnvironment

Bases: RealtimeEnvironment, RegistryMixIn, IngressMixIn, EgressMixIn

The core simulation engine for dynamic-des.

This environment extends SimPy's RealtimeEnvironment by incorporating a centralized SimulationRegistry for dynamic state updates, and MixIns for managing high-throughput asynchronous I/O with external systems.

Attributes:

    start_datetime (datetime): The real-world clock time when the simulation started.

Source code in src/dynamic_des/core/environment.py
class DynamicRealtimeEnvironment(
    RealtimeEnvironment, RegistryMixIn, IngressMixIn, EgressMixIn
):
    """
    The core simulation engine for `dynamic-des`.

    This environment extends SimPy's `RealtimeEnvironment` by incorporating
    a centralized `SimulationRegistry` for dynamic state updates, and MixIns
    for managing high-throughput asynchronous I/O with external systems.

    Attributes:
        start_datetime (datetime): The real-world clock time when the simulation started.
    """

    def __init__(self, initial_time=0, factor=1.0, strict=False):
        """
        Initializes the real-time simulation environment.

        Args:
            initial_time (float, optional): The initial simulation time. Defaults to 0.
            factor (float, optional): The real-time factor (e.g., 1.0 = 1 sim second per real second). Defaults to 1.0.
            strict (bool, optional): If True, raises RuntimeError if simulation falls too far behind real time. Defaults to False.
        """
        self.start_datetime = datetime.now()
        super().__init__(initial_time=initial_time, factor=factor, strict=strict)
        self.setup_registry()

    def teardown(self):
        """
        Gracefully terminates the environment.

        Ensures that any remaining data in event buffers is flushed to the
        egress connectors, and that background asyncio threads for both
        ingress and egress are cleanly stopped. Should be called in a `finally` block.
        """
        logger.info("Environment teardown initiated.")
        if hasattr(self, "teardown_egress"):
            self.teardown_egress()
        if hasattr(self, "teardown_ingress"):
            self.teardown_ingress()
        logger.info("Environment teardown complete.")
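The docstring above asks for `teardown()` to be called in a `finally` block so buffered egress data is flushed even when the run loop is interrupted. A minimal sketch of that lifecycle pattern, using a stand-in class rather than the real `DynamicRealtimeEnvironment`:

```python
# Sketch of the recommended lifecycle: always tear down in `finally`.
# `StubEnvironment` is a stand-in; the real environment's run() and
# teardown() come from SimPy and the MixIns documented above.
class StubEnvironment:
    def __init__(self):
        self.events = []

    def run(self, until):
        self.events.append(f"ran until {until}")
        raise KeyboardInterrupt  # simulate an operator stopping the run

    def teardown(self):
        self.events.append("teardown")


env = StubEnvironment()
try:
    env.run(until=100)
except KeyboardInterrupt:
    pass
finally:
    env.teardown()  # flushes egress buffers, stops asyncio threads
```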

Registry & Parameters

SimulationRegistry

A centralized 'Switchboard' that maps dot-notation paths to dynamic simulation parameters.

The Registry acts as the single source of truth for the simulation's state. It allows external streams (like Kafka or Redis) to update parameters on the fly, seamlessly synchronizing them with the underlying SimPy processes.

Attributes:

    env (Environment): The active SimPy environment.

Source code in src/dynamic_des/core/registry.py
class SimulationRegistry:
    """
    A centralized 'Switchboard' that maps dot-notation paths to dynamic simulation parameters.

    The Registry acts as the single source of truth for the simulation's state. It allows
    external streams (like Kafka or Redis) to update parameters on the fly, seamlessly
    synchronizing them with the underlying SimPy processes.

    Attributes:
        env (Environment): The active SimPy environment.
    """

    def __init__(self, env: Environment):
        self.env = env
        self._values: Dict[str, DynamicValue] = {}
        self._configs: Dict[str, Any] = {}

    def get(self, path: str) -> DynamicValue:
        """
        Retrieve the `DynamicValue` object at a specific path.

        Args:
            path (str): The dot-notation path (e.g., 'Line_A.arrival.standard.rate').

        Returns:
            DynamicValue: The wrapper object for the requested parameter.

        Raises:
            KeyError: If the path does not exist in the registry.
        """
        if path not in self._values:
            raise KeyError(f"Path '{path}' not found in Simulation Registry.")
        return self._values[path]

    def get_config(self, path: str) -> Any:
        """
        Retrieve the live configuration object (e.g., `DistributionConfig`).

        Args:
            path (str): The dot-notation path (e.g., 'Line_A.service.milling').

        Returns:
            Any: The synchronized configuration object.

        Raises:
            KeyError: If the config path does not exist in the registry.
        """
        if path not in self._configs:
            raise KeyError(f"Config path '{path}' not found in Simulation Registry.")
        return self._configs[path]

    def update(self, path: str, new_value: Any):
        """
        Update a value safely and synchronize its parent configuration object.

        Includes dynamic type casting to protect the simulation from crashing if
        external systems send improperly typed data (e.g., sending the string "5"
        instead of the integer 5).

        Args:
            path (str): The dot-notation path to update.
            new_value (Any): The new value to apply.
        """
        if path in self._values:
            current_val = self._values[path].value
            validated_value = new_value

            # Type Validation & Casting
            if current_val is not None:
                expected_type = type(current_val)
                # If types don't match, attempt a safe cast (e.g., "5" -> 5)
                if not isinstance(new_value, expected_type):
                    try:
                        validated_value = expected_type(new_value)
                        logger.debug(
                            f"Cast '{new_value}' to {expected_type.__name__} for '{path}'"
                        )
                    except (ValueError, TypeError):
                        logger.error(
                            f"Type mismatch for '{path}'. Expected {expected_type.__name__}, "
                            f"got {type(new_value).__name__} with value '{new_value}'. Update ignored."
                        )
                        return  # Exit early to prevent crashing the simulation

            # Update the DynamicValue (triggers SimPy events)
            self._values[path].update(validated_value)

            # Synchronize the attribute on the original config object
            if "." in path:
                parent_path, attr = path.rsplit(".", 1)
                if parent_path in self._configs:
                    setattr(self._configs[parent_path], attr, validated_value)
        else:
            logger.warning(f"Attempted to update non-existent path: {path}")

    def register_sim_parameter(self, param: SimParameter):
        """
        Takes a `SimParameter` instance and flattens it into the registry.

        Args:
            param (SimParameter): The initial state schema to register.
        """
        prefix = param.sim_id

        # Register Arrivals
        for name, dist_config in param.arrival.items():
            path = f"{prefix}.arrival.{name}"
            self._configs[path] = dist_config
            self._register_dist(path, dist_config)

        # Register Service Steps
        for name, dist_config in param.service.items():
            path = f"{prefix}.service.{name}"
            self._configs[path] = dist_config
            self._register_dist(path, dist_config)

        # Register Resources
        for name, cap_config in param.resources.items():
            path = f"{prefix}.resources.{name}"
            self._configs[path] = cap_config
            self._register_cap(path, cap_config)

        # Register Containers
        for name, cap_config in param.containers.items():
            path = f"{prefix}.containers.{name}"
            self._configs[path] = cap_config
            self._register_cap(path, cap_config)

        # Register Stores
        for name, cap_config in param.stores.items():
            path = f"{prefix}.stores.{name}"
            self._configs[path] = cap_config
            self._register_cap(path, cap_config)

    def _register_dist(self, path_prefix: str, dist_config: Any):
        """Internal: Flattens a DistributionConfig."""
        if dist_config.dist == "exponential":
            self._values[f"{path_prefix}.rate"] = DynamicValue(
                self.env, f"{path_prefix}.rate", dist_config.rate
            )
        else:
            self._values[f"{path_prefix}.mean"] = DynamicValue(
                self.env, f"{path_prefix}.mean", dist_config.mean
            )
            self._values[f"{path_prefix}.std"] = DynamicValue(
                self.env, f"{path_prefix}.std", dist_config.std
            )

    def _register_cap(self, path_prefix: str, cap_config: Any):
        """Internal: Flattens a CapacityConfig."""
        self._values[f"{path_prefix}.current_cap"] = DynamicValue(
            self.env, f"{path_prefix}.current_cap", cap_config.current_cap
        )
        self._values[f"{path_prefix}.max_cap"] = DynamicValue(
            self.env, f"{path_prefix}.max_cap", cap_config.max_cap
        )
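The flattening performed by `register_sim_parameter` can be reproduced in isolation. A standalone sketch of the dot-notation path scheme (the dict shapes mirror `SimParameter`, but nothing here imports the library; the 'Line_A' values are illustrative):

```python
# Rebuild the dot-notation paths that register_sim_parameter produces
# for a hypothetical 'Line_A' entity.
sim_id = "Line_A"
schema = {
    "arrival": {"standard": {"dist": "exponential", "rate": 2.0}},
    "resources": {"lathe": {"current_cap": 3, "max_cap": 5}},
}

paths = {}
for category, entries in schema.items():
    for name, cfg in entries.items():
        prefix = f"{sim_id}.{category}.{name}"
        for field_name, value in cfg.items():
            if field_name != "dist":  # 'dist' selects the shape; it is not a DynamicValue
                paths[f"{prefix}.{field_name}"] = value

print(sorted(paths))
```

Each resulting key (e.g., `'Line_A.arrival.standard.rate'`) is exactly the path format accepted by `get()` and `update()`.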

Functions

get(path)

Retrieve the DynamicValue object at a specific path.

Parameters:

    path (str): The dot-notation path (e.g., 'Line_A.arrival.standard.rate'). Required.

Returns:

    DynamicValue: The wrapper object for the requested parameter.

Raises:

    KeyError: If the path does not exist in the registry.

Source code in src/dynamic_des/core/registry.py
def get(self, path: str) -> DynamicValue:
    """
    Retrieve the `DynamicValue` object at a specific path.

    Args:
        path (str): The dot-notation path (e.g., 'Line_A.arrival.standard.rate').

    Returns:
        DynamicValue: The wrapper object for the requested parameter.

    Raises:
        KeyError: If the path does not exist in the registry.
    """
    if path not in self._values:
        raise KeyError(f"Path '{path}' not found in Simulation Registry.")
    return self._values[path]
get_config(path)

Retrieve the live configuration object (e.g., DistributionConfig).

Parameters:

    path (str): The dot-notation path (e.g., 'Line_A.service.milling'). Required.

Returns:

    Any: The synchronized configuration object.

Raises:

    KeyError: If the config path does not exist in the registry.

Source code in src/dynamic_des/core/registry.py
def get_config(self, path: str) -> Any:
    """
    Retrieve the live configuration object (e.g., `DistributionConfig`).

    Args:
        path (str): The dot-notation path (e.g., 'Line_A.service.milling').

    Returns:
        Any: The synchronized configuration object.

    Raises:
        KeyError: If the config path does not exist in the registry.
    """
    if path not in self._configs:
        raise KeyError(f"Config path '{path}' not found in Simulation Registry.")
    return self._configs[path]
register_sim_parameter(param)

Takes a SimParameter instance and flattens it into the registry.

Parameters:

    param (SimParameter): The initial state schema to register. Required.
Source code in src/dynamic_des/core/registry.py
def register_sim_parameter(self, param: SimParameter):
    """
    Takes a `SimParameter` instance and flattens it into the registry.

    Args:
        param (SimParameter): The initial state schema to register.
    """
    prefix = param.sim_id

    # Register Arrivals
    for name, dist_config in param.arrival.items():
        path = f"{prefix}.arrival.{name}"
        self._configs[path] = dist_config
        self._register_dist(path, dist_config)

    # Register Service Steps
    for name, dist_config in param.service.items():
        path = f"{prefix}.service.{name}"
        self._configs[path] = dist_config
        self._register_dist(path, dist_config)

    # Register Resources
    for name, cap_config in param.resources.items():
        path = f"{prefix}.resources.{name}"
        self._configs[path] = cap_config
        self._register_cap(path, cap_config)

    # Register Containers
    for name, cap_config in param.containers.items():
        path = f"{prefix}.containers.{name}"
        self._configs[path] = cap_config
        self._register_cap(path, cap_config)

    # Register Stores
    for name, cap_config in param.stores.items():
        path = f"{prefix}.stores.{name}"
        self._configs[path] = cap_config
        self._register_cap(path, cap_config)
update(path, new_value)

Update a value safely and synchronize its parent configuration object.

Includes dynamic type casting to protect the simulation from crashing if external systems send improperly typed data (e.g., sending the string "5" instead of the integer 5).

Parameters:

    path (str): The dot-notation path to update. Required.
    new_value (Any): The new value to apply. Required.
Source code in src/dynamic_des/core/registry.py
def update(self, path: str, new_value: Any):
    """
    Update a value safely and synchronize its parent configuration object.

    Includes dynamic type casting to protect the simulation from crashing if
    external systems send improperly typed data (e.g., sending the string "5"
    instead of the integer 5).

    Args:
        path (str): The dot-notation path to update.
        new_value (Any): The new value to apply.
    """
    if path in self._values:
        current_val = self._values[path].value
        validated_value = new_value

        # Type Validation & Casting
        if current_val is not None:
            expected_type = type(current_val)
            # If types don't match, attempt a safe cast (e.g., "5" -> 5)
            if not isinstance(new_value, expected_type):
                try:
                    validated_value = expected_type(new_value)
                    logger.debug(
                        f"Cast '{new_value}' to {expected_type.__name__} for '{path}'"
                    )
                except (ValueError, TypeError):
                    logger.error(
                        f"Type mismatch for '{path}'. Expected {expected_type.__name__}, "
                        f"got {type(new_value).__name__} with value '{new_value}'. Update ignored."
                    )
                    return  # Exit early to prevent crashing the simulation

        # Update the DynamicValue (triggers SimPy events)
        self._values[path].update(validated_value)

        # Synchronize the attribute on the original config object
        if "." in path:
            parent_path, attr = path.rsplit(".", 1)
            if parent_path in self._configs:
                setattr(self._configs[parent_path], attr, validated_value)
    else:
        logger.warning(f"Attempted to update non-existent path: {path}")
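The cast-or-ignore guard in `update()` can be exercised on its own. A standalone sketch of the same validation logic, with no registry involved (`validate` is a hypothetical helper name, not part of the library):

```python
import logging

logger = logging.getLogger(__name__)

def validate(current_val, new_value):
    """Mirror of update()'s guard: cast to the current type, or reject."""
    if current_val is None:
        return new_value
    expected_type = type(current_val)
    if isinstance(new_value, expected_type):
        return new_value
    try:
        return expected_type(new_value)  # e.g., "5" -> 5
    except (ValueError, TypeError):
        logger.error("Type mismatch; update ignored.")
        return current_val  # keep the old value, never crash the simulation

print(validate(5, "7"))      # cast succeeds
print(validate(5, "seven"))  # cast fails, old value kept
```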

SimParameter dataclass

The master schema representing the state of a specific simulation entity (like a production line).

This object is registered with the SimulationRegistry, which flattens the nested dictionaries into dot-notation paths (e.g., 'Line_A.arrival.standard.rate').

Attributes:

    sim_id (str): The unique prefix for this group of parameters (e.g., 'Line_A').
    arrival (Dict[str, DistributionConfig]): Configurations for arrival generation rates.
    service (Dict[str, DistributionConfig]): Configurations for process task durations.
    resources (Dict[str, CapacityConfig]): Configurations for standard SimPy Resources.
    containers (Dict[str, CapacityConfig]): Configurations for continuous SimPy Containers.
    stores (Dict[str, CapacityConfig]): Configurations for discrete SimPy Stores.

Source code in src/dynamic_des/models/params.py
@dataclass
class SimParameter:
    """
    The master schema representing the state of a specific simulation entity (like a production line).

    This object is registered with the `SimulationRegistry`, which flattens
    the nested dictionaries into dot-notation paths (e.g., 'Line_A.arrival.standard.rate').

    Attributes:
        sim_id (str): The unique prefix for this group of parameters (e.g., 'Line_A').
        arrival (Dict[str, DistributionConfig]): Configurations for arrival generation rates.
        service (Dict[str, DistributionConfig]): Configurations for process task durations.
        resources (Dict[str, CapacityConfig]): Configurations for standard SimPy Resources.
        containers (Dict[str, CapacityConfig]): Configurations for continuous SimPy Containers.
        stores (Dict[str, CapacityConfig]): Configurations for discrete SimPy Stores.
    """

    sim_id: str
    arrival: Dict[str, DistributionConfig] = field(default_factory=dict)
    service: Dict[str, DistributionConfig] = field(default_factory=dict)
    # Standardized categories
    resources: Dict[str, CapacityConfig] = field(default_factory=dict)
    containers: Dict[str, CapacityConfig] = field(default_factory=dict)
    stores: Dict[str, CapacityConfig] = field(default_factory=dict)
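Because every category uses `field(default_factory=dict)`, each `SimParameter` instance gets its own independent dicts rather than sharing one mutable default. A sketch of that behavior with a stand-in dataclass (not the library's; entries are plain dicts here for illustration):

```python
from dataclasses import dataclass, field
from typing import Any, Dict

# Stand-in mirroring SimParameter's shape.
@dataclass
class StubSimParameter:
    sim_id: str
    arrival: Dict[str, Any] = field(default_factory=dict)
    resources: Dict[str, Any] = field(default_factory=dict)

a = StubSimParameter("Line_A")
b = StubSimParameter("Line_B")
a.arrival["standard"] = {"dist": "exponential", "rate": 2.0}

# default_factory gives each instance an independent dict, so mutating
# a.arrival does not leak into b.arrival:
print(len(a.arrival), len(b.arrival))
```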

CapacityConfig dataclass

Configuration for the capacity of a simulated resource, container, or store.

Attributes:

    current_cap (int): The currently active capacity (e.g., number of active workers).
    max_cap (int): The absolute physical maximum capacity. current_cap cannot exceed this.

Source code in src/dynamic_des/models/params.py
@dataclass
class CapacityConfig:
    """
    Configuration for the capacity of a simulated resource, container, or store.

    Attributes:
        current_cap (int): The currently active capacity (e.g., number of active workers).
        max_cap (int): The absolute physical maximum capacity. `current_cap` cannot exceed this.
    """

    current_cap: int
    max_cap: int

DistributionConfig dataclass

Configuration for a statistical distribution used in the simulation.

This config is dynamically updatable. For example, you can change the rate of an 'exponential' distribution via Kafka, and the Sampler will instantly use the new rate for the next event.

Attributes:

    dist (str): The type of distribution ('exponential', 'normal', or 'lognormal').
    rate (Optional[float]): The rate parameter (lambda) for exponential distributions.
    mean (Optional[float]): The mean (mu) for normal and lognormal distributions.
    std (Optional[float]): The standard deviation (sigma) for normal and lognormal distributions.

Source code in src/dynamic_des/models/params.py
@dataclass
class DistributionConfig:
    """
    Configuration for a statistical distribution used in the simulation.

    This config is dynamically updatable. For example, you can change the `rate`
    of an 'exponential' distribution via Kafka, and the `Sampler` will instantly
    use the new rate for the next event.

    Attributes:
        dist (str): The type of distribution ('exponential', 'normal', or 'lognormal').
        rate (Optional[float]): The rate parameter (lambda) for exponential distributions.
        mean (Optional[float]): The mean (mu) for normal and lognormal distributions.
        std (Optional[float]): The standard deviation (sigma) for normal and lognormal distributions.
    """

    dist: Literal["exponential", "normal", "lognormal"]
    rate: Optional[float] = None
    mean: Optional[float] = None
    std: Optional[float] = None

Data Payloads

TelemetryPayload

Bases: BaseStreamPayload

Schema for low-volume, single-metric telemetry updates.

This payload is used for publishing continuous system state variables—such as resource utilization, queue lengths, or system lag—typically used for real-time dashboards.

Attributes:

    stream_type (Literal["telemetry"]): Hardcoded to "telemetry" for downstream routing.
    path_id (str): The dot-notation path of the metric (e.g., 'Line_A.lathe.utilization').
    value (Any): The scalar value of the metric at the given simulation time.

Source code in src/dynamic_des/models/schemas.py
class TelemetryPayload(BaseStreamPayload):
    """
    Schema for low-volume, single-metric telemetry updates.

    This payload is used for publishing continuous system state variables—such as resource
    utilization, queue lengths, or system lag—typically used for real-time dashboards.

    Attributes:
        stream_type (Literal["telemetry"]): Hardcoded to "telemetry" for downstream routing.
        path_id (str): The dot-notation path of the metric (e.g., 'Line_A.lathe.utilization').
        value (Any): The scalar value of the metric at the given simulation time.
    """

    stream_type: Literal["telemetry"] = "telemetry"
    path_id: str = Field(
        ...,
        description="The dot-notation path of the metric (e.g., 'Line_A.lathe.utilization').",
    )
    value: Any = Field(..., description="The scalar value of the metric.")

EventPayload

Bases: BaseStreamPayload

Schema for high-volume discrete simulation events.

This payload tracks state transitions of specific entities (like tasks or parts) throughout the simulation lifecycle. The key attribute allows message brokers like Kafka to maintain strict chronological ordering for specific tasks.

Attributes:

    stream_type (Literal["event"]): Hardcoded to "event" for downstream routing.
    key (str): A unique identifier/partition key for the event (e.g., 'task-001').
    value (Dict[str, Any]): A dictionary containing the event's detailed payload.

Source code in src/dynamic_des/models/schemas.py
class EventPayload(BaseStreamPayload):
    """
    Schema for high-volume discrete simulation events.

    This payload tracks state transitions of specific entities (like tasks or parts) throughout
    the simulation lifecycle. The `key` attribute allows message brokers like Kafka to maintain
    strict chronological ordering for specific tasks.

    Attributes:
        stream_type (Literal["event"]): Hardcoded to "event" for downstream routing.
        key (str): A unique identifier/partition key for the event (e.g., 'task-001').
        value (Dict[str, Any]): A dictionary containing the event's detailed payload.
    """

    stream_type: Literal["event"] = "event"
    key: str = Field(
        ...,
        description="A unique identifier/partition key for the event (e.g., 'task-001').",
    )
    value: Dict[str, Any] = Field(
        ..., description="A dictionary containing the event payload."
    )

BaseStreamPayload

Bases: BaseModel

Base schema for all egress data emitted by the simulation engine.

This schema guarantees that all outgoing data streams share a common temporal context, allowing downstream systems to accurately synchronize simulation time with real-world time.

Attributes:

    stream_type (Literal["telemetry", "event"]): An identifier indicating the nature of the payload.
    sim_ts (float): The simulation clock time (in seconds) when the payload was generated.
    timestamp (str): The real-world ISO 8601 timestamp indicating when the payload was generated.

Source code in src/dynamic_des/models/schemas.py
class BaseStreamPayload(BaseModel):
    """
    Base schema for all egress data emitted by the simulation engine.

    This schema guarantees that all outgoing data streams share a common temporal context,
    allowing downstream systems to accurately synchronize simulation time with real-world time.

    Attributes:
        stream_type (Literal["telemetry", "event"]): An identifier indicating the nature of the payload.
        sim_ts (float): The simulation clock time (in seconds) when the payload was generated.
        timestamp (str): The real-world ISO 8601 timestamp indicating when the payload was generated.
    """

    stream_type: Literal["telemetry", "event"]
    sim_ts: float = Field(..., description="The simulation clock time (in seconds).")
    timestamp: str = Field(..., description="The real-world ISO 8601 timestamp.")
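The shared temporal context can be illustrated without pydantic. A stdlib-only sketch of the payload shape that `BaseStreamPayload` and `TelemetryPayload` enforce (field names come from the schemas above; `make_telemetry_payload` and the concrete values are illustrative, not library API):

```python
import json
from datetime import datetime, timezone

def make_telemetry_payload(sim_ts, path_id, value):
    """Assemble a dict with the same fields as TelemetryPayload."""
    return {
        "stream_type": "telemetry",  # fixed discriminator for downstream routing
        "sim_ts": sim_ts,            # simulation clock time, in seconds
        "timestamp": datetime.now(timezone.utc).isoformat(),  # real-world ISO 8601
        "path_id": path_id,
        "value": value,
    }

payload = make_telemetry_payload(12.5, "Line_A.lathe.utilization", 0.82)
print(json.dumps(payload))
```

Carrying both `sim_ts` and `timestamp` is what lets a dashboard line up simulation time against wall-clock time.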

Utilities

Sampler

Generates random numbers based on live DistributionConfig objects.

The Sampler evaluates the configuration at the exact moment it is called. This allows the simulation to dynamically respond to external parameter changes.

Attributes:

    rng (np.random.Generator): The NumPy random number generator instance.

Source code in src/dynamic_des/core/sampler.py
class Sampler:
    """
    Generates random numbers based on live `DistributionConfig` objects.

    The Sampler evaluates the configuration *at the exact moment* it is called.
    This allows the simulation to dynamically respond to external parameter changes.

    Attributes:
        rng (np.random.Generator): The NumPy random number generator instance.
    """

    def __init__(self, rng: Optional[np.random.Generator] = None):
        """
        Initializes the Sampler.

        Args:
            rng (np.random.Generator, optional): A seeded NumPy random generator for reproducible runs.
                If None, random draws are skipped and the distribution's mean (or scale) is returned deterministically.
        """
        self.rng = rng

    def sample(self, config: DistributionConfig, min_delay: float = 0.00001) -> float:
        """
        Draws a random sample using the current parameters in the provided config.

        Args:
            config (DistributionConfig): The live configuration object retrieved from the Registry.
            min_delay (float, optional): The lower bound applied to every sample. Defaults to 0.00001.

        Returns:
            float: A sampled float representing a time duration (e.g., arrival gap or service time),
                   clamped to be at least `min_delay`.

        Note:
            If the `dist` type is not supported, `min_delay` is returned instead of raising.
        """

        if config.dist == "exponential":
            scale = 1.0 / config.rate if config.rate and config.rate > 0 else 1.0
            if self.rng is None:
                return max(min_delay, scale)
            return max(min_delay, self.rng.exponential(scale))

        if config.dist == "normal":
            m = config.mean or 0.0
            if self.rng is None:
                return max(min_delay, m)
            val = self.rng.normal(m, config.std or 0.0)
            return max(min_delay, val)

        if config.dist == "lognormal":
            m, s = config.mean or 1.0, config.std or 0.1
            if self.rng is None:
                return max(min_delay, m)

            # mu/sigma conversion
            mu = np.log(m**2 / np.sqrt(s**2 + m**2))
            sigma = np.sqrt(np.log(1 + (s**2 / m**2)))
            return max(min_delay, self.rng.lognormal(mu, sigma))

        return min_delay
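The lognormal branch converts the configured mean/std into the parameters of the underlying normal distribution. That conversion can be checked independently with pure math (values here are illustrative):

```python
import math

# Given a desired lognormal mean m and standard deviation s, the Sampler
# derives the parameters (mu, sigma) of the underlying normal distribution:
m, s = 10.0, 3.0
mu = math.log(m**2 / math.sqrt(s**2 + m**2))
sigma = math.sqrt(math.log(1 + (s**2 / m**2)))

# Sanity check: a lognormal(mu, sigma) has mean exp(mu + sigma^2 / 2)
# and variance (exp(sigma^2) - 1) * exp(2*mu + sigma^2), which should
# recover m and s^2 exactly.
recovered_mean = math.exp(mu + sigma**2 / 2)
recovered_var = (math.exp(sigma**2) - 1) * math.exp(2 * mu + sigma**2)
print(round(recovered_mean, 6), round(recovered_var, 6))
```

This is why `rng.lognormal(mu, sigma)` produces samples whose mean and spread match the config's `mean` and `std` directly, rather than the log-space parameters.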

Functions

sample(config, min_delay=1e-05)

Draws a random sample using the current parameters in the provided config.

Parameters:

    config (DistributionConfig): The live configuration object retrieved from the Registry. Required.
    min_delay (float): The lower bound applied to every sample. Defaults to 1e-05.

Returns:

    float: A sampled float representing a time duration (e.g., arrival gap or service time), clamped to be at least min_delay.

Note:

    If the dist type is not supported, min_delay is returned instead of raising.

Source code in src/dynamic_des/core/sampler.py
def sample(self, config: DistributionConfig, min_delay: float = 0.00001) -> float:
    """
    Draws a random sample using the current parameters in the provided config.

    Args:
        config (DistributionConfig): The live configuration object retrieved from the Registry.
        min_delay (float, optional): The lower bound applied to every sample. Defaults to 0.00001.

    Returns:
        float: A sampled float representing a time duration (e.g., arrival gap or service time),
               clamped to be at least `min_delay`.

    Note:
        If the `dist` type is not supported, `min_delay` is returned instead of raising.
    """

    if config.dist == "exponential":
        scale = 1.0 / config.rate if config.rate and config.rate > 0 else 1.0
        if self.rng is None:
            return max(min_delay, scale)
        return max(min_delay, self.rng.exponential(scale))

    if config.dist == "normal":
        m = config.mean or 0.0
        if self.rng is None:
            return max(min_delay, m)
        val = self.rng.normal(m, config.std or 0.0)
        return max(min_delay, val)

    if config.dist == "lognormal":
        m, s = config.mean or 1.0, config.std or 0.1
        if self.rng is None:
            return max(min_delay, m)

        # mu/sigma conversion
        mu = np.log(m**2 / np.sqrt(s**2 + m**2))
        sigma = np.sqrt(np.log(1 + (s**2 / m**2)))
        return max(min_delay, self.rng.lognormal(mu, sigma))

    return min_delay

Resources

DynamicResource

Bases: BaseDynamicResource

A SimPy Resource with a dynamically adjustable capacity.

Unlike a standard simpy.Resource where capacity is fixed at creation, the DynamicResource listens to a SimulationRegistry path (e.g., 'Line_A.lathe.current_cap'). When the registry updates the capacity, the resource automatically adjusts, triggering pending requests if capacity increases.

Attributes:

    env (DynamicRealtimeEnvironment): The active simulation environment.
    sim_id (str): The prefix ID (e.g., 'Line_A').
    resource_id (str): The specific resource name (e.g., 'lathe').

Source code in src/dynamic_des/resources/resource.py
class DynamicResource(BaseDynamicResource):
    """
    A SimPy Resource with a dynamically adjustable capacity.

    Unlike a standard `simpy.Resource` where capacity is fixed at creation,
    the `DynamicResource` listens to a `SimulationRegistry` path (e.g., 'Line_A.lathe.current_cap').
    When the registry updates the capacity, the resource automatically adjusts,
    triggering pending requests if capacity increases.

    Attributes:
        env (DynamicRealtimeEnvironment): The active simulation environment.
        sim_id (str): The prefix ID (e.g., 'Line_A').
        resource_id (str): The specific resource name (e.g., 'lathe').
    """

    def __init__(self, env: Environment, sim_id: str, resource_id: str):
        """
        Initializes the DynamicResource and binds it to the registry.

        Args:
            env (Environment): The SimPy environment (must include the RegistryMixIn).
            sim_id (str): The parent simulation ID.
            resource_id (str): The name of the resource.
        """
        super().__init__(env, sim_id, resource_id, "resources")

        max_cap = int(self._max_cap_val.value)
        # Prevent initialization out-of-bounds
        self._capacity = max(0, min(int(self._current_cap_val.value), max_cap))

        self.queue = PriorityStore(env)
        self.pool = Container(env, init=self._capacity, capacity=max_cap)

        self._request_event = self.env.event()
        self.env.process(self._dispatcher())

    @property
    def capacity(self) -> int:
        """
        The current active capacity of the resource.

        This value reflects the live state from the Registry and may change
        during the simulation.
        """
        return self._capacity

    @property
    def in_use(self) -> int:
        """Currently occupied slots (Total Capacity - Idle Tokens)."""
        return self._capacity - self.pool.level

    def request(self, priority: int = 1):
        """Returns a context-manager enabled request."""
        return DynamicResourceRequest(self, priority)

    def release(self):
        """Return a token to the pool."""
        return self.pool.put(1)

    def _dispatcher(self):
        """Bridges PriorityStore and Container without pre-fetching tokens."""
        while True:
            # If no one is waiting, sleep until a request is made
            if not self.queue.items:
                yield self._request_event
                self._request_event = self.env.event()

            # Wait for a physical token to be available
            yield self.pool.get(1)

            # Pull the highest-priority request currently in the queue
            ticket = yield self.queue.get()

            # If the user cancelled their request (e.g., timed out while waiting),
            # return the token to the pool and move to the next person.
            if getattr(ticket.item, "cancelled", False):
                self.pool.put(1)
                continue

            # Signal the user process
            if not ticket.item.triggered:
                ticket.item.succeed()

    def _handle_capacity_change(self, new_target: int):
        # Safely bind the new target to physical limits [0, max_cap]
        new_target = max(0, min(new_target, self.pool.capacity))

        diff = new_target - self._capacity
        if diff > 0:
            self._capacity += diff
            self.env.process(self._grow_pool(diff))
        elif diff < 0:
            amount = abs(diff)
            self._capacity -= amount
            self.env.process(self._shrink_pool(amount))

    def _grow_pool(self, amount: int):
        yield self.pool.put(amount)

    def _shrink_pool(self, amount: int):
        yield self.pool.get(amount)
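The clamp-and-diff arithmetic in `_handle_capacity_change` can be exercised in isolation. Below is a stand-alone sketch; `apply_capacity_change` is a hypothetical helper that mirrors the logic above using plain integers (the real class defers shrinking until tokens are actually returned to the pool):

```python
def apply_capacity_change(current_cap, pool_level, new_target, max_cap):
    """Mirror of _handle_capacity_change: clamp the target, then grow or
    shrink the token pool by the difference."""
    new_target = max(0, min(new_target, max_cap))
    diff = new_target - current_cap
    # Growing adds idle tokens immediately; shrinking removes idle tokens.
    return new_target, pool_level + diff

# Scale a fully idle 4-slot resource down to 2: two tokens are reclaimed.
cap, level = apply_capacity_change(4, 4, 2, max_cap=8)
# cap == 2, level == 2
```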

Attributes

in_use property

Currently occupied slots (Total Capacity - Idle Tokens).

Functions

request(priority=1)

Returns a context-manager enabled request.

Source code in src/dynamic_des/resources/resource.py
def request(self, priority: int = 1):
    """Returns a context-manager enabled request."""
    return DynamicResourceRequest(self, priority)
release()

Return a token to the pool.

Source code in src/dynamic_des/resources/resource.py
def release(self):
    """Return a token to the pool."""
    return self.pool.put(1)
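The token accounting behind `request`, `release`, and the `in_use` property can be sketched without SimPy. `FakePool` below is a stand-in for the simpy `Container`; the arithmetic mirrors `in_use = capacity - pool.level`:

```python
class FakePool:
    """Minimal stand-in for the simpy Container used as the token pool."""
    def __init__(self, level):
        self.level = level
    def get(self, n):
        self.level -= n
    def put(self, n):
        self.level += n

capacity = 3
pool = FakePool(level=capacity)

pool.get(1)                      # a granted request consumes one token
in_use = capacity - pool.level   # mirrors the in_use property: 1 slot busy

pool.put(1)                      # release() returns the token
in_use = capacity - pool.level   # back to 0 slots busy
```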

Admin & Infrastructure

KafkaAdminConnector

Unified Kafka Admin and Monitoring Connector.

This connector acts as a management layer for the simulation's Kafka infrastructure. It handles synchronous administrative tasks (topic creation) using kafka-python and asynchronous data operations (sending config, collecting telemetry/events) using aiokafka.

It maintains an internal state of simulation vitals and task lifecycles by consuming from the simulation's egress topics.

Attributes:

Name Type Description
bootstrap_servers str

Kafka broker addresses.

max_tasks int

The maximum number of task records to keep per service in memory to prevent unbounded growth.

kwargs dict

Additional arguments passed to Kafka clients.

Source code in src/dynamic_des/connectors/admin/kafka.py
class KafkaAdminConnector:
    """
    Unified Kafka Admin and Monitoring Connector.

    This connector acts as a management layer for the simulation's Kafka
    infrastructure. It handles synchronous administrative tasks (topic creation)
    using `kafka-python` and asynchronous data operations (sending config,
    collecting telemetry/events) using `aiokafka`.

    It maintains an internal state of simulation vitals and task lifecycles
    by consuming from the simulation's egress topics.

    Attributes:
        bootstrap_servers (str): Kafka broker addresses.
        max_tasks (int): The maximum number of task records to keep per service
            in memory to prevent unbounded growth.
        kwargs (dict): Additional arguments passed to Kafka clients.
    """

    def __init__(self, bootstrap_servers: str, max_tasks: int = 100, **kwargs):
        """
        Initialize the connector with broker settings and state limits.

        Args:
            bootstrap_servers: Kafka broker addresses.
            max_tasks: The maximum number of task records to keep per service
                in memory (rolling window).
            **kwargs: Additional arguments passed to Kafka clients (e.g., security settings).
        """
        self.bootstrap_servers = bootstrap_servers
        self.max_tasks = max_tasks
        self.kwargs = kwargs

        # Event State: sim_id -> service -> task_id -> {status: timestamp}
        self._state: DefaultDict[str, DefaultDict[str, Dict[str, Any]]] = defaultdict(
            lambda: defaultdict(dict)
        )
        # Telemetry State: path_id -> latest_value
        self._vitals: Dict[str, Any] = {}

    def create_topics(self, topics_config: List[Dict[str, Any]]):
        """
        Creates Kafka topics required for the simulation.

        This is a synchronous call to ensure that all necessary infrastructure
        (config, telemetry, and event topics) is ready before the simulation
        environment starts.

        Args:
            topics_config: A list of topic configurations.
                Each dict should contain 'name', and optionally 'partitions'
                and 'replication'.
        """
        admin_client = KafkaAdminClient(
            bootstrap_servers=self.bootstrap_servers,
            client_id="sim_admin",
            **self.kwargs,
        )

        new_topics = [
            NewTopic(
                name=cfg["name"],
                num_partitions=cfg.get("partitions", 1),
                replication_factor=cfg.get("replication", 1),
            )
            for cfg in topics_config
        ]

        try:
            admin_client.create_topics(new_topics=new_topics, validate_only=False)
        except TopicAlreadyExistsError:
            pass
        finally:
            admin_client.close()

    async def send_config(self, topic: str, path_id: str, value: Any):
        """
        Sends a surgical parameter update to a simulation config topic.

        This method acts as an external controller, allowing users to
        dynamically update registry paths (e.g., arrival rates or resource
        capacities) while the simulation is running.

        Args:
            topic: The Kafka topic the simulation is listening to for config.
            path_id: The registry dot-notation path (e.g., 'Line_A.lathe.max_cap').
            value: The new value to be applied to the path.
        """
        producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers, **self.kwargs
        )
        await producer.start()
        try:
            payload = json.dumps({"path_id": path_id, "value": value}).encode("utf-8")
            await producer.send_and_wait(topic, payload)
        finally:
            await producer.stop()

    async def collect_data(self, topics: List[str], auto_offset_reset: str = "latest"):
        """
        Async loop to consume from telemetry and event topics.

        This loop continuously listens to the simulation's egress and updates
        the connector's internal `_vitals` and `_state` attributes.

        Args:
            topics: List of topics to consume from (telemetry and events).
            auto_offset_reset: Where to start consuming if no offset is committed.
        """
        consumer = AIOKafkaConsumer(
            *topics,
            bootstrap_servers=self.bootstrap_servers,
            auto_offset_reset=auto_offset_reset,
            **self.kwargs,
        )
        await consumer.start()
        try:
            async for msg in consumer:
                data = json.loads(msg.value.decode("utf-8"))
                self._process_message(data)
        finally:
            await consumer.stop()

    def _process_message(self, data: Dict[str, Any]):
        """
        Routes incoming messages based on their JSON structure.

        Distinguishes between 'telemetry' (vitals like utilization) and
        'events' (task lifecycle steps like 'started' or 'finished').

        Args:
            data: The decoded JSON payload from Kafka.
        """

        # 1. Telemetry: 'path_id' is at the root level
        if "path_id" in data and not isinstance(data.get("value"), dict):
            self._vitals[data["path_id"]] = data["value"]

        # 2. Events: 'key' is at the root, 'value' is a dictionary
        elif "key" in data and isinstance(data.get("value"), dict):
            task_id = data["key"]
            payload = data["value"]
            status = payload.get("status")

            # The event timestamp sits at the root of the message, not inside 'value'
            ts = data.get("timestamp")

            path_id = payload.get("path_id", "unknown.unknown.unknown")
            parts = path_id.split(".")
            sim_id = parts[0]
            service = parts[2] if len(parts) > 2 else "default"

            service_data = self._state[sim_id][service]

            # New task: prune the oldest record if at max capacity, then register it
            if task_id not in service_data:
                if len(service_data) >= self.max_tasks:
                    oldest_key = next(iter(service_data))
                    service_data.pop(oldest_key)
                service_data[task_id] = {}

            service_data[task_id][status] = ts

    def get_vitals(self) -> Dict[str, Any]:
        """
        Returns the latest system telemetry metrics.

        Returns:
            A dictionary of path_id to latest value (e.g., utilization, queue length).
        """
        return self._vitals

    def get_state(self) -> Dict[str, Any]:
        """
        Returns the aggregated event state for task lifecycles.

        Returns:
            A nested dictionary: sim_id -> service -> task_id -> {status: timestamp}.
        """
        return self._state

Functions

create_topics(topics_config)

Creates Kafka topics required for the simulation.

This is a synchronous call to ensure that all necessary infrastructure (config, telemetry, and event topics) is ready before the simulation environment starts.

Parameters:

Name Type Description Default
topics_config List[Dict[str, Any]]

A list of topic configurations. Each dict should contain 'name', and optionally 'partitions' and 'replication'.

required
Source code in src/dynamic_des/connectors/admin/kafka.py
def create_topics(self, topics_config: List[Dict[str, Any]]):
    """
    Creates Kafka topics required for the simulation.

    This is a synchronous call to ensure that all necessary infrastructure
    (config, telemetry, and event topics) is ready before the simulation
    environment starts.

    Args:
        topics_config: A list of topic configurations.
            Each dict should contain 'name', and optionally 'partitions'
            and 'replication'.
    """
    admin_client = KafkaAdminClient(
        bootstrap_servers=self.bootstrap_servers,
        client_id="sim_admin",
        **self.kwargs,
    )

    new_topics = [
        NewTopic(
            name=cfg["name"],
            num_partitions=cfg.get("partitions", 1),
            replication_factor=cfg.get("replication", 1),
        )
        for cfg in topics_config
    ]

    try:
        admin_client.create_topics(new_topics=new_topics, validate_only=False)
    except TopicAlreadyExistsError:
        pass
    finally:
        admin_client.close()
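For illustration, a `topics_config` for a typical run might look like the following. The topic names are hypothetical; the defaulting shown is the same `cfg.get(...)` logic `create_topics` applies before building `NewTopic` objects:

```python
# Hypothetical topic layout for one simulation run.
topics_config = [
    {"name": "sim.config"},                          # partitions/replication default to 1
    {"name": "sim.telemetry", "partitions": 3},
    {"name": "sim.events", "partitions": 3, "replication": 1},
]

# The same defaulting create_topics applies per dict:
parsed = [
    (cfg["name"], cfg.get("partitions", 1), cfg.get("replication", 1))
    for cfg in topics_config
]
# parsed[0] == ("sim.config", 1, 1)
```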
send_config(topic, path_id, value) async

Sends a surgical parameter update to a simulation config topic.

This method acts as an external controller, allowing users to dynamically update registry paths (e.g., arrival rates or resource capacities) while the simulation is running.

Parameters:

Name Type Description Default
topic str

The Kafka topic the simulation is listening to for config.

required
path_id str

The registry dot-notation path (e.g., 'Line_A.lathe.max_cap').

required
value Any

The new value to be applied to the path.

required
Source code in src/dynamic_des/connectors/admin/kafka.py
async def send_config(self, topic: str, path_id: str, value: Any):
    """
    Sends a surgical parameter update to a simulation config topic.

    This method acts as an external controller, allowing users to
    dynamically update registry paths (e.g., arrival rates or resource
    capacities) while the simulation is running.

    Args:
        topic: The Kafka topic the simulation is listening to for config.
        path_id: The registry dot-notation path (e.g., 'Line_A.lathe.max_cap').
        value: The new value to be applied to the path.
    """
    producer = AIOKafkaProducer(
        bootstrap_servers=self.bootstrap_servers, **self.kwargs
    )
    await producer.start()
    try:
        payload = json.dumps({"path_id": path_id, "value": value}).encode("utf-8")
        await producer.send_and_wait(topic, payload)
    finally:
        await producer.stop()
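The wire format `send_config` produces is a flat JSON object. A quick sketch of building and decoding one (the registry path is illustrative):

```python
import json

# Payload exactly as send_config serializes it.
payload = json.dumps(
    {"path_id": "Line_A.lathe.max_cap", "value": 5}
).encode("utf-8")

decoded = json.loads(payload.decode("utf-8"))
# decoded == {"path_id": "Line_A.lathe.max_cap", "value": 5}
```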
collect_data(topics, auto_offset_reset='latest') async

Async loop to consume from telemetry and event topics.

This loop continuously listens to the simulation's egress and updates the connector's internal _vitals and _state attributes.

Parameters:

Name Type Description Default
topics List[str]

List of topics to consume from (telemetry and events).

required
auto_offset_reset str

Where to start consuming if no offset is committed.

'latest'
Source code in src/dynamic_des/connectors/admin/kafka.py
async def collect_data(self, topics: List[str], auto_offset_reset: str = "latest"):
    """
    Async loop to consume from telemetry and event topics.

    This loop continuously listens to the simulation's egress and updates
    the connector's internal `_vitals` and `_state` attributes.

    Args:
        topics: List of topics to consume from (telemetry and events).
        auto_offset_reset: Where to start consuming if no offset is committed.
    """
    consumer = AIOKafkaConsumer(
        *topics,
        bootstrap_servers=self.bootstrap_servers,
        auto_offset_reset=auto_offset_reset,
        **self.kwargs,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            data = json.loads(msg.value.decode("utf-8"))
            self._process_message(data)
    finally:
        await consumer.stop()
get_state()

Returns the aggregated event state for task lifecycles.

Returns:

Type Description
Dict[str, Any]

A nested dictionary: sim_id -> service -> task_id -> {status: timestamp}.

Source code in src/dynamic_des/connectors/admin/kafka.py
def get_state(self) -> Dict[str, Any]:
    """
    Returns the aggregated event state for task lifecycles.

    Returns:
        A nested dictionary: sim_id -> service -> task_id -> {status: timestamp}.
    """
    return self._state
get_vitals()

Returns the latest system telemetry metrics.

Returns:

Type Description
Dict[str, Any]

A dictionary of path_id to latest value (e.g., utilization, queue length).

Source code in src/dynamic_des/connectors/admin/kafka.py
def get_vitals(self) -> Dict[str, Any]:
    """
    Returns the latest system telemetry metrics.

    Returns:
        A dictionary of path_id to latest value (e.g., utilization, queue length).
    """
    return self._vitals
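The two message shapes `_process_message` distinguishes can be sketched with example payloads (field values are illustrative):

```python
# Telemetry: 'path_id' at the root with a scalar 'value'.
telemetry = {"path_id": "Line_A.lathe.utilization", "value": 0.82}

# Event: 'key' at the root, a dict 'value', and a timestamp alongside.
event = {
    "key": "task-17",
    "value": {"status": "started", "path_id": "sim1.line.lathe"},
    "timestamp": 12.5,
}

# The same structural checks _process_message uses to route messages:
is_telemetry = "path_id" in telemetry and not isinstance(telemetry.get("value"), dict)
is_event = "key" in event and isinstance(event.get("value"), dict)
# both are True
```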

Connectors (Ingress)

BaseIngress

Base class for all ingress providers in the simulation.

Ingress providers act as asynchronous listeners that bridge external data sources (such as Kafka topics, Redis channels, or local schedules) with the simulation's internal state. They are responsible for fetching updates and placing them into a thread-safe queue for the registry to process.

Source code in src/dynamic_des/connectors/ingress/base.py
class BaseIngress:
    """
    Base class for all ingress providers in the simulation.

    Ingress providers act as asynchronous listeners that bridge external
    data sources (such as Kafka topics, Redis channels, or local schedules)
    with the simulation's internal state. They are responsible for fetching
    updates and placing them into a thread-safe queue for the registry to process.
    """

    async def run(self, ingress_queue: queue.Queue) -> None:
        """
        Listens to an external source and pushes updates to the ingress queue.

        This method should be implemented as an asynchronous loop that waits
        for external signals or data and converts them into (path, value)
        tuples suitable for the SimulationRegistry.

        Args:
            ingress_queue: A thread-safe queue used to transmit updates
                to the main simulation environment.

        Raises:
            NotImplementedError: If the subclass does not override this method.
        """
        raise NotImplementedError("Subclasses must implement the run method.")

Functions

run(ingress_queue) async

Listens to an external source and pushes updates to the ingress queue.

This method should be implemented as an asynchronous loop that waits for external signals or data and converts them into (path, value) tuples suitable for the SimulationRegistry.

Parameters:

Name Type Description Default
ingress_queue Queue

A thread-safe queue used to transmit updates to the main simulation environment.

required

Raises:

Type Description
NotImplementedError

If the subclass does not override this method.

Source code in src/dynamic_des/connectors/ingress/base.py
async def run(self, ingress_queue: queue.Queue) -> None:
    """
    Listens to an external source and pushes updates to the ingress queue.

    This method should be implemented as an asynchronous loop that waits
    for external signals or data and converts them into (path, value)
    tuples suitable for the SimulationRegistry.

    Args:
        ingress_queue: A thread-safe queue used to transmit updates
            to the main simulation environment.

    Raises:
        NotImplementedError: If the subclass does not override this method.
    """
    raise NotImplementedError("Subclasses must implement the run method.")
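A minimal concrete subclass can be sketched as follows. `BaseIngress` is stubbed inline so the snippet is self-contained, and `OneShotIngress` is a hypothetical example, not part of the library:

```python
import asyncio
import queue

class BaseIngress:  # inline stub mirroring the interface documented above
    async def run(self, ingress_queue: queue.Queue) -> None:
        raise NotImplementedError

class OneShotIngress(BaseIngress):
    """Pushes a single (path_id, value) update, then returns."""
    def __init__(self, path_id, value):
        self.path_id = path_id
        self.value = value

    async def run(self, ingress_queue: queue.Queue) -> None:
        ingress_queue.put((self.path_id, self.value))

q = queue.Queue()
asyncio.run(OneShotIngress("Line_A.lathe.max_cap", 3).run(q))
# q now holds ("Line_A.lathe.max_cap", 3)
```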

KafkaIngress

Bases: BaseIngress

Resilient Kafka consumer for dynamic simulation configuration updates.

This connector subscribes to a specified Kafka topic and parses incoming JSON messages into state updates for the SimulationRegistry. It is designed to handle real-time control signals that modify simulation parameters (e.g., changing a machine's capacity or an arrival rate) while the model is executing.

The expected message format is a JSON object containing a 'path_id' string and a 'value' of any serializable type.

Attributes:

Name Type Description
topic str

The Kafka topic name used for configuration signals.

bootstrap_servers str

Comma-separated string of Kafka broker addresses.

kwargs dict

Additional configuration parameters for the AIOKafkaConsumer.

Source code in src/dynamic_des/connectors/ingress/kafka.py
class KafkaIngress(BaseIngress):
    """
    Resilient Kafka consumer for dynamic simulation configuration updates.

    This connector subscribes to a specified Kafka topic and parses incoming
    JSON messages into state updates for the SimulationRegistry. It is designed
    to handle real-time control signals that modify simulation parameters
    (e.g., changing a machine's capacity or an arrival rate) while the model
    is executing.

    The expected message format is a JSON object containing a 'path_id' string
    and a 'value' of any serializable type.

    Attributes:
        topic (str): The Kafka topic name used for configuration signals.
        bootstrap_servers (str): Comma-separated string of Kafka broker addresses.
        kwargs (dict): Additional configuration parameters for the AIOKafkaConsumer.
    """

    def __init__(self, topic: str, bootstrap_servers: str, **kwargs: Any):
        """
        Initializes the KafkaIngress with connection and subscription details.

        Args:
            topic: The Kafka topic to subscribe to.
            bootstrap_servers: Kafka broker address or list of addresses.
            **kwargs: Arbitrary keyword arguments passed to the underlying
                AIOKafkaConsumer (e.g., group_id, security_protocol).
        """
        self.topic = topic
        self.bootstrap_servers = bootstrap_servers
        self.kwargs = kwargs

    async def run(self, ingress_queue: queue.Queue) -> None:
        """
        Main execution loop that consumes Kafka messages and populates the ingress queue.

        This method maintains a persistent connection to the Kafka broker. It
        features an exponential backoff strategy to handle network interruptions
        and connection failures gracefully.

        Decoded messages are validated for basic structure; malformed JSON or
        missing keys are logged as warnings without halting the consumer.

        Args:
            ingress_queue: A thread-safe queue where (path_id, value) tuples
                are placed for processing by the SimulationRegistry.

        Note:
            The loop responds to `asyncio.CancelledError` for clean shutdown,
            ensuring the Kafka consumer stops and releases resources properly.
        """
        backoff = 1.0
        while True:
            try:
                consumer = AIOKafkaConsumer(
                    self.topic, bootstrap_servers=self.bootstrap_servers, **self.kwargs
                )
                await consumer.start()
                logger.info(f"Connected to Kafka topic: {self.topic}")
                backoff = 1.0
                try:
                    async for msg in consumer:
                        try:
                            data = json.loads(msg.value.decode("utf-8"))
                            ingress_queue.put((data["path_id"], data["value"]))
                        except (json.JSONDecodeError, KeyError) as e:
                            logger.warning(f"Malformed Kafka message: {e}")
                finally:
                    await consumer.stop()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Kafka Ingress error: {e}. Retrying in {backoff}s")
                await asyncio.sleep(backoff)
                backoff = min(backoff * 2, 60.0)

Functions

__init__(topic, bootstrap_servers, **kwargs)

Initializes the KafkaIngress with connection and subscription details.

Parameters:

Name Type Description Default
topic str

The Kafka topic to subscribe to.

required
bootstrap_servers str

Kafka broker address or list of addresses.

required
**kwargs Any

Arbitrary keyword arguments passed to the underlying AIOKafkaConsumer (e.g., group_id, security_protocol).

{}
Source code in src/dynamic_des/connectors/ingress/kafka.py
def __init__(self, topic: str, bootstrap_servers: str, **kwargs: Any):
    """
    Initializes the KafkaIngress with connection and subscription details.

    Args:
        topic: The Kafka topic to subscribe to.
        bootstrap_servers: Kafka broker address or list of addresses.
        **kwargs: Arbitrary keyword arguments passed to the underlying
            AIOKafkaConsumer (e.g., group_id, security_protocol).
    """
    self.topic = topic
    self.bootstrap_servers = bootstrap_servers
    self.kwargs = kwargs
run(ingress_queue) async

Main execution loop that consumes Kafka messages and populates the ingress queue.

This method maintains a persistent connection to the Kafka broker. It features an exponential backoff strategy to handle network interruptions and connection failures gracefully.

Decoded messages are validated for basic structure; malformed JSON or missing keys are logged as warnings without halting the consumer.

Parameters:

Name Type Description Default
ingress_queue Queue

A thread-safe queue where (path_id, value) tuples are placed for processing by the SimulationRegistry.

required
Note

The loop responds to asyncio.CancelledError for clean shutdown, ensuring the Kafka consumer stops and releases resources properly.

Source code in src/dynamic_des/connectors/ingress/kafka.py
async def run(self, ingress_queue: queue.Queue) -> None:
    """
    Main execution loop that consumes Kafka messages and populates the ingress queue.

    This method maintains a persistent connection to the Kafka broker. It
    features an exponential backoff strategy to handle network interruptions
    and connection failures gracefully.

    Decoded messages are validated for basic structure; malformed JSON or
    missing keys are logged as warnings without halting the consumer.

    Args:
        ingress_queue: A thread-safe queue where (path_id, value) tuples
            are placed for processing by the SimulationRegistry.

    Note:
        The loop responds to `asyncio.CancelledError` for clean shutdown,
        ensuring the Kafka consumer stops and releases resources properly.
    """
    backoff = 1.0
    while True:
        try:
            consumer = AIOKafkaConsumer(
                self.topic, bootstrap_servers=self.bootstrap_servers, **self.kwargs
            )
            await consumer.start()
            logger.info(f"Connected to Kafka topic: {self.topic}")
            backoff = 1.0
            try:
                async for msg in consumer:
                    try:
                        data = json.loads(msg.value.decode("utf-8"))
                        ingress_queue.put((data["path_id"], data["value"]))
                    except (json.JSONDecodeError, KeyError) as e:
                        logger.warning(f"Malformed Kafka message: {e}")
            finally:
                await consumer.stop()
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.error(f"Kafka Ingress error: {e}. Retrying in {backoff}s")
            await asyncio.sleep(backoff)
            backoff = min(backoff * 2, 60.0)
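Under repeated connection failures, the retry delays this loop produces follow a capped doubling. A small sketch of the sequence (the helper function is illustrative, not part of the library):

```python
def backoff_delays(retries, start=1.0, cap=60.0):
    """Delays the consumer loop above would sleep between failed attempts."""
    delays, b = [], start
    for _ in range(retries):
        delays.append(b)
        b = min(b * 2, cap)  # same doubling-with-cap rule as run()
    return delays

# backoff_delays(8) -> [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```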

LocalIngress

Bases: BaseIngress

Ingress provider for deterministic, time-scheduled parameter updates.

This connector is primarily used for testing, benchmarking, or local simulation runs where parameter changes need to occur at specific wall-clock intervals without requiring an external message broker like Kafka. It feeds a pre-defined sequence of updates into the simulation registry based on relative delays.

Attributes:

Name Type Description
schedule List[Tuple[float, str, Any]]

A chronologically sorted list of updates, where each tuple contains (delay_seconds, path_id, value).

Source code in src/dynamic_des/connectors/ingress/local.py
class LocalIngress(BaseIngress):
    """
    Ingress provider for deterministic, time-scheduled parameter updates.

    This connector is primarily used for testing, benchmarking, or local
    simulation runs where parameter changes need to occur at specific
    wall-clock intervals without requiring an external message broker like
    Kafka. It feeds a pre-defined sequence of updates into the simulation
    registry based on relative delays.

    Attributes:
        schedule (List[Tuple[float, str, Any]]): A chronologically sorted
            list of updates, where each tuple contains (delay_seconds,
            path_id, value).
    """

    def __init__(self, schedule: List[Tuple[float, str, Any]]):
        """
        Initializes the LocalIngress and sorts the provided schedule.

        Args:
            schedule: A list of tuples representing scheduled updates.
                Format: [(delay_from_start, "registry_path", new_value), ...].
                The list is automatically sorted by the delay time.
        """
        # Sort schedule by delay to ensure chronological processing
        self.schedule = sorted(schedule, key=lambda x: x[0])

    async def run(self, ingress_queue: queue.Queue) -> None:
        """
        Processes the schedule and pushes updates to the queue at the
        correct wall-clock times.

        This method calculates the relative sleep intervals between
        scheduled events to ensure that updates are placed in the
        ingress queue precisely when requested. Because it uses
        `asyncio.sleep`, it does not block the main simulation execution.

        Args:
            ingress_queue: A thread-safe queue used to transmit the
                (path_id, value) updates to the SimulationRegistry.
        """
        last_time = 0.0
        for delay, path_id, value in self.schedule:
            # Wait for the relative time difference
            await asyncio.sleep(delay - last_time)
            ingress_queue.put((path_id, value))
            last_time = delay

Functions

__init__(schedule)

Initializes the LocalIngress and sorts the provided schedule.

Parameters:

Name Type Description Default
schedule List[Tuple[float, str, Any]]

A list of tuples representing scheduled updates. Format: [(delay_from_start, "registry_path", new_value), ...]. The list is automatically sorted by the delay time.

required
Source code in src/dynamic_des/connectors/ingress/local.py
def __init__(self, schedule: List[Tuple[float, str, Any]]):
    """
    Initializes the LocalIngress and sorts the provided schedule.

    Args:
        schedule: A list of tuples representing scheduled updates.
            Format: [(delay_from_start, "registry_path", new_value), ...].
            The list is automatically sorted by the delay time.
    """
    # Sort schedule by delay to ensure chronological processing
    self.schedule = sorted(schedule, key=lambda x: x[0])
run(ingress_queue) async

Processes the schedule and pushes updates to the queue at the correct wall-clock times.

This method calculates the relative sleep intervals between scheduled events to ensure that updates are placed in the ingress queue precisely when requested. Because it uses asyncio.sleep, it does not block the main simulation execution.

Parameters:

Name Type Description Default
ingress_queue Queue

A thread-safe queue used to transmit the (path_id, value) updates to the SimulationRegistry.

required
Source code in src/dynamic_des/connectors/ingress/local.py
async def run(self, ingress_queue: queue.Queue) -> None:
    """
    Processes the schedule and pushes updates to the queue at the
    correct wall-clock times.

    This method calculates the relative sleep intervals between
    scheduled events to ensure that updates are placed in the
    ingress queue precisely when requested. Because it uses
    `asyncio.sleep`, it does not block the main simulation execution.

    Args:
        ingress_queue: A thread-safe queue used to transmit the
            (path_id, value) updates to the SimulationRegistry.
    """
    last_time = 0.0
    for delay, path_id, value in self.schedule:
        # Wait for the relative time difference
        await asyncio.sleep(delay - last_time)
        ingress_queue.put((path_id, value))
        last_time = delay
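How a schedule translates into relative sleeps can be sketched without running the event loop (the registry paths are illustrative):

```python
schedule = [
    (5.0, "Line_A.lathe.max_cap", 2),
    (1.0, "Line_A.arrival_rate", 0.5),
]
ordered = sorted(schedule, key=lambda x: x[0])  # same sort __init__ applies

# The relative gaps run() sleeps between updates: 1.0s, then 4.0s.
last_time, gaps = 0.0, []
for delay, path_id, value in ordered:
    gaps.append(delay - last_time)
    last_time = delay
# gaps == [1.0, 4.0]
```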

PostgresIngress

Bases: BaseIngress

Asynchronous ingress provider for polling updates from a PostgreSQL database.

This connector is intended to act as a bridge for scenarios where simulation parameters are managed within a relational database. It would typically poll a configuration table for changes or listen to database notifications (LISTEN/NOTIFY) to trigger real-time updates in the simulation registry.

Note

This class is currently a placeholder and is planned for a future release.

Source code in src/dynamic_des/connectors/ingress/postgres.py
class PostgresIngress(BaseIngress):
    """
    Asynchronous ingress provider for polling updates from a PostgreSQL database.

    This connector is intended to act as a bridge for scenarios where simulation
    parameters are managed within a relational database. It would typically
    poll a configuration table for changes or listen to database notifications
    (LISTEN/NOTIFY) to trigger real-time updates in the simulation registry.

    Note:
        This class is currently a placeholder and is planned for a future release.
    """

    def __init__(self, *args, **kwargs):
        """
        Initializes the PostgresIngress placeholder.

        Raises:
            NotImplementedError: Always raised as the connector is not yet implemented.
        """
        raise NotImplementedError("PostgresIngress is planned for a future release.")

    async def run(self, ingress_queue: queue.Queue) -> None:
        """
        Placeholder for the database polling execution loop.

        Future implementations will likely utilize an async driver (e.g., asyncpg)
        to fetch rows and convert them into (path, value) tuples for the
        ingress queue.

        Args:
            ingress_queue: A thread-safe queue used to transmit updates
                to the SimulationRegistry.
        """
        pass

Functions

__init__(*args, **kwargs)

Initializes the PostgresIngress placeholder.

Raises:

Type Description
NotImplementedError

Always raised as the connector is not yet implemented.

Source code in src/dynamic_des/connectors/ingress/postgres.py
def __init__(self, *args, **kwargs):
    """
    Initializes the PostgresIngress placeholder.

    Raises:
        NotImplementedError: Always raised as the connector is not yet implemented.
    """
    raise NotImplementedError("PostgresIngress is planned for a future release.")
run(ingress_queue) async

Placeholder for the database polling execution loop.

Future implementations will likely utilize an async driver (e.g., asyncpg) to fetch rows and convert them into (path, value) tuples for the ingress queue.

Parameters:

Name Type Description Default
ingress_queue Queue

A thread-safe queue used to transmit updates to the SimulationRegistry.

required
Source code in src/dynamic_des/connectors/ingress/postgres.py
async def run(self, ingress_queue: queue.Queue) -> None:
    """
    Placeholder for the database polling execution loop.

    Future implementations will likely utilize an async driver (e.g., asyncpg)
    to fetch rows and convert them into (path, value) tuples for the
    ingress queue.

    Args:
        ingress_queue: A thread-safe queue used to transmit updates
            to the SimulationRegistry.
    """
    pass
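To illustrate the (path, value) contract described above, the sketch below reshapes hypothetical configuration rows and enqueues them the way a polling loop might. The `path` and `value` column names are assumptions for illustration, not part of the library:

```python
import queue

def rows_to_updates(rows, ingress_queue):
    """Convert fetched config rows into (path, value) tuples and enqueue them.

    Each row is assumed to carry a dotted registry path and its new value;
    the column names are illustrative only.
    """
    for row in rows:
        ingress_queue.put((row["path"], row["value"]))

# Hypothetical rows, shaped like the mappings an async driver might return:
q = queue.Queue()
rows_to_updates(
    [
        {"path": "machines.press_1.cycle_time", "value": 4.2},
        {"path": "machines.press_2.enabled", "value": False},
    ],
    q,
)
print(q.get_nowait())  # ('machines.press_1.cycle_time', 4.2)
```

The thread-safe `queue.Queue` matches the ingress_queue parameter documented above, so the registry side of the pipeline stays unchanged regardless of the database driver.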

RedisIngress

Bases: BaseIngress

Asynchronous ingress provider for consuming updates from Redis.

This connector is designed to facilitate high-speed, low-latency parameter updates by leveraging Redis as a middleware layer. Potential implementations include subscribing to Pub/Sub channels for real-time broadcast signals or monitoring Redis Streams for sequential state updates.

Note

This class is currently a placeholder and is planned for a future release.

Source code in src/dynamic_des/connectors/ingress/redis.py
class RedisIngress(BaseIngress):
    """
    Asynchronous ingress provider for consuming updates from Redis.

    This connector is designed to facilitate high-speed, low-latency parameter
    updates by leveraging Redis as a middleware layer. Potential implementations
    include subscribing to Pub/Sub channels for real-time broadcast signals
    or monitoring Redis Streams for sequential state updates.

    Note:
        This class is currently a placeholder and is planned for a future release.
    """

    def __init__(self, *args, **kwargs):
        """
        Initializes the RedisIngress placeholder.

        Raises:
            NotImplementedError: Always raised as the connector is not yet implemented.
        """
        raise NotImplementedError("RedisIngress is planned for a future release.")

    async def run(self, ingress_queue: queue.Queue) -> None:
        """
        Placeholder for the Redis consumption execution loop.

        Future implementations will likely utilize an async Redis client
        to listen for messages and transform them into (path, value)
        tuples for the internal ingress queue.

        Args:
            ingress_queue: A thread-safe queue used to transmit updates
                to the SimulationRegistry.
        """
        pass

Functions

__init__(*args, **kwargs)

Initializes the RedisIngress placeholder.

Raises:

Type Description
NotImplementedError

Always raised as the connector is not yet implemented.

Source code in src/dynamic_des/connectors/ingress/redis.py
def __init__(self, *args, **kwargs):
    """
    Initializes the RedisIngress placeholder.

    Raises:
        NotImplementedError: Always raised as the connector is not yet implemented.
    """
    raise NotImplementedError("RedisIngress is planned for a future release.")
run(ingress_queue) async

Placeholder for the Redis consumption execution loop.

Future implementations will likely utilize an async Redis client to listen for messages and transform them into (path, value) tuples for the internal ingress queue.

Parameters:

Name Type Description Default
ingress_queue Queue

A thread-safe queue used to transmit updates to the SimulationRegistry.

required
Source code in src/dynamic_des/connectors/ingress/redis.py
async def run(self, ingress_queue: queue.Queue) -> None:
    """
    Placeholder for the Redis consumption execution loop.

    Future implementations will likely utilize an async Redis client
    to listen for messages and transform them into (path, value)
    tuples for the internal ingress queue.

    Args:
        ingress_queue: A thread-safe queue used to transmit updates
            to the SimulationRegistry.
    """
    pass
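A Redis-backed implementation would perform a similar transform on incoming messages. The sketch below assumes a hypothetical JSON payload schema (`{"path": ..., "value": ...}`) and a message dict shaped like redis-py's Pub/Sub output; neither schema is defined by the library:

```python
import json
import queue

def message_to_update(message, ingress_queue):
    """Decode a hypothetical Pub/Sub payload into a (path, value) tuple.

    Assumes the publisher sends JSON of the form
    {"path": "<registry path>", "value": <new value>}.
    """
    payload = json.loads(message["data"])
    ingress_queue.put((payload["path"], payload["value"]))

q = queue.Queue()
# A dict shaped like redis-py's Pub/Sub messages (payload pre-decoded to str):
message_to_update(
    {"type": "message", "data": '{"path": "arrivals.rate", "value": 7.5}'}, q
)
print(q.get_nowait())  # ('arrivals.rate', 7.5)
```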

Connectors (Egress)

BaseEgress

Base class for all egress providers in the simulation.

Egress providers act as asynchronous bridges that consume processed simulation data (telemetry and events) from a thread-safe internal queue and transmit it to external destinations such as Kafka, databases, or the console.

Source code in src/dynamic_des/connectors/egress/base.py
class BaseEgress:
    """
    Base class for all egress providers in the simulation.

    Egress providers act as asynchronous bridges that consume processed
    simulation data (telemetry and events) from a thread-safe internal queue
    and transmit it to external destinations such as Kafka, databases,
    or the console.
    """

    async def run(self, egress_queue: queue.Queue) -> None:
        """
        Listens to the internal queue and pushes data to an external sink.

        This method should contain an asynchronous loop that polls the
        provided queue and handles the networking/I/O logic specific
        to the destination system.

        Args:
            egress_queue: A thread-safe queue containing batches of
                dictionaries to be exported.

        Raises:
            NotImplementedError: If the subclass does not override this method.
        """
        raise NotImplementedError("Subclasses must implement the run method.")

Functions

run(egress_queue) async

Listens to the internal queue and pushes data to an external sink.

This method should contain an asynchronous loop that polls the provided queue and handles the networking/I/O logic specific to the destination system.

Parameters:

Name Type Description Default
egress_queue Queue

A thread-safe queue containing batches of dictionaries to be exported.

required

Raises:

Type Description
NotImplementedError

If the subclass does not override this method.

Source code in src/dynamic_des/connectors/egress/base.py
async def run(self, egress_queue: queue.Queue) -> None:
    """
    Listens to the internal queue and pushes data to an external sink.

    This method should contain an asynchronous loop that polls the
    provided queue and handles the networking/I/O logic specific
    to the destination system.

    Args:
        egress_queue: A thread-safe queue containing batches of
            dictionaries to be exported.

    Raises:
        NotImplementedError: If the subclass does not override this method.
    """
    raise NotImplementedError("Subclasses must implement the run method.")
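Subclassing BaseEgress amounts to overriding run() with a polling loop. The runnable sketch below re-declares a local stand-in for BaseEgress so it is self-contained, and collects batches into a list instead of a network sink; ListEgress is purely illustrative:

```python
import asyncio
import queue

class BaseEgress:
    """Local stand-in for dynamic_des's BaseEgress, kept minimal for the sketch."""

    async def run(self, egress_queue: queue.Queue) -> None:
        raise NotImplementedError("Subclasses must implement the run method.")

class ListEgress(BaseEgress):
    """Toy egress that accumulates batches in memory instead of exporting them."""

    def __init__(self):
        self.received = []

    async def run(self, egress_queue: queue.Queue) -> None:
        while True:
            try:
                self.received.extend(egress_queue.get_nowait())
            except queue.Empty:
                await asyncio.sleep(0.001)  # yield to the event loop when idle

async def main():
    q = queue.Queue()
    q.put([{"stream_type": "telemetry", "path_id": "m1", "value": 0.9}])
    egress = ListEgress()
    task = asyncio.create_task(egress.run(q))
    await asyncio.sleep(0.05)  # give the loop time to drain the queue
    task.cancel()              # mirrors how egress tasks are stopped at teardown
    try:
        await task
    except asyncio.CancelledError:
        pass
    return egress.received

records = asyncio.run(main())
print(records)  # [{'stream_type': 'telemetry', 'path_id': 'm1', 'value': 0.9}]
```

Cancelling the task and swallowing CancelledError mirrors the graceful-shutdown pattern used by the concrete connectors below.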

KafkaEgress

Bases: BaseEgress

High-throughput Kafka producer for simulation telemetry and events.

This connector utilizes aiokafka for asynchronous I/O and orjson for fast serialization. It implements a resilient connection loop with exponential backoff and handles two distinct data streams:

1. Telemetry: Typically mapped to single-partition topics for system metrics.
2. Events: Mapped to partitioned topics using event keys for ordering guarantees.

Attributes:

Name Type Description
telemetry_topic str

The Kafka topic name for telemetry data.

event_topic str

The Kafka topic name for lifecycle events.

producer_config dict

Configuration dictionary passed to AIOKafkaProducer, including performance optimizations like lz4 compression and batch lingering.

Source code in src/dynamic_des/connectors/egress/kafka.py
class KafkaEgress(BaseEgress):
    """
    High-throughput Kafka producer for simulation telemetry and events.

    This connector utilizes `aiokafka` for asynchronous I/O and `orjson` for fast
    serialization. It implements a resilient connection loop with exponential
    backoff and handles two distinct data streams:
    1. Telemetry: Typically mapped to single-partition topics for system metrics.
    2. Events: Mapped to partitioned topics using event keys for ordering guarantees.

    Attributes:
        telemetry_topic (str): The Kafka topic name for telemetry data.
        event_topic (str): The Kafka topic name for lifecycle events.
        producer_config (dict): Configuration dictionary passed to AIOKafkaProducer,
            including performance optimizations like lz4 compression and batch lingering.
    """

    def __init__(
        self,
        telemetry_topic: str,
        event_topic: str,
        bootstrap_servers: str,
        **kwargs: Any,
    ):
        """
        Initializes the KafkaEgress with topic and connection settings.

        Args:
            telemetry_topic: Destination topic for telemetry stream.
            event_topic: Destination topic for event stream.
            bootstrap_servers: Comma-separated list of Kafka brokers.
            **kwargs: Additional overrides for the AIOKafkaProducer configuration.
        """
        self.telemetry_topic = telemetry_topic
        self.event_topic = event_topic

        # High-performance defaults for 100k/sec
        self.producer_config = {
            "bootstrap_servers": bootstrap_servers,
            "linger_ms": 10,  # Batch messages for 10ms before sending
            "compression_type": "lz4",  # Fast compression for high volume
            "max_batch_size": 131072,  # 128KB batch size
            **kwargs,
        }

    async def run(self, egress_queue: queue.Queue) -> None:
        """
        The main execution loop that consumes the egress queue and publishes to Kafka.

        This method maintains a persistent connection to Kafka. If the connection
        is lost, it implements an exponential backoff retry strategy. It polls
        the internal `egress_queue` for batches of data, determines the target
        topic based on the 'stream_type', and performs asynchronous sends.

        Args:
            egress_queue: A thread-safe queue containing lists of dictionaries
                (batches) generated by the EgressMixIn.

        Note:
            The loop exits gracefully upon receiving an `asyncio.CancelledError`,
            ensuring the Kafka producer is stopped correctly.
        """
        backoff = 1.0
        max_backoff = 60.0

        while True:
            try:
                producer = AIOKafkaProducer(**self.producer_config)
                await producer.start()
                logger.info("Kafka Egress producer connected successfully.")
                backoff = 1.0  # Reset backoff on successful connection

                try:
                    while True:
                        try:
                            # 'batch' is a list of dictionaries from the EgressMixIn
                            batch = egress_queue.get_nowait()

                            for data in batch:
                                stream = data.pop("stream_type")
                                if stream == "telemetry":
                                    topic = self.telemetry_topic
                                    key = (
                                        str(data["path_id"]).encode()
                                        if "path_id" in data
                                        else None
                                    )
                                else:
                                    topic = self.event_topic
                                    # Use the event_key for Kafka partitioning
                                    key = (
                                        str(data["key"]).encode()
                                        if "key" in data
                                        else None
                                    )

                                # orjson.dumps returns bytes directly (faster than json.dumps + encode)
                                await producer.send(
                                    topic, value=orjson.dumps(data), key=key
                                )

                        except queue.Empty:
                            # Yield to loop if queue is empty
                            await asyncio.sleep(0.001)
                finally:
                    await producer.stop()

            except asyncio.CancelledError:
                logger.info("Kafka Egress shutdown requested. Exiting loop.")
                break
            except Exception as e:
                logger.error(
                    f"Kafka Egress connection failed: {e}. Retrying in {backoff} seconds..."
                )
                await asyncio.sleep(backoff)
                backoff = min(backoff * 2, max_backoff)

Functions

__init__(telemetry_topic, event_topic, bootstrap_servers, **kwargs)

Initializes the KafkaEgress with topic and connection settings.

Parameters:

Name Type Description Default
telemetry_topic str

Destination topic for telemetry stream.

required
event_topic str

Destination topic for event stream.

required
bootstrap_servers str

Comma-separated list of Kafka brokers.

required
**kwargs Any

Additional overrides for the AIOKafkaProducer configuration.

{}
Source code in src/dynamic_des/connectors/egress/kafka.py
def __init__(
    self,
    telemetry_topic: str,
    event_topic: str,
    bootstrap_servers: str,
    **kwargs: Any,
):
    """
    Initializes the KafkaEgress with topic and connection settings.

    Args:
        telemetry_topic: Destination topic for telemetry stream.
        event_topic: Destination topic for event stream.
        bootstrap_servers: Comma-separated list of Kafka brokers.
        **kwargs: Additional overrides for the AIOKafkaProducer configuration.
    """
    self.telemetry_topic = telemetry_topic
    self.event_topic = event_topic

    # High-performance defaults for 100k/sec
    self.producer_config = {
        "bootstrap_servers": bootstrap_servers,
        "linger_ms": 10,  # Batch messages for 10ms before sending
        "compression_type": "lz4",  # Fast compression for high volume
        "max_batch_size": 131072,  # 128KB batch size
        **kwargs,
    }
run(egress_queue) async

The main execution loop that consumes the egress queue and publishes to Kafka.

This method maintains a persistent connection to Kafka. If the connection is lost, it implements an exponential backoff retry strategy. It polls the internal egress_queue for batches of data, determines the target topic based on the 'stream_type', and performs asynchronous sends.

Parameters:

Name Type Description Default
egress_queue Queue

A thread-safe queue containing lists of dictionaries (batches) generated by the EgressMixIn.

required
Note

The loop exits gracefully upon receiving an asyncio.CancelledError, ensuring the Kafka producer is stopped correctly.

Source code in src/dynamic_des/connectors/egress/kafka.py
async def run(self, egress_queue: queue.Queue) -> None:
    """
    The main execution loop that consumes the egress queue and publishes to Kafka.

    This method maintains a persistent connection to Kafka. If the connection
    is lost, it implements an exponential backoff retry strategy. It polls
    the internal `egress_queue` for batches of data, determines the target
    topic based on the 'stream_type', and performs asynchronous sends.

    Args:
        egress_queue: A thread-safe queue containing lists of dictionaries
            (batches) generated by the EgressMixIn.

    Note:
        The loop exits gracefully upon receiving an `asyncio.CancelledError`,
        ensuring the Kafka producer is stopped correctly.
    """
    backoff = 1.0
    max_backoff = 60.0

    while True:
        try:
            producer = AIOKafkaProducer(**self.producer_config)
            await producer.start()
            logger.info("Kafka Egress producer connected successfully.")
            backoff = 1.0  # Reset backoff on successful connection

            try:
                while True:
                    try:
                        # 'batch' is a list of dictionaries from the EgressMixIn
                        batch = egress_queue.get_nowait()

                        for data in batch:
                            stream = data.pop("stream_type")
                            if stream == "telemetry":
                                topic = self.telemetry_topic
                                key = (
                                    str(data["path_id"]).encode()
                                    if "path_id" in data
                                    else None
                                )
                            else:
                                topic = self.event_topic
                                # Use the event_key for Kafka partitioning
                                key = (
                                    str(data["key"]).encode()
                                    if "key" in data
                                    else None
                                )

                            # orjson.dumps returns bytes directly (faster than json.dumps + encode)
                            await producer.send(
                                topic, value=orjson.dumps(data), key=key
                            )

                    except queue.Empty:
                        # Yield to loop if queue is empty
                        await asyncio.sleep(0.001)
            finally:
                await producer.stop()

        except asyncio.CancelledError:
            logger.info("Kafka Egress shutdown requested. Exiting loop.")
            break
        except Exception as e:
            logger.error(
                f"Kafka Egress connection failed: {e}. Retrying in {backoff} seconds..."
            )
            await asyncio.sleep(backoff)
            backoff = min(backoff * 2, max_backoff)
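The topic and key selection inside the loop can be isolated as a pure function, which makes the routing contract easier to test without a broker. The sketch below mirrors that logic with illustrative topic names, guards each key lookup on the field it actually reads, and substitutes stdlib json for orjson to stay dependency-free:

```python
import json

def route(data, telemetry_topic="sim.telemetry", event_topic="sim.events"):
    """Mirror KafkaEgress routing: choose a topic and partition key per record.

    Topic names are placeholders; the real connector serializes with orjson,
    which returns bytes directly, while json.dumps(...).encode() is used here.
    """
    stream = data.pop("stream_type")
    if stream == "telemetry":
        topic = telemetry_topic
        key = str(data["path_id"]).encode() if "path_id" in data else None
    else:
        topic = event_topic
        key = str(data["key"]).encode() if "key" in data else None
    return topic, key, json.dumps(data).encode()

topic, key, value = route({"stream_type": "event", "key": "task-42", "state": "done"})
print(topic, key)  # sim.events b'task-42'
```

Keying events by their `key` field preserves per-entity ordering across a partitioned topic, while keyless telemetry records round-robin across partitions.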

ConsoleEgress

Bases: BaseEgress

Local egress provider that outputs simulation data to the standard console.

This connector serves as a debugging and local development tool. It processes batched data from the simulation and logs formatted entries via the standard logger, distinguishing between telemetry metrics and discrete events via prefixes.

Source code in src/dynamic_des/connectors/egress/local.py
class ConsoleEgress(BaseEgress):
    """
    Local egress provider that outputs simulation data to the standard console.

    This connector serves as a debugging and local development tool. It processes
    batched data from the simulation and logs formatted entries via the standard
    logger, distinguishing between telemetry metrics and discrete events via prefixes.
    """

    async def run(self, egress_queue: queue.Queue) -> None:
        """
        Continuously polls the egress queue and prints formatted results.

        This method implements an asynchronous polling loop. It extracts batches
        from the internal queue, identifies the data stream type, and formats the
        output with specific prefixes:
        - [TEL]: Telemetry data (e.g., resource utilization, queue lengths).
        - [EVT]: Event data (e.g., task lifecycle transitions).

        Args:
            egress_queue: A thread-safe queue containing lists of dictionaries
                (batches) generated by the EgressMixIn.

        Note:
            If the queue is empty, the method yields control to the asyncio
            event loop for a short duration to prevent CPU pinning.
        """
        while True:
            try:
                # Receive a batch (list) of messages
                batch = egress_queue.get_nowait()

                for data in batch:
                    # Identify the stream type for the prefix
                    stream = data.pop("stream_type", "unknown")
                    prefix = "[TEL]" if stream == "telemetry" else "[EVT]"

                    # Log the remaining data (path_id/key, value, timestamp)
                    logger.info(f"{prefix} {data}")

            except queue.Empty:
                # Yield to the event loop
                await asyncio.sleep(0.1)

Functions

run(egress_queue) async

Continuously polls the egress queue and prints formatted results.

This method implements an asynchronous polling loop. It extracts batches from the internal queue, identifies the data stream type, and formats the output with specific prefixes:

- [TEL]: Telemetry data (e.g., resource utilization, queue lengths).
- [EVT]: Event data (e.g., task lifecycle transitions).

Parameters:

Name Type Description Default
egress_queue Queue

A thread-safe queue containing lists of dictionaries (batches) generated by the EgressMixIn.

required
Note

If the queue is empty, the method yields control to the asyncio event loop for a short duration to prevent CPU pinning.

Source code in src/dynamic_des/connectors/egress/local.py
async def run(self, egress_queue: queue.Queue) -> None:
    """
    Continuously polls the egress queue and prints formatted results.

    This method implements an asynchronous polling loop. It extracts batches
    from the internal queue, identifies the data stream type, and formats the
    output with specific prefixes:
    - [TEL]: Telemetry data (e.g., resource utilization, queue lengths).
    - [EVT]: Event data (e.g., task lifecycle transitions).

    Args:
        egress_queue: A thread-safe queue containing lists of dictionaries
            (batches) generated by the EgressMixIn.

    Note:
        If the queue is empty, the method yields control to the asyncio
        event loop for a short duration to prevent CPU pinning.
    """
    while True:
        try:
            # Receive a batch (list) of messages
            batch = egress_queue.get_nowait()

            for data in batch:
                # Identify the stream type for the prefix
                stream = data.pop("stream_type", "unknown")
                prefix = "[TEL]" if stream == "telemetry" else "[EVT]"

                # Log the remaining data (path_id/key, value, timestamp)
                logger.info(f"{prefix} {data}")

        except queue.Empty:
            # Yield to the event loop
            await asyncio.sleep(0.1)

PostgresEgress

Bases: BaseEgress

Asynchronous egress provider for persisting simulation data to PostgreSQL.

This connector is intended to handle long-term storage of simulation results, allowing for historical analysis and complex SQL queries on telemetry and event data.

Source code in src/dynamic_des/connectors/egress/postgres.py
class PostgresEgress(BaseEgress):
    """
    Asynchronous egress provider for persisting simulation data to PostgreSQL.

    This connector is intended to handle long-term storage of simulation results,
    allowing for historical analysis and complex SQL queries on telemetry
    and event data.
    """

    def __init__(
        self, connection_dsn: str, table_name: str = "simulation_data", **kwargs: Any
    ):
        """
        Initializes the PostgresEgress with connection details.

        Args:
            connection_dsn: PostgreSQL connection string (DSN).
            table_name: Target table for simulation records.
            **kwargs: Additional connection pool arguments.
        """
        self.dsn = connection_dsn
        self.table_name = table_name
        self.kwargs = kwargs

    async def run(self, egress_queue: queue.Queue) -> None:
        """
        Main execution loop for PostgreSQL data persistence.

        A future implementation should maintain a resilient connection via an
        async driver (e.g., asyncpg) and perform bulk inserts of batched data
        from the egress queue.

        Args:
            egress_queue: A thread-safe queue containing batches of simulation data.

        Raises:
            NotImplementedError: This connector is currently a placeholder.
        """
        logger.warning("PostgresEgress is not yet implemented.")
        raise NotImplementedError("PostgresEgress.run() is not implemented.")

Functions

__init__(connection_dsn, table_name='simulation_data', **kwargs)

Initializes the PostgresEgress with connection details.

Parameters:

Name Type Description Default
connection_dsn str

PostgreSQL connection string (DSN).

required
table_name str

Target table for simulation records.

'simulation_data'
**kwargs Any

Additional connection pool arguments.

{}
Source code in src/dynamic_des/connectors/egress/postgres.py
def __init__(
    self, connection_dsn: str, table_name: str = "simulation_data", **kwargs: Any
):
    """
    Initializes the PostgresEgress with connection details.

    Args:
        connection_dsn: PostgreSQL connection string (DSN).
        table_name: Target table for simulation records.
        **kwargs: Additional connection pool arguments.
    """
    self.dsn = connection_dsn
    self.table_name = table_name
    self.kwargs = kwargs
run(egress_queue) async

Main execution loop for PostgreSQL data persistence.

A future implementation should maintain a resilient connection via an async driver (e.g., asyncpg) and perform bulk inserts of batched data from the egress queue.

Parameters:

Name Type Description Default
egress_queue Queue

A thread-safe queue containing batches of simulation data.

required

Raises:

Type Description
NotImplementedError

This connector is currently a placeholder.

Source code in src/dynamic_des/connectors/egress/postgres.py
async def run(self, egress_queue: queue.Queue) -> None:
    """
    Main execution loop for PostgreSQL data persistence.

    A future implementation should maintain a resilient connection via an
    async driver (e.g., asyncpg) and perform bulk inserts of batched data
    from the egress queue.

    Args:
        egress_queue: A thread-safe queue containing batches of simulation data.

    Raises:
        NotImplementedError: This connector is currently a placeholder.
    """
    logger.warning("PostgresEgress is not yet implemented.")
    raise NotImplementedError("PostgresEgress.run() is not implemented.")
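A future implementation would need to flatten each batch into rows for a bulk insert (for example via asyncpg's executemany or copy_records_to_table). The sketch below shows one possible row shape; the (stream_type, key, payload) column layout is an assumption, not a defined schema:

```python
import json

def batch_to_records(batch):
    """Flatten a batch of egress dicts into tuples for a bulk insert.

    The (stream_type, key, payload) layout is illustrative; nested data is
    JSON-encoded so a single table can hold both telemetry and events.
    """
    records = []
    for data in batch:
        stream = data.pop("stream_type", "unknown")
        key = data.get("path_id") or data.get("key")
        records.append((stream, str(key), json.dumps(data)))
    return records

rows = batch_to_records([
    {"stream_type": "telemetry", "path_id": "m1", "value": 0.93},
    {"stream_type": "event", "key": "task-7", "state": "started"},
])
print(rows[0][0], rows[1][1])  # telemetry task-7
```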

RedisEgress

Bases: BaseEgress

Asynchronous egress provider for streaming simulation data to Redis.

Designed for real-time dashboarding or inter-process communication, publishing simulation updates to Redis Pub/Sub channels or Streams.

Source code in src/dynamic_des/connectors/egress/redis.py
class RedisEgress(BaseEgress):
    """
    Asynchronous egress provider for streaming simulation data to Redis.

    Designed for real-time dashboarding or inter-process communication,
    publishing simulation updates to Redis Pub/Sub channels or Streams.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        channel: str = "sim_stream",
        **kwargs: Any,
    ):
        """
        Initializes the RedisEgress with connection and routing details.

        Args:
            host: Redis server hostname.
            port: Redis server port.
            channel: The target Pub/Sub channel or Stream key.
            **kwargs: Additional redis-py configuration.
        """
        self.host = host
        self.port = port
        self.channel = channel
        self.kwargs = kwargs

    async def run(self, egress_queue: queue.Queue) -> None:
        """
        Main execution loop for Redis data streaming.

        A future implementation should use an async client (e.g., redis-py's
        asyncio support) to publish messages or append stream entries from
        the egress queue.

        Args:
            egress_queue: A thread-safe queue containing batches of simulation data.

        Raises:
            NotImplementedError: This connector is currently a placeholder.
        """
        logger.warning("RedisEgress is not yet implemented.")
        raise NotImplementedError("RedisEgress.run() is not implemented.")

Functions

__init__(host='localhost', port=6379, channel='sim_stream', **kwargs)

Initializes the RedisEgress with connection and routing details.

Parameters:

Name Type Description Default
host str

Redis server hostname.

'localhost'
port int

Redis server port.

6379
channel str

The target Pub/Sub channel or Stream key.

'sim_stream'
**kwargs Any

Additional redis-py configuration.

{}
Source code in src/dynamic_des/connectors/egress/redis.py
def __init__(
    self,
    host: str = "localhost",
    port: int = 6379,
    channel: str = "sim_stream",
    **kwargs: Any,
):
    """
    Initializes the RedisEgress with connection and routing details.

    Args:
        host: Redis server hostname.
        port: Redis server port.
        channel: The target Pub/Sub channel or Stream key.
        **kwargs: Additional redis-py configuration.
    """
    self.host = host
    self.port = port
    self.channel = channel
    self.kwargs = kwargs
run(egress_queue) async

Main execution loop for Redis data streaming.

A future implementation should use an async client (e.g., redis-py's asyncio support) to publish messages or append stream entries from the egress queue.

Parameters:

Name Type Description Default
egress_queue Queue

A thread-safe queue containing batches of simulation data.

required

Raises:

Type Description
NotImplementedError

This connector is currently a placeholder.

Source code in src/dynamic_des/connectors/egress/redis.py
async def run(self, egress_queue: queue.Queue) -> None:
    """
    Main execution loop for Redis data streaming.

    A future implementation should use an async client (e.g., redis-py's
    asyncio support) to publish messages or append stream entries from
    the egress queue.

    Args:
        egress_queue: A thread-safe queue containing batches of simulation data.

    Raises:
        NotImplementedError: This connector is currently a placeholder.
    """
    logger.warning("RedisEgress is not yet implemented.")
    raise NotImplementedError("RedisEgress.run() is not implemented.")
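Since Redis stream entries must be flat field maps, a future implementation would JSON-encode nested payloads before calling XADD. The sketch below shows one possible shaping step; the field names and the default channel are assumptions carried over from this class's constructor defaults:

```python
import json

def batch_to_stream_entries(batch, channel="sim_stream"):
    """Shape egress dicts into flat field maps suitable for Redis XADD.

    Redis stream fields must be flat string pairs, so the remaining payload
    is JSON-encoded; the "stream"/"payload" field names are illustrative.
    """
    entries = []
    for data in batch:
        stream = data.pop("stream_type", "unknown")
        entries.append((channel, {"stream": stream, "payload": json.dumps(data)}))
    return entries

entries = batch_to_stream_entries(
    [{"stream_type": "telemetry", "path_id": "m1", "value": 2}]
)
print(entries[0][1]["stream"])  # telemetry
```

Each (channel, fields) pair maps directly onto an `XADD channel * field value ...` call, so consumers on the dashboard side can replay updates in order.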