API Reference
Environment
DynamicRealtimeEnvironment
Bases: RealtimeEnvironment, RegistryMixIn, IngressMixIn, EgressMixIn
The core simulation engine for dynamic-des.
This environment extends SimPy's RealtimeEnvironment by incorporating
a centralized SimulationRegistry for dynamic state updates, and MixIns
for managing high-throughput asynchronous I/O with external systems.
Attributes:

| Name | Type | Description |
|---|---|---|
| `start_datetime` | `datetime` | The real-world clock time when the simulation started. |
Source code in src/dynamic_des/core/environment.py
Registry & Parameters
SimulationRegistry
A centralized 'Switchboard' that maps dot-notation paths to dynamic simulation parameters.
The Registry acts as the single source of truth for the simulation's state. It allows external streams (like Kafka or Redis) to update parameters on the fly, seamlessly synchronizing them with the underlying SimPy processes.
Attributes:

| Name | Type | Description |
|---|---|---|
| `env` | `Environment` | The active SimPy environment. |
Source code in src/dynamic_des/core/registry.py
Functions
get(path)
Retrieve the DynamicValue object at a specific path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | The dot-notation path (e.g., 'Line_A.arrival.standard.rate'). | *required* |

Returns:

| Name | Type | Description |
|---|---|---|
| `DynamicValue` | `DynamicValue` | The wrapper object for the requested parameter. |

Raises:

| Type | Description |
|---|---|
| `KeyError` | If the path does not exist in the registry. |
Source code in src/dynamic_des/core/registry.py
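The lookup semantics can be illustrated with a minimal stand-in. This is a hypothetical sketch, not the actual `SimulationRegistry` implementation; the class name `MiniRegistry` and its internals are assumptions, but the `KeyError` behaviour mirrors the documented contract of `get()`.

```python
# Hypothetical sketch of dot-notation lookup; MiniRegistry is an
# illustration, not the real SimulationRegistry.
class MiniRegistry:
    def __init__(self):
        # Flat map of dot-notation paths to values.
        self._params = {}

    def get(self, path: str):
        # Mirror the documented behaviour: unknown paths raise KeyError.
        if path not in self._params:
            raise KeyError(f"Unknown registry path: {path}")
        return self._params[path]

reg = MiniRegistry()
reg._params["Line_A.arrival.standard.rate"] = 2.5
print(reg.get("Line_A.arrival.standard.rate"))  # -> 2.5
```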
get_config(path)
Retrieve the live configuration object (e.g., DistributionConfig).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | The dot-notation path (e.g., 'Line_A.service.milling'). | *required* |

Returns:

| Name | Type | Description |
|---|---|---|
| `Any` | `Any` | The synchronized configuration object. |

Raises:

| Type | Description |
|---|---|
| `KeyError` | If the config path does not exist in the registry. |
Source code in src/dynamic_des/core/registry.py
register_sim_parameter(param)
Takes a SimParameter instance and flattens it into the registry.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `param` | `SimParameter` | The initial state schema to register. | *required* |
Source code in src/dynamic_des/core/registry.py
update(path, new_value)
Update a value safely and synchronize its parent configuration object.
Includes dynamic type casting to protect the simulation from crashing if external systems send improperly typed data (e.g., sending the string "5" instead of the integer 5).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | The dot-notation path to update. | *required* |
| `new_value` | `Any` | The new value to apply. | *required* |
Source code in src/dynamic_des/core/registry.py
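The type-casting guard described above can be sketched as follows. This is an assumption about the mechanism (cast the incoming value to the type of the value currently stored at the path), not the library's actual code; the function name `cast_like` is hypothetical.

```python
# Hedged sketch of dynamic type casting: coerce the incoming value to
# the type of the currently stored value, so an external system sending
# the string "5" instead of the integer 5 does not crash the simulation.
def cast_like(current, new_value):
    target_type = type(current)
    try:
        return target_type(new_value)
    except (TypeError, ValueError):
        # Assumed fallback: keep the old value rather than crash.
        return current

print(cast_like(5, "7"))      # -> 7
print(cast_like(1.0, "2.5"))  # -> 2.5
```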
SimParameter
dataclass
The master schema representing the state of a specific simulation entity (like a production line).
This object is registered with the SimulationRegistry, which flattens
the nested dictionaries into dot-notation paths (e.g., 'Line_A.arrival.standard.rate').
Attributes:

| Name | Type | Description |
|---|---|---|
| `sim_id` | `str` | The unique prefix for this group of parameters (e.g., 'Line_A'). |
| `arrival` | `Dict[str, DistributionConfig]` | Configurations for arrival generation rates. |
| `service` | `Dict[str, DistributionConfig]` | Configurations for process task durations. |
| `resources` | `Dict[str, CapacityConfig]` | Configurations for standard SimPy Resources. |
| `containers` | `Dict[str, CapacityConfig]` | Configurations for continuous SimPy Containers. |
| `stores` | `Dict[str, CapacityConfig]` | Configurations for discrete SimPy Stores. |
Source code in src/dynamic_des/models/params.py
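The flattening step the registry performs on a `SimParameter` can be sketched with a small recursive helper. The helper itself is illustrative, assuming nested dicts flatten into dot-joined paths as the examples in this reference suggest.

```python
# Illustrative flattening of nested config dicts into dot-notation
# paths, as the SimulationRegistry is described to do for SimParameter.
def flatten(prefix, mapping):
    flat = {}
    for key, value in mapping.items():
        path = f"{prefix}.{key}"
        if isinstance(value, dict):
            # Recurse into nested groups (e.g., arrival -> standard).
            flat.update(flatten(path, value))
        else:
            flat[path] = value
    return flat

paths = flatten("Line_A", {"arrival": {"standard": {"rate": 2.0}}})
print(paths)  # -> {'Line_A.arrival.standard.rate': 2.0}
```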
CapacityConfig
dataclass
Configuration for the capacity of a simulated resource, container, or store.
Attributes:

| Name | Type | Description |
|---|---|---|
| `current_cap` | `int` | The currently active capacity (e.g., number of active workers). |
| `max_cap` | `int` | The absolute physical maximum capacity. |
Source code in src/dynamic_des/models/params.py
DistributionConfig
dataclass
Configuration for a statistical distribution used in the simulation.
This config is dynamically updatable. For example, you can change the rate
of an 'exponential' distribution via Kafka, and the Sampler will instantly
use the new rate for the next event.
Attributes:

| Name | Type | Description |
|---|---|---|
| `dist` | `str` | The type of distribution (e.g., 'exponential', 'normal', 'uniform'). |
| `rate` | `Optional[float]` | The rate parameter (lambda) for exponential distributions. |
| `mean` | `Optional[float]` | The mean (mu) for normal distributions. |
| `std` | `Optional[float]` | The standard deviation (sigma) for normal distributions. |
Source code in src/dynamic_des/models/params.py
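A minimal stand-in matching the attribute table above might look like this. Field names follow the documented attributes; the defaults and dataclass form are assumptions about the actual definition.

```python
from dataclasses import dataclass
from typing import Optional

# Minimal stand-in for DistributionConfig as documented above; only the
# field names are taken from the reference, the rest is assumed.
@dataclass
class DistributionConfig:
    dist: str
    rate: Optional[float] = None
    mean: Optional[float] = None
    std: Optional[float] = None

arrivals = DistributionConfig(dist="exponential", rate=2.0)
milling = DistributionConfig(dist="normal", mean=5.0, std=0.5)
```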
Data Payloads
TelemetryPayload
Bases: BaseStreamPayload
Schema for low-volume, single-metric telemetry updates.
This payload is used for publishing continuous system state variables—such as resource utilization, queue lengths, or system lag—and is typically consumed by real-time dashboards.
Attributes:

| Name | Type | Description |
|---|---|---|
| `stream_type` | `Literal['telemetry']` | Hardcoded to "telemetry" for downstream routing. |
| `path_id` | `str` | The dot-notation path of the metric (e.g., 'Line_A.lathe.utilization'). |
| `value` | `Any` | The scalar value of the metric at the given simulation time. |
Source code in src/dynamic_des/models/schemas.py
EventPayload
Bases: BaseStreamPayload
Schema for high-volume discrete simulation events.
This payload tracks state transitions of specific entities (like tasks or parts) throughout
the simulation lifecycle. The key attribute allows message brokers like Kafka to maintain
strict chronological ordering for specific tasks.
Attributes:

| Name | Type | Description |
|---|---|---|
| `stream_type` | `Literal['event']` | Hardcoded to "event" for downstream routing. |
| `key` | `str` | A unique identifier/partition key for the event (e.g., 'task-001'). |
| `value` | `Dict[str, Any]` | A dictionary containing the event's detailed payload. |
Source code in src/dynamic_des/models/schemas.py
BaseStreamPayload
Bases: BaseModel
Base schema for all egress data emitted by the simulation engine.
This schema guarantees that all outgoing data streams share a common temporal context, allowing downstream systems to accurately synchronize simulation time with real-world time.
Attributes:

| Name | Type | Description |
|---|---|---|
| `stream_type` | `Literal['telemetry', 'event']` | An identifier indicating the nature of the payload. |
| `sim_ts` | `float` | The simulation clock time (in seconds) when the payload was generated. |
| `timestamp` | `str` | The real-world ISO 8601 timestamp indicating when the payload was generated. |
Source code in src/dynamic_des/models/schemas.py
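The shared temporal context can be sketched with a dataclass stand-in. The real class is a pydantic `BaseModel`; this sketch only illustrates the documented fields, and the default-timestamp behaviour is an assumption.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

# Dataclass stand-in for BaseStreamPayload (the real class is a
# pydantic BaseModel); illustrates the shared temporal fields.
@dataclass
class StreamPayload:
    stream_type: str  # 'telemetry' or 'event'
    sim_ts: float     # simulation clock time, in seconds
    timestamp: str = field(
        # Assumed: real-world ISO 8601 timestamp captured at creation.
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

p = StreamPayload(stream_type="telemetry", sim_ts=12.5)
```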
Utilities
Sampler
Generates random numbers based on live DistributionConfig objects.
The Sampler evaluates the configuration at the exact moment it is called. This allows the simulation to dynamically respond to external parameter changes.
Attributes:

| Name | Type | Description |
|---|---|---|
| `rng` | `Generator` | The NumPy random number generator instance. |
Source code in src/dynamic_des/core/sampler.py
Functions
sample(config, min_delay=1e-05)
Draws a random sample using the current parameters in the provided config.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `DistributionConfig` | The live configuration object retrieved from the Registry. | *required* |

Returns:

| Name | Type | Description |
|---|---|---|
| `float` | `float` | A sampled float representing a time duration (e.g., arrival gap or service time). Returns 0.0 if the resulting sample is negative. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If the |
Source code in src/dynamic_des/core/sampler.py
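The sampling behaviour can be approximated with the stdlib `random` module in place of the NumPy generator the library actually uses. This sketch takes a plain dict instead of a `DistributionConfig`, omits the `min_delay` parameter (whose exact role is not spelled out above), and assumes unsupported distributions raise `ValueError`.

```python
import random

# Illustrative sampler using stdlib `random` instead of the NumPy
# generator; clamps negative draws to 0.0 as documented. The dict-based
# config and the ValueError condition are assumptions.
def sample(config: dict) -> float:
    dist = config["dist"]
    if dist == "exponential":
        value = random.expovariate(config["rate"])
    elif dist == "normal":
        value = random.gauss(config["mean"], config["std"])
    else:
        raise ValueError(f"Unsupported distribution: {dist}")
    # Documented behaviour: never return a negative duration.
    return max(value, 0.0)
```

Because the config is re-read on every call, an external update to `rate` takes effect on the very next sample.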
Resources
DynamicResource
Bases: BaseDynamicResource
A SimPy Resource with a dynamically adjustable capacity.
Unlike a standard simpy.Resource where capacity is fixed at creation,
the DynamicResource listens to a SimulationRegistry path (e.g., 'Line_A.lathe.current_cap').
When the registry updates the capacity, the resource automatically adjusts,
triggering pending requests if capacity increases.
Attributes:

| Name | Type | Description |
|---|---|---|
| `env` | `DynamicRealtimeEnvironment` | The active simulation environment. |
| `sim_id` | `str` | The prefix ID (e.g., 'Line_A'). |
| `resource_id` | `str` | The specific resource name (e.g., 'lathe'). |
Source code in src/dynamic_des/resources/resource.py
Attributes
in_use (property)
Currently occupied slots (total capacity minus idle tokens).
Functions
request(priority=1)
Admin & Infrastructure
KafkaAdminConnector
Unified Kafka Admin and Monitoring Connector.
This connector acts as a management layer for the simulation's Kafka
infrastructure. It handles synchronous administrative tasks (topic creation)
using kafka-python and asynchronous data operations (sending config,
collecting telemetry/events) using aiokafka.
It maintains an internal state of simulation vitals and task lifecycles by consuming from the simulation's egress topics.
Attributes:

| Name | Type | Description |
|---|---|---|
| `bootstrap_servers` | `str` | Kafka broker addresses. |
| `max_tasks` | `int` | The maximum number of task records to keep per service in memory to prevent unbounded growth. |
| `kwargs` | `dict` | Additional arguments passed to Kafka clients. |
Source code in src/dynamic_des/connectors/admin/kafka.py
Functions
create_topics(topics_config)
Creates Kafka topics required for the simulation.
This is a synchronous call to ensure that all necessary infrastructure (config, telemetry, and event topics) is ready before the simulation environment starts.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topics_config` | `List[Dict[str, Any]]` | A list of topic configurations. Each dict should contain 'name', and optionally 'partitions' and 'replication'. | *required* |
Source code in src/dynamic_des/connectors/admin/kafka.py
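A `topics_config` argument matching the description above might look like this. The topic names are placeholders, and the default values implied by omitting 'partitions'/'replication' are assumptions.

```python
# Illustrative topics_config for create_topics(); topic names are
# hypothetical, key names follow the parameter description.
topics_config = [
    {"name": "sim-config"},                               # broker defaults
    {"name": "sim-telemetry", "partitions": 1},           # single-partition metrics
    {"name": "sim-events", "partitions": 6, "replication": 1},
]
```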
send_config(topic, path_id, value)
async
Sends a surgical parameter update to a simulation config topic.
This method acts as an external controller, allowing users to dynamically update registry paths (e.g., arrival rates or resource capacities) while the simulation is running.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | The Kafka topic the simulation is listening to for config. | *required* |
| `path_id` | `str` | The registry dot-notation path (e.g., 'Line_A.lathe.max_cap'). | *required* |
| `value` | `Any` | The new value to be applied to the path. | *required* |
Source code in src/dynamic_des/connectors/admin/kafka.py
collect_data(topics, auto_offset_reset='latest')
async
Async loop to consume from telemetry and event topics.
This loop continuously listens to the simulation's egress and updates
the connector's internal _vitals and _state attributes.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topics` | `List[str]` | List of topics to consume from (telemetry and events). | *required* |
| `auto_offset_reset` | `str` | Where to start consuming if no offset is committed. | `'latest'` |
Source code in src/dynamic_des/connectors/admin/kafka.py
get_state()
Returns the aggregated event state for task lifecycles.
Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | A nested dictionary: sim_id -> service -> task_id -> {status: timestamp}. |
get_vitals()
Returns the latest system telemetry metrics.
Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | A dictionary of path_id to latest value (e.g., utilization, queue length). |
Connectors (Ingress)
BaseIngress
Base class for all ingress providers in the simulation.
Ingress providers act as asynchronous listeners that bridge external data sources (such as Kafka topics, Redis channels, or local schedules) with the simulation's internal state. They are responsible for fetching updates and placing them into a thread-safe queue for the registry to process.
Source code in src/dynamic_des/connectors/ingress/base.py
Functions
run(ingress_queue)
async
Listens to an external source and pushes updates to the ingress queue.
This method should be implemented as an asynchronous loop that waits for external signals or data and converts them into (path, value) tuples suitable for the SimulationRegistry.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `ingress_queue` | `Queue` | A thread-safe queue used to transmit updates to the main simulation environment. | *required* |

Raises:

| Type | Description |
|---|---|
| `NotImplementedError` | If the subclass does not override this method. |
Source code in src/dynamic_des/connectors/ingress/base.py
KafkaIngress
Bases: BaseIngress
Resilient Kafka consumer for dynamic simulation configuration updates.
This connector subscribes to a specified Kafka topic and parses incoming JSON messages into state updates for the SimulationRegistry. It is designed to handle real-time control signals that modify simulation parameters (e.g., changing a machine's capacity or an arrival rate) while the model is executing.
The expected message format is a JSON object containing a 'path_id' string and a 'value' of any serializable type.
Attributes:

| Name | Type | Description |
|---|---|---|
| `topic` | `str` | The Kafka topic name used for configuration signals. |
| `bootstrap_servers` | `str` | Comma-separated string of Kafka broker addresses. |
| `kwargs` | `dict` | Additional configuration parameters for the AIOKafkaConsumer. |
Source code in src/dynamic_des/connectors/ingress/kafka.py
Functions
__init__(topic, bootstrap_servers, **kwargs)
Initializes the KafkaIngress with connection and subscription details.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | The Kafka topic to subscribe to. | *required* |
| `bootstrap_servers` | `str` | Kafka broker address or list of addresses. | *required* |
| `**kwargs` | `Any` | Arbitrary keyword arguments passed to the underlying AIOKafkaConsumer (e.g., group_id, security_protocol). | `{}` |
Source code in src/dynamic_des/connectors/ingress/kafka.py
run(ingress_queue)
async
Main execution loop that consumes Kafka messages and populates the ingress queue.
This method maintains a persistent connection to the Kafka broker. It features an exponential backoff strategy to handle network interruptions and connection failures gracefully.
Decoded messages are validated for basic structure; malformed JSON or missing keys are logged as warnings without halting the consumer.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `ingress_queue` | `Queue` | A thread-safe queue where (path_id, value) tuples are placed for processing by the SimulationRegistry. | *required* |
Note
The loop responds to asyncio.CancelledError for clean shutdown,
ensuring the Kafka consumer stops and releases resources properly.
Source code in src/dynamic_des/connectors/ingress/kafka.py
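The exponential-backoff strategy mentioned above can be sketched as a delay schedule. The base delay, cap, and attempt count here are assumptions, not the library's actual retry policy.

```python
# Sketch of an exponential-backoff schedule for reconnect attempts;
# base, cap, and attempt count are assumed values, not the library's.
def backoff_delays(base=1.0, cap=30.0, attempts=6):
    # Double the wait after each failure, up to a ceiling.
    return [min(base * (2 ** n), cap) for n in range(attempts)]

print(backoff_delays())  # -> [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

In the consumer loop, each failed connection attempt would `await asyncio.sleep(delay)` with the next value before retrying.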
LocalIngress
Bases: BaseIngress
Ingress provider for deterministic, time-scheduled parameter updates.
This connector is primarily used for testing, benchmarking, or local simulation runs where parameter changes need to occur at specific wall-clock intervals without requiring an external message broker like Kafka. It feeds a pre-defined sequence of updates into the simulation registry based on relative delays.
Attributes:

| Name | Type | Description |
|---|---|---|
| `schedule` | `List[Tuple[float, str, Any]]` | A chronologically sorted list of updates, where each tuple contains (delay_seconds, path_id, value). |
Source code in src/dynamic_des/connectors/ingress/local.py
Functions
__init__(schedule)
Initializes the LocalIngress and sorts the provided schedule.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `schedule` | `List[Tuple[float, str, Any]]` | A list of tuples representing scheduled updates. Format: [(delay_from_start, "registry_path", new_value), ...]. The list is automatically sorted by the delay time. | *required* |
Source code in src/dynamic_des/connectors/ingress/local.py
run(ingress_queue)
async
Processes the schedule and pushes updates to the queue at the correct wall-clock times.
This method calculates the relative sleep intervals between
scheduled events to ensure that updates are placed in the
ingress queue precisely when requested. Because it uses
asyncio.sleep, it does not block the main simulation execution.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `ingress_queue` | `Queue` | A thread-safe queue used to transmit the (path_id, value) updates to the SimulationRegistry. | *required* |
Source code in src/dynamic_des/connectors/ingress/local.py
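The conversion from absolute delays-from-start into the relative sleep intervals described above can be sketched as follows; the helper name is hypothetical.

```python
# Illustrative conversion of absolute (delay_from_start, path, value)
# entries into the relative sleeps performed between updates.
def relative_intervals(schedule):
    intervals, previous = [], 0.0
    for delay, path, value in schedule:
        # Sleep only for the gap since the previous scheduled update.
        intervals.append((delay - previous, path, value))
        previous = delay
    return intervals

plan = [(1.0, "Line_A.lathe.current_cap", 3),
        (4.0, "Line_A.arrival.standard.rate", 0.5)]
print(relative_intervals(plan))
# -> [(1.0, 'Line_A.lathe.current_cap', 3),
#     (3.0, 'Line_A.arrival.standard.rate', 0.5)]
```

In `run()`, each interval would be awaited with `asyncio.sleep` before the `(path_id, value)` tuple is pushed to the queue.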
PostgresIngress
Bases: BaseIngress
Asynchronous ingress provider for polling updates from a PostgreSQL database.
This connector is intended to act as a bridge for scenarios where simulation parameters are managed within a relational database. It would typically poll a configuration table for changes or listen to database notifications (LISTEN/NOTIFY) to trigger real-time updates in the simulation registry.
Note
This class is currently a placeholder and is planned for a future release.
Source code in src/dynamic_des/connectors/ingress/postgres.py
Functions
__init__(*args, **kwargs)
Initializes the PostgresIngress placeholder.
Raises:

| Type | Description |
|---|---|
| `NotImplementedError` | Always raised as the connector is not yet implemented. |
Source code in src/dynamic_des/connectors/ingress/postgres.py
run(ingress_queue)
async
Placeholder for the database polling execution loop.
Future implementations will likely utilize an async driver (e.g., asyncpg) to fetch rows and convert them into (path, value) tuples for the ingress queue.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `ingress_queue` | `Queue` | A thread-safe queue used to transmit updates to the SimulationRegistry. | *required* |
Source code in src/dynamic_des/connectors/ingress/postgres.py
RedisIngress
Bases: BaseIngress
Asynchronous ingress provider for consuming updates from Redis.
This connector is designed to facilitate high-speed, low-latency parameter updates by leveraging Redis as a middleware layer. Potential implementations include subscribing to Pub/Sub channels for real-time broadcast signals or monitoring Redis Streams for sequential state updates.
Note
This class is currently a placeholder and is planned for a future release.
Source code in src/dynamic_des/connectors/ingress/redis.py
Functions
__init__(*args, **kwargs)
Initializes the RedisIngress placeholder.
Raises:

| Type | Description |
|---|---|
| `NotImplementedError` | Always raised as the connector is not yet implemented. |
Source code in src/dynamic_des/connectors/ingress/redis.py
run(ingress_queue)
async
Placeholder for the Redis consumption execution loop.
Future implementations will likely utilize an async Redis client to listen for messages and transform them into (path, value) tuples for the internal ingress queue.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `ingress_queue` | `Queue` | A thread-safe queue used to transmit updates to the SimulationRegistry. | *required* |
Source code in src/dynamic_des/connectors/ingress/redis.py
Connectors (Egress)
BaseEgress
Base class for all egress providers in the simulation.
Egress providers act as asynchronous bridges that consume processed simulation data (telemetry and events) from a thread-safe internal queue and transmit it to external destinations such as Kafka, databases, or the console.
Source code in src/dynamic_des/connectors/egress/base.py
Functions
run(egress_queue)
async
Listens to the internal queue and pushes data to an external sink.
This method should contain an asynchronous loop that polls the provided queue and handles the networking/I/O logic specific to the destination system.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `egress_queue` | `Queue` | A thread-safe queue containing batches of dictionaries to be exported. | *required* |

Raises:

| Type | Description |
|---|---|
| `NotImplementedError` | If the subclass does not override this method. |
Source code in src/dynamic_des/connectors/egress/base.py
KafkaEgress
Bases: BaseEgress
High-throughput Kafka producer for simulation telemetry and events.
This connector utilizes aiokafka for asynchronous I/O and orjson for fast
serialization. It implements a resilient connection loop with exponential
backoff and handles two distinct data streams:
1. Telemetry: Typically mapped to single-partition topics for system metrics.
2. Events: Mapped to partitioned topics using event keys for ordering guarantees.
Attributes:

| Name | Type | Description |
|---|---|---|
| `telemetry_topic` | `str` | The Kafka topic name for telemetry data. |
| `event_topic` | `str` | The Kafka topic name for lifecycle events. |
| `producer_config` | `dict` | Configuration dictionary passed to AIOKafkaProducer, including performance optimizations like lz4 compression and batch lingering. |
Source code in src/dynamic_des/connectors/egress/kafka.py
Functions
__init__(telemetry_topic, event_topic, bootstrap_servers, **kwargs)
Initializes the KafkaEgress with topic and connection settings.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `telemetry_topic` | `str` | Destination topic for telemetry stream. | *required* |
| `event_topic` | `str` | Destination topic for event stream. | *required* |
| `bootstrap_servers` | `str` | Comma-separated list of Kafka brokers. | *required* |
| `**kwargs` | `Any` | Additional overrides for the AIOKafkaProducer configuration. | `{}` |
Source code in src/dynamic_des/connectors/egress/kafka.py
run(egress_queue)
async
The main execution loop that consumes the egress queue and publishes to Kafka.
This method maintains a persistent connection to Kafka. If the connection
is lost, it implements an exponential backoff retry strategy. It polls
the internal egress_queue for batches of data, determines the target
topic based on the 'stream_type', and performs asynchronous sends.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `egress_queue` | `Queue` | A thread-safe queue containing lists of dictionaries (batches) generated by the EgressMixIn. | *required* |
Note
The loop exits gracefully upon receiving an asyncio.CancelledError,
ensuring the Kafka producer is stopped correctly.
Source code in src/dynamic_des/connectors/egress/kafka.py
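The stream_type-based topic selection described above can be sketched with a small routing function; the topic names here are placeholders, and the helper itself is an assumption rather than the actual implementation.

```python
# Sketch of the stream_type routing the run() loop is described to
# perform; topic names are placeholders, route() is hypothetical.
def route(record, telemetry_topic="sim-telemetry", event_topic="sim-events"):
    # Events go to the partitioned event topic, everything else to
    # the telemetry topic.
    return event_topic if record["stream_type"] == "event" else telemetry_topic

print(route({"stream_type": "event", "key": "task-001"}))  # -> sim-events
```

For event records, the payload's `key` would also be passed to the producer as the partition key to preserve per-task ordering.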
ConsoleEgress
Bases: BaseEgress
Local egress provider that outputs simulation data to the standard console.
This connector serves as a debugging and local development tool. It processes batched data from the simulation and prints formatted entries to stdout, distinguishing between telemetry metrics and discrete events via prefixes.
Source code in src/dynamic_des/connectors/egress/local.py
Functions
run(egress_queue)
async
Continuously polls the egress queue and prints formatted results.
This method implements an asynchronous polling loop. It extracts batches from the internal queue, identifies the data stream type, and formats the output with specific prefixes:

- [TEL]: Telemetry data (e.g., resource utilization, queue lengths).
- [EVT]: Event data (e.g., task lifecycle transitions).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `egress_queue` | `Queue` | A thread-safe queue containing lists of dictionaries (batches) generated by the EgressMixIn. | *required* |
Note
If the queue is empty, the method yields control to the asyncio event loop for a short duration to prevent CPU pinning.
Source code in src/dynamic_des/connectors/egress/local.py
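The prefix formatting can be sketched like this; the exact layout of the real console output is an assumption, only the [TEL]/[EVT] prefixes are taken from the description above.

```python
# Illustrative [TEL]/[EVT] formatting; the precise field layout of the
# real ConsoleEgress output is an assumption.
def format_entry(record):
    prefix = "[EVT]" if record["stream_type"] == "event" else "[TEL]"
    label = record.get("path_id", record.get("key", ""))
    return f"{prefix} t={record['sim_ts']:.2f} {label}"

print(format_entry({"stream_type": "telemetry", "sim_ts": 3.0,
                    "path_id": "Line_A.lathe.utilization"}))
# -> [TEL] t=3.00 Line_A.lathe.utilization
```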
PostgresEgress
Bases: BaseEgress
Asynchronous egress provider for persisting simulation data to PostgreSQL.
This connector is intended to handle long-term storage of simulation results, allowing for historical analysis and complex SQL queries on telemetry and event data.
Source code in src/dynamic_des/connectors/egress/postgres.py
Functions
__init__(connection_dsn, table_name='simulation_data', **kwargs)
Initializes the PostgresEgress with connection details.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `connection_dsn` | `str` | PostgreSQL connection string (DSN). | *required* |
| `table_name` | `str` | Target table for simulation records. | `'simulation_data'` |
| `**kwargs` | `Any` | Additional connection pool arguments. | `{}` |
Source code in src/dynamic_des/connectors/egress/postgres.py
run(egress_queue)
async
Main execution loop for PostgreSQL data persistence.
Should implement a resilient connection using an async driver (like asyncpg), performing bulk inserts of batched data from the egress queue.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `egress_queue` | `Queue` | A thread-safe queue containing batches of simulation data. | *required* |

Raises:

| Type | Description |
|---|---|
| `NotImplementedError` | This connector is currently a placeholder. |
Source code in src/dynamic_des/connectors/egress/postgres.py
RedisEgress
Bases: BaseEgress
Asynchronous egress provider for streaming simulation data to Redis.
Designed for real-time dashboarding or inter-process communication, publishing simulation updates to Redis Pub/Sub channels or Streams.
Source code in src/dynamic_des/connectors/egress/redis.py
Functions
__init__(host='localhost', port=6379, channel='sim_stream', **kwargs)
Initializes the RedisEgress with connection and routing details.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `host` | `str` | Redis server hostname. | `'localhost'` |
| `port` | `int` | Redis server port. | `6379` |
| `channel` | `str` | The target Pub/Sub channel or Stream key. | `'sim_stream'` |
| `**kwargs` | `Any` | Additional redis-py configuration. | `{}` |
Source code in src/dynamic_des/connectors/egress/redis.py
run(egress_queue)
async
Main execution loop for Redis data streaming.
Should implement an async client (like redis-py's async mode) to publish or add entries to Redis based on the simulation egress queue.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `egress_queue` | `Queue` | A thread-safe queue containing batches of simulation data. | *required* |

Raises:

| Type | Description |
|---|---|
| `NotImplementedError` | This connector is currently a placeholder. |