API Reference
Environment
DynamicRealtimeEnvironment
Bases: RealtimeEnvironment, RegistryMixIn, IngressMixIn, EgressMixIn
The core simulation engine for dynamic-des.
This environment extends SimPy's RealtimeEnvironment by incorporating
a centralized SimulationRegistry for dynamic state updates, and MixIns
for managing high-throughput asynchronous I/O with external systems.
Attributes:

| Name | Type | Description |
|---|---|---|
| start_datetime | datetime | The real-world clock time when the simulation started. |
Source code in src/dynamic_des/core/environment.py
Registry & Parameters
SimulationRegistry
A centralized 'Switchboard' that maps dot-notation paths to dynamic simulation parameters.
The Registry acts as the single source of truth for the simulation's state. It allows external streams (like Kafka or Redis) to update parameters on the fly, seamlessly synchronizing them with the underlying SimPy processes.
Attributes:

| Name | Type | Description |
|---|---|---|
| env | Environment | The active SimPy environment. |
Source code in src/dynamic_des/core/registry.py
Functions
get(path)
Retrieve the DynamicValue object at a specific path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The dot-notation path (e.g., 'Line_A.arrival.standard.rate'). | required |

Returns:

| Name | Type | Description |
|---|---|---|
| DynamicValue | DynamicValue | The wrapper object for the requested parameter. |

Raises:

| Type | Description |
|---|---|
| KeyError | If the path does not exist in the registry. |
Source code in src/dynamic_des/core/registry.py
get_config(path)
Retrieve the live configuration object (e.g., DistributionConfig).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The dot-notation path (e.g., 'Line_A.service.milling'). | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Any | Any | The synchronized configuration object. |

Raises:

| Type | Description |
|---|---|
| KeyError | If the config path does not exist in the registry. |
Source code in src/dynamic_des/core/registry.py
register_sim_parameter(param)
Takes a SimParameter instance and flattens it into the registry.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| param | SimParameter | The initial state schema to register. | required |
Source code in src/dynamic_des/core/registry.py
update(path, new_value)
Update a value safely and synchronize its parent configuration object.
Includes dynamic type casting to protect the simulation from crashing if external systems send improperly typed data (e.g., sending the string "5" instead of the integer 5).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The dot-notation path to update. | required |
| new_value | Any | The new value to apply. | required |
Source code in src/dynamic_des/core/registry.py
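The sketch below shows a typical read/update round trip. It is a minimal usage sketch, assuming `registry` is an existing SimulationRegistry with a SimParameter already registered under the 'Line_A' prefix.

```python
# Hypothetical usage sketch: `registry` is an existing SimulationRegistry
# with a SimParameter registered under the 'Line_A' prefix.

# Read the live configuration object backing a service distribution.
milling_cfg = registry.get_config("Line_A.service.milling")

# Update a value; the registry casts "5" -> 5 when the parameter is an
# int, protecting the simulation from improperly typed external data.
registry.update("Line_A.lathe.current_cap", "5")

# Retrieve the DynamicValue wrapper for a single parameter.
rate = registry.get("Line_A.arrival.standard.rate")
```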
SimParameter
dataclass
The master schema representing the state of a specific simulation entity (like a production line).
This object is registered with the SimulationRegistry, which flattens
the nested dictionaries into dot-notation paths (e.g., 'Line_A.arrival.standard.rate').
Attributes:

| Name | Type | Description |
|---|---|---|
| sim_id | str | The unique prefix for this group of parameters (e.g., 'Line_A'). |
| arrival | Dict[str, DistributionConfig] | Configurations for arrival generation rates. |
| service | Dict[str, DistributionConfig] | Configurations for process task durations. |
| resources | Dict[str, CapacityConfig] | Configurations for standard SimPy Resources. |
| containers | Dict[str, CapacityConfig] | Configurations for continuous SimPy Containers. |
| stores | Dict[str, CapacityConfig] | Configurations for discrete SimPy Stores. |
| variables | Dict[str, Any] | Flexible user-defined state variables (e.g., int, float, bool, str) used to drive custom logic, track string-based states, or act as external control dials. |
Source code in src/dynamic_des/models/params.py
CapacityConfig
dataclass
Configuration for the capacity of a simulated resource, container, or store.
Attributes:

| Name | Type | Description |
|---|---|---|
| current_cap | Union[int, float] | The currently active capacity. |
| max_cap | Union[int, float] | The absolute physical maximum capacity. |
Source code in src/dynamic_des/models/params.py
DistributionConfig
dataclass
Configuration for a statistical distribution used in the simulation.
This config is dynamically updatable. For example, you can change the rate
of an 'exponential' distribution via Kafka, and the Sampler will instantly
use the new rate for the next event.
Attributes:

| Name | Type | Description |
|---|---|---|
| dist | str | The type of distribution (e.g., 'exponential', 'normal', 'uniform'). |
| rate | Optional[float] | The rate parameter (lambda) for exponential distributions. |
| mean | Optional[float] | The mean (mu) for normal distributions. |
| std | Optional[float] | The standard deviation (sigma) for normal distributions. |
Source code in src/dynamic_des/models/params.py
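As a sketch of how these dataclasses fit together, the following builds a SimParameter from the documented fields and registers it. The import paths are inferred from the source locations above, default dataclass constructors are assumed, and `registry` is a hypothetical existing SimulationRegistry.

```python
# Sketch assuming default dataclass constructors and these import paths
# (taken from the "Source code in" locations above).
from dynamic_des.models.params import (
    SimParameter,
    DistributionConfig,
    CapacityConfig,
)

line_a = SimParameter(
    sim_id="Line_A",
    arrival={"standard": DistributionConfig(dist="exponential", rate=0.5)},
    service={"milling": DistributionConfig(dist="normal", mean=4.0, std=0.5)},
    resources={"lathe": CapacityConfig(current_cap=2, max_cap=4)},
    containers={},
    stores={},
    variables={"shift": "day"},
)

# Flatten into the registry: paths like 'Line_A.arrival.standard.rate'
# become individually updatable.
registry.register_sim_parameter(line_a)
```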
Data Payloads
TelemetryPayload
Bases: BaseStreamPayload
Schema for low-volume, single-metric telemetry updates.
This payload is used for publishing continuous system state variables—such as resource utilization, queue lengths, or system lag—typically used for real-time dashboards.
Attributes:

| Name | Type | Description |
|---|---|---|
| stream_type | Literal['telemetry'] | Hardcoded to "telemetry" for downstream routing. |
| path_id | str | The dot-notation path of the metric (e.g., 'Line_A.lathe.utilization'). |
| value | Any | The scalar value of the metric at the given simulation time. |
Source code in src/dynamic_des/models/schemas.py
EventPayload
Bases: BaseStreamPayload
Schema for high-volume discrete simulation events.
This payload tracks state transitions of specific entities (like tasks or parts) throughout
the simulation lifecycle. The key attribute allows message brokers like Kafka to maintain
strict chronological ordering for specific tasks.
Attributes:

| Name | Type | Description |
|---|---|---|
| stream_type | Literal['event'] | Hardcoded to "event" for downstream routing. |
| key | str | A unique identifier/partition key for the event (e.g., 'task-001'). |
| value | Dict[str, Any] | A dictionary containing the event's detailed payload. |
Source code in src/dynamic_des/models/schemas.py
BaseStreamPayload
Bases: BaseModel
Base schema for all egress data emitted by the simulation engine.
This schema guarantees that all outgoing data streams share a common temporal context, allowing downstream systems to accurately synchronize simulation time with real-world time.
Attributes:

| Name | Type | Description |
|---|---|---|
| stream_type | Literal['telemetry', 'event'] | An identifier indicating the nature of the payload. |
| sim_ts | float | The simulation clock time (in seconds) when the payload was generated. |
| timestamp | str | The real-world ISO 8601 timestamp indicating when the payload was generated. |
Source code in src/dynamic_des/models/schemas.py
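For illustration, a telemetry message on the wire combines the base fields with TelemetryPayload's own. The values below are made up for the example.

```python
# Illustrative wire shape of a TelemetryPayload, combining the base
# fields (stream_type, sim_ts, timestamp) with the telemetry-specific
# ones (path_id, value). Values are invented for the example.
telemetry_message = {
    "stream_type": "telemetry",
    "sim_ts": 1823.5,                          # simulation clock, seconds
    "timestamp": "2024-01-01T12:00:00+00:00",  # real-world ISO 8601
    "path_id": "Line_A.lathe.utilization",
    "value": 0.82,
}
```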
Utilities
Sampler
Generates random numbers based on live DistributionConfig objects.
The Sampler evaluates the configuration at the exact moment it is called. This allows the simulation to dynamically respond to external parameter changes.
Attributes:

| Name | Type | Description |
|---|---|---|
| rng | Generator | The NumPy random number generator instance. |
Source code in src/dynamic_des/core/sampler.py
Functions
sample(config, min_delay=1e-05)
Draws a random sample using the current parameters in the provided config.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| config | DistributionConfig | The live configuration object retrieved from the Registry. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| float | float | A sampled float representing a time duration (e.g., arrival gap or service time). Returns 0.0 if the resulting sample is negative. |

Raises:

| Type | Description |
|---|---|
| ValueError | If the distribution type in the config is not supported. |
Source code in src/dynamic_des/core/sampler.py
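A minimal sampling sketch, assuming Sampler can be constructed without arguments and that the import paths match the source locations above.

```python
# Sketch assuming a no-argument constructor and these import paths.
from dynamic_des.core.sampler import Sampler
from dynamic_des.models.params import DistributionConfig

sampler = Sampler()
cfg = DistributionConfig(dist="exponential", rate=2.0)

# The config is evaluated at call time, so if the Registry changes
# `rate` between calls, the next sample reflects the new value.
service_time = sampler.sample(cfg)
```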
Resources
DynamicResource
Bases: BaseDynamicResource
A SimPy Resource with a dynamically adjustable capacity.
Unlike a standard simpy.Resource where capacity is fixed at creation,
the DynamicResource listens to a SimulationRegistry path (e.g., 'Line_A.lathe.current_cap').
When the registry updates the capacity, the resource automatically adjusts,
triggering pending requests if capacity increases.
Attributes:

| Name | Type | Description |
|---|---|---|
| env | DynamicRealtimeEnvironment | The active simulation environment. |
| sim_id | str | The prefix ID (e.g., 'Line_A'). |
| resource_id | str | The specific resource name (e.g., 'lathe'). |
Source code in src/dynamic_des/resources/resource.py
Attributes
in_use
property
Currently occupied slots (Total Capacity - Idle Tokens).
capacity
property
The current active capacity of the resource.
This value reflects the live state from the Registry and may change during the simulation.
Functions
request(priority=1)
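A process-level sketch, assuming the constructor mirrors the documented attributes (env, sim_id, resource_id) and that requests follow SimPy's usual context-manager pattern; `env` is a hypothetical existing DynamicRealtimeEnvironment.

```python
# Sketch assuming the constructor mirrors the documented attributes;
# `env` is an existing DynamicRealtimeEnvironment.
from dynamic_des.resources.resource import DynamicResource

lathe = DynamicResource(env, sim_id="Line_A", resource_id="lathe")

def machine_process(env, lathe):
    # Assumed to follow SimPy's convention: lower numbers win first.
    with lathe.request(priority=0) as req:
        yield req               # waits for a slot at the *current* capacity
        yield env.timeout(5.0)  # hold the slot while working

env.process(machine_process(env, lathe))
```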
DynamicContainer
Bases: BaseDynamicResource
A wrapper for SimPy Container with dynamic capacity updates.
Useful for modeling continuous bulk materials (fluids, gases, battery charge).
Unlike a standard simpy.Container, the capacity of this container listens
to a SimulationRegistry path and can be mutated at runtime.
Capacity Shrinkage Behavior:
If the capacity is dynamically shrunk below the current level of the container, the material is NOT destroyed. The container will temporarily exist in an "overflow" state. All pending and future put requests are strictly blocked until downstream processes withdraw enough material (via get) to drain the level back below the new, smaller capacity.
Attributes:

| Name | Type | Description |
|---|---|---|
| env | Environment | The active SimPy simulation environment. |
| sim_id | str | The parent simulation ID prefix. |
| obj_id | str | The specific container name/identifier. |
Source code in src/dynamic_des/resources/container.py
Attributes
level
property
float: The current amount of bulk material stored inside the container.
capacity
property
float: The current active maximum capacity of the container. This value reflects the live state from the Registry.
Functions
put(amount)
Request to put a specific amount of material into the container.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| amount | float | The amount of material to add. | required |

Returns:

| Type | Description |
|---|---|
| Event | simpy.events.Event: A SimPy event that triggers when capacity is available. |
Source code in src/dynamic_des/resources/container.py
get(amount)
Request to get a specific amount of material from the container.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
amount
|
float
|
The amount of material to withdraw. |
required |
Returns:
| Type | Description |
|---|---|
ContainerGet
|
simpy.resources.container.ContainerGet: A SimPy event that triggers when enough material is available. |
Source code in src/dynamic_des/resources/container.py
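A minimal sketch of put/get from a process, assuming the constructor mirrors the documented attributes (env, sim_id, obj_id); `env` is a hypothetical existing environment.

```python
# Sketch assuming the constructor mirrors the documented attributes
# (env, sim_id, obj_id); `env` is an existing environment.
from dynamic_des.resources.container import DynamicContainer

tank = DynamicContainer(env, sim_id="Line_A", obj_id="coolant_tank")

def refill(env, tank):
    yield tank.put(25.0)   # blocks while the container is at/over capacity

def consume(env, tank):
    yield tank.get(10.0)   # blocks until at least 10.0 units are available

env.process(refill(env, tank))
env.process(consume(env, tank))
```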
DynamicStore
Bases: BaseDynamicResource
A wrapper for SimPy Store and PriorityStore with dynamic capacity updates.
Useful for modeling buffers, queues, or staging areas that hold discrete,
heterogeneous items. By setting priority=True, the store will automatically
sort incoming items (which must be sortable or wrapped in simpy.PriorityItem)
so high-priority items are retrieved first.
Capacity Shrinkage Behavior:
If the capacity is dynamically shrunk below the current number of items in the store, the existing items are NOT destroyed. The store will temporarily exist in an "over-capacity" state. All pending and future put requests are blocked until downstream processes retrieve enough items (via get) to bring the total item count below the new capacity.
Attributes:

| Name | Type | Description |
|---|---|---|
| env | Environment | The active SimPy simulation environment. |
| sim_id | str | The parent simulation ID prefix. |
| obj_id | str | The specific store name/identifier. |
Source code in src/dynamic_des/resources/store.py
Attributes
items
property
list: The actual list of distinct item objects currently residing in the store.
capacity
property
int: The current active maximum slot capacity of the store. This value reflects the live state from the Registry.
Functions
put(item)
Request to put a specific distinct item into the store.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| item | Any | The Python object to place into the store. | required |

Returns:

| Type | Description |
|---|---|
| Event | simpy.events.Event: A SimPy event that triggers when a slot is available. |
Source code in src/dynamic_des/resources/store.py
get()
Request to get the next available item from the store.
Returns:

| Type | Description |
|---|---|
| StoreGet | simpy.resources.store.StoreGet: A SimPy event that yields the requested item object when available. |
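A priority-store sketch. The constructor is assumed to mirror the documented attributes (env, sim_id, obj_id) plus the priority flag described above; `env` is a hypothetical existing environment.

```python
# Sketch: the constructor signature and the priority=True kwarg are
# assumptions based on the class description above.
from dynamic_des.resources.store import DynamicStore

buffer = DynamicStore(env, sim_id="Line_A", obj_id="staging", priority=True)

def producer(env, buffer):
    # With priority=True, items must be sortable (or simpy.PriorityItem);
    # lower sort keys are retrieved first.
    yield buffer.put((1, "urgent-part"))
    yield buffer.put((5, "routine-part"))

def worker(env, buffer):
    item = yield buffer.get()  # retrieves the highest-priority item first
    print("processing", item)

env.process(producer(env, buffer))
env.process(worker(env, buffer))
```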
Admin & Infrastructure
KafkaAdminConnector
Unified Kafka Admin and Monitoring Connector.
This connector acts as a management layer for the simulation's Kafka
infrastructure. It handles synchronous administrative tasks (topic creation)
using kafka-python and asynchronous data operations (sending config,
collecting telemetry/events) using aiokafka.
It maintains an internal state of simulation vitals and task lifecycles by consuming from the simulation's egress topics.
Attributes:

| Name | Type | Description |
|---|---|---|
| bootstrap_servers | str | Kafka broker addresses. |
| max_tasks | int | The maximum number of task records to keep per service in memory to prevent unbounded growth. |
| kwargs | dict | Additional arguments passed to Kafka clients. |
Source code in src/dynamic_des/connectors/admin/kafka.py
Functions
create_topics(topics_config)
Creates Kafka topics required for the simulation.
This is a synchronous call to ensure that all necessary infrastructure (config, telemetry, and event topics) is ready before the simulation environment starts.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topics_config | List[Dict[str, Any]] | A list of topic configurations. Each dict should contain 'name', and optionally 'partitions' and 'replication'. | required |
Source code in src/dynamic_des/connectors/admin/kafka.py
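A sketch of the expected topics_config shape; the connector is assumed to be constructed from its documented attributes, and the topic names are illustrative.

```python
# Sketch assuming the connector is constructed from its documented
# attributes (bootstrap_servers, max_tasks).
from dynamic_des.connectors.admin.kafka import KafkaAdminConnector

admin = KafkaAdminConnector(bootstrap_servers="localhost:9092")

# Each dict needs 'name'; 'partitions' and 'replication' are optional.
admin.create_topics([
    {"name": "sim-config"},
    {"name": "sim-telemetry", "partitions": 3},
    {"name": "sim-events", "partitions": 6, "replication": 1},
])
```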
send_config(topic, path_id, value)
async
Sends a surgical parameter update to a simulation config topic.
This method acts as an external controller, allowing users to dynamically update registry paths (e.g., arrival rates or resource capacities) while the simulation is running.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The Kafka topic the simulation is listening to for config. | required |
| path_id | str | The registry dot-notation path (e.g., 'Line_A.lathe.max_cap'). | required |
| value | Any | The new value to be applied to the path. | required |
Source code in src/dynamic_des/connectors/admin/kafka.py
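A sketch of a surgical update from an external controller; `admin` is the connector from the sketch above, and the topic name is illustrative.

```python
import asyncio

# Sketch: push a live capacity change to a running simulation.
# `admin` is the KafkaAdminConnector constructed in the sketch above.
async def raise_lathe_capacity():
    await admin.send_config(
        topic="sim-config",
        path_id="Line_A.lathe.max_cap",
        value=6,
    )

asyncio.run(raise_lathe_capacity())
```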
collect_data(topics, auto_offset_reset='latest')
async
Async loop to consume from telemetry and event topics.
This loop continuously listens to the simulation's egress and updates
the connector's internal _vitals and _state attributes.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topics | List[str] | List of topics to consume from (telemetry and events). | required |
| auto_offset_reset | str | Where to start consuming if no offset is committed. | 'latest' |
Source code in src/dynamic_des/connectors/admin/kafka.py
get_state()
Returns the aggregated event state for task lifecycles.
Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | A nested dictionary: sim_id -> service -> task_id -> {status: timestamp}. |
get_vitals()
Returns the latest system telemetry metrics.
Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | A dictionary of path_id to latest value (e.g., utilization, queue length). |
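A monitoring sketch that runs collect_data in the background and polls the cached vitals; `admin` is the connector from the sketches above and the path_id is illustrative.

```python
import asyncio

# Sketch: run the collector as a background task, then poll the
# connector's cached vitals while it consumes egress topics.
async def monitor():
    collector = asyncio.create_task(
        admin.collect_data(topics=["sim-telemetry", "sim-events"])
    )
    try:
        for _ in range(10):
            await asyncio.sleep(1.0)
            vitals = admin.get_vitals()  # path_id -> latest value
            print(vitals.get("Line_A.lathe.utilization"))
    finally:
        collector.cancel()

asyncio.run(monitor())
```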
Connectors (Ingress)
BaseIngress
Base class for all ingress providers in the simulation.
Ingress providers act as asynchronous listeners that bridge external data sources (such as Kafka topics, Redis channels, or local schedules) with the simulation's internal state. They are responsible for fetching updates and placing them into a thread-safe queue for the registry to process.
Source code in src/dynamic_des/connectors/ingress/base.py
Functions
run(ingress_queue)
async
Listens to an external source and pushes updates to the ingress queue.
This method should be implemented as an asynchronous loop that waits for external signals or data and converts them into (path, value) tuples suitable for the SimulationRegistry.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ingress_queue | Queue | A thread-safe queue used to transmit updates to the main simulation environment. | required |

Raises:

| Type | Description |
|---|---|
| NotImplementedError | If the subclass does not override this method. |
Source code in src/dynamic_des/connectors/ingress/base.py
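A sketch of a custom ingress following the (path, value) tuple contract described above; the HeartbeatIngress class and its behavior are hypothetical.

```python
import asyncio
from queue import Queue

from dynamic_des.connectors.ingress.base import BaseIngress

class HeartbeatIngress(BaseIngress):
    """Hypothetical ingress that nudges an arrival rate every 10 s."""

    async def run(self, ingress_queue: Queue) -> None:
        rate = 0.5
        while True:
            await asyncio.sleep(10)
            rate += 0.1
            # The contract: push (path, value) tuples onto the queue.
            ingress_queue.put(("Line_A.arrival.standard.rate", rate))
```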
KafkaIngress
Bases: BaseIngress
Resilient Kafka consumer for dynamic simulation configuration updates.
This connector subscribes to a specified Kafka topic and decodes incoming messages into state updates for the SimulationRegistry. It supports pluggable deserialization (Avro/JSON) while remaining 100% backward compatible.
The expected resulting dictionary format must contain a 'path_id' string and a 'value' of any serializable type.
Attributes:

| Name | Type | Description |
|---|---|---|
| topic | str | The Kafka topic name used for configuration signals. |
| bootstrap_servers | str | Comma-separated string of Kafka broker addresses. |
| topic_deserializers | Optional[Dict[str, MessageDeserializer]] | Mapping of topics to specific deserializers. |
| default_deserializer | Optional[MessageDeserializer] | Fallback deserializer. Defaults to JsonDeserializer. |
| kwargs | dict | Additional configuration parameters for the AIOKafkaConsumer. |
Source code in src/dynamic_des/connectors/ingress/kafka.py
Functions
__init__(topic, bootstrap_servers, topic_deserializers=None, default_deserializer=None, **kwargs)
Initializes the KafkaIngress with connection and subscription details.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The Kafka topic to subscribe to. | required |
| bootstrap_servers | str | Kafka broker address or list of addresses. | required |
| topic_deserializers | Optional[Dict[str, MessageDeserializer]] | Mapping of topics to their specific deserializers. | None |
| default_deserializer | Optional[MessageDeserializer] | Fallback deserializer if the topic isn't explicitly mapped. | None |
| **kwargs | Any | Arbitrary keyword arguments passed to the underlying AIOKafkaConsumer. | {} |
Source code in src/dynamic_des/connectors/ingress/kafka.py
run(ingress_queue)
async
Main execution loop that consumes Kafka messages, deserializes them, and populates the ingress queue.
Source code in src/dynamic_des/connectors/ingress/kafka.py
LocalIngress
Bases: BaseIngress
Ingress provider for deterministic, time-scheduled parameter updates.
This connector is primarily used for testing, benchmarking, or local simulation runs where parameter changes need to occur at specific wall-clock intervals without requiring an external message broker like Kafka. It feeds a pre-defined sequence of updates into the simulation registry based on relative delays.
Attributes:

| Name | Type | Description |
|---|---|---|
| schedule | List[Tuple[float, str, Any]] | A chronologically sorted list of updates, where each tuple contains (delay_seconds, path_id, value). |
Source code in src/dynamic_des/connectors/ingress/local.py
Functions
__init__(schedule)
Initializes the LocalIngress and sorts the provided schedule.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| schedule | List[Tuple[float, str, Any]] | A list of tuples representing scheduled updates. Format: [(delay_from_start, "registry_path", new_value), ...]. The list is automatically sorted by the delay time. | required |
Source code in src/dynamic_des/connectors/ingress/local.py
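A sketch of a deterministic schedule for a local run, with no broker required; the paths and values are illustrative.

```python
from dynamic_des.connectors.ingress.local import LocalIngress

# Each tuple is (delay_from_start, "registry_path", new_value).
ingress = LocalIngress(schedule=[
    (30.0,  "Line_A.arrival.standard.rate", 1.0),   # surge after 30 s
    (60.0,  "Line_A.lathe.current_cap", 3),         # add a machine at 60 s
    (120.0, "Line_A.variables.shift", "night"),     # flip a state variable
])
```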
run(ingress_queue)
async
Processes the schedule and pushes updates to the queue at the correct wall-clock times.
This method calculates the relative sleep intervals between
scheduled events to ensure that updates are placed in the
ingress queue precisely when requested. Because it uses
asyncio.sleep, it does not block the main simulation execution.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ingress_queue | Queue | A thread-safe queue used to transmit the (path_id, value) updates to the SimulationRegistry. | required |
Source code in src/dynamic_des/connectors/ingress/local.py
PostgresIngress
Bases: BaseIngress
Asynchronous ingress provider for polling updates from a PostgreSQL database.
This connector is intended to act as a bridge for scenarios where simulation parameters are managed within a relational database. It would typically poll a configuration table for changes or listen to database notifications (LISTEN/NOTIFY) to trigger real-time updates in the simulation registry.
Note
This class is currently a placeholder and is planned for a future release.
Source code in src/dynamic_des/connectors/ingress/postgres.py
Functions
__init__(*args, **kwargs)
Initializes the PostgresIngress placeholder.
Raises:

| Type | Description |
|---|---|
| NotImplementedError | Always raised as the connector is not yet implemented. |
Source code in src/dynamic_des/connectors/ingress/postgres.py
run(ingress_queue)
async
Placeholder for the database polling execution loop.
Future implementations will likely utilize an async driver (e.g., asyncpg) to fetch rows and convert them into (path, value) tuples for the ingress queue.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ingress_queue | Queue | A thread-safe queue used to transmit updates to the SimulationRegistry. | required |
Source code in src/dynamic_des/connectors/ingress/postgres.py
RedisIngress
Bases: BaseIngress
Asynchronous ingress provider for consuming updates from Redis.
This connector is designed to facilitate high-speed, low-latency parameter updates by leveraging Redis as a middleware layer. Potential implementations include subscribing to Pub/Sub channels for real-time broadcast signals or monitoring Redis Streams for sequential state updates.
Note
This class is currently a placeholder and is planned for a future release.
Source code in src/dynamic_des/connectors/ingress/redis.py
Functions
__init__(*args, **kwargs)
Initializes the RedisIngress placeholder.
Raises:

| Type | Description |
|---|---|
| NotImplementedError | Always raised as the connector is not yet implemented. |
Source code in src/dynamic_des/connectors/ingress/redis.py
run(ingress_queue)
async
Placeholder for the Redis consumption execution loop.
Future implementations will likely utilize an async Redis client to listen for messages and transform them into (path, value) tuples for the internal ingress queue.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| ingress_queue | Queue | A thread-safe queue used to transmit updates to the SimulationRegistry. | required |
Source code in src/dynamic_des/connectors/ingress/redis.py
Deserializers
JsonDeserializer
Default fallback deserializer providing backward compatibility via standard JSON.
This deserializer assumes the incoming payload is a UTF-8 encoded JSON string.
Source code in src/dynamic_des/connectors/ingress/kafka.py
Functions
deserialize(topic, payload)
Deserializes a JSON byte string.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The Kafka topic from which the message was consumed (unused). | required |
| payload | bytes | The UTF-8 encoded JSON byte string. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Any | Any | The parsed JSON data as a Python dictionary. |
Source code in src/dynamic_des/connectors/ingress/kafka.py
ConfluentAvroDeserializer
Lazy-loaded deserializer for Confluent Schema Registry.
This class converts Avro-encoded byte strings back into Python dictionaries.
It automatically fetches the appropriate schema from the registry using the
schema ID embedded within the message's binary payload, so no schema_str
is required during initialization.
Source code in src/dynamic_des/connectors/ingress/kafka.py
Functions
__init__(registry_url, **registry_kwargs)
Initializes the Confluent Avro deserializer.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| registry_url | str | The base URL of the Confluent Schema Registry. | required |
| **registry_kwargs | Any | Additional configuration arguments passed directly to the underlying Schema Registry client. | {} |

Raises:

| Type | Description |
|---|---|
| ImportError | If the 'confluent-kafka' package is not installed. |
Source code in src/dynamic_des/connectors/ingress/kafka.py
deserialize(topic, payload)
Deserializes an Avro-encoded byte string.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The Kafka topic from which the message was consumed. | required |
| payload | bytes | The Avro-encoded binary payload containing the schema ID. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Any | Any | The deserialized data as a Python dictionary. |
Source code in src/dynamic_des/connectors/ingress/kafka.py
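A wiring sketch that maps one topic to Avro while falling back to JSON elsewhere; it assumes JsonDeserializer takes no constructor arguments, and the topic names are illustrative.

```python
# Sketch: wire the Avro deserializer into a KafkaIngress, falling
# back to JSON for unmapped topics.
from dynamic_des.connectors.ingress.kafka import (
    KafkaIngress,
    ConfluentAvroDeserializer,
    JsonDeserializer,
)

avro = ConfluentAvroDeserializer(registry_url="http://localhost:8081")

ingress = KafkaIngress(
    topic="sim-config",
    bootstrap_servers="localhost:9092",
    topic_deserializers={"sim-config": avro},
    default_deserializer=JsonDeserializer(),
)
```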
GlueAvroDeserializer
Lazy-loaded deserializer for AWS Glue Schema Registry.
This class converts Avro-encoded byte strings back into Python dictionaries, integrating directly with AWS Glue Schema Registry. It resolves schemas dynamically based on the metadata embedded in the AWS Glue message payload.
Source code in src/dynamic_des/connectors/ingress/kafka.py
Functions
__init__(registry_name, **boto3_kwargs)
Initializes the AWS Glue Avro deserializer.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| registry_name | str | The name of the AWS Glue Schema Registry. | required |
| **boto3_kwargs | Any | Additional arguments passed directly to the underlying boto3 client. | {} |

Raises:

| Type | Description |
|---|---|
| ImportError | If the 'aws-glue-schema-registry' or 'boto3' packages are not installed. |
Source code in src/dynamic_des/connectors/ingress/kafka.py
deserialize(topic, payload)
Deserializes an Avro-encoded byte string using AWS Glue.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The Kafka topic from which the message was consumed. | required |
| payload | bytes | The Avro-encoded binary payload. | required |

Returns:

| Name | Type | Description |
|---|---|---|
| Any | Any | The raw data dictionary extracted from the AWS DataAndSchema object. |
Source code in src/dynamic_des/connectors/ingress/kafka.py
Connectors (Egress)
BaseEgress
Base class for all egress providers in the simulation.
Egress providers act as asynchronous bridges that consume processed simulation data (telemetry and events) from a thread-safe internal queue and transmit it to external destinations such as Kafka, databases, or the console.
Source code in src/dynamic_des/connectors/egress/base.py
Functions
run(egress_queue)
async
Listens to the internal queue and pushes data to an external sink.
This method should contain an asynchronous loop that polls the provided queue and handles the networking/I/O logic specific to the destination system.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| egress_queue | Queue | A thread-safe queue containing batches of dictionaries to be exported. | required |

Raises:

| Type | Description |
|---|---|
| NotImplementedError | If the subclass does not override this method. |
Source code in src/dynamic_des/connectors/egress/base.py
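A sketch of a custom egress following the queue-polling contract described above; the FileEgress class is hypothetical.

```python
import asyncio
from queue import Empty, Queue

from dynamic_des.connectors.egress.base import BaseEgress

class FileEgress(BaseEgress):
    """Hypothetical egress that appends each batch as lines to a file."""

    def __init__(self, path: str) -> None:
        self.path = path

    async def run(self, egress_queue: Queue) -> None:
        while True:
            try:
                batch = egress_queue.get_nowait()
            except Empty:
                await asyncio.sleep(0.1)  # yield to the event loop
                continue
            with open(self.path, "a") as fh:
                for record in batch:
                    fh.write(f"{record}\n")
```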
KafkaEgress
Bases: BaseEgress
High-throughput Kafka producer for simulation telemetry and events.
This connector utilizes aiokafka for asynchronous I/O and orjson for fast
serialization. It implements a resilient connection loop with exponential
backoff.
By default, data is routed to the telemetry_topic or event_topic based on
its stream_type. If a topic_router callable is provided, topic selection
is delegated to that function instead, allowing for advanced multiplexing
(e.g., splitting ML vs. UI events).
Attributes:

| Name | Type | Description |
|---|---|---|
| bootstrap_servers | str | Comma-separated list of Kafka brokers. |
| telemetry_topic | str | The default topic name for telemetry data. |
| event_topic | str | The default topic name for lifecycle events. |
| topic_router | Callable | Optional external logic to determine the topic. |
| topic_serializers | Optional[Dict[str, MessageSerializer]] | Optional mapping of topics to specific serializers. |
| default_serializer | Optional[MessageSerializer] | Fallback serializer if a topic is not in topic_serializers. |
| producer_config | dict | Configuration dictionary passed to AIOKafkaProducer. |
Examples:
Defining a custom topic router to split machine learning events from standard telemetry:
```python
def custom_topic_router(data: dict) -> str:
    stream_type = data.get("stream_type")
    if stream_type == "telemetry":
        return "sim-telemetry"
    value = data.get("value", {})
    if isinstance(value, dict):
        event_type = value.get("event_type")
        if event_type == "prediction_request":
            return "mill-predictions"
        elif event_type == "ground_truth":
            return "mill-groundtruth"
    return "mill-lifecycle"

# Pass it to the egress connector
egress = KafkaEgress(
    bootstrap_servers="localhost:9092",
    topic_router=custom_topic_router,
)
```
Source code in src/dynamic_des/connectors/egress/kafka.py
Functions
__init__(bootstrap_servers, telemetry_topic='sim-telemetry', event_topic='sim-events', topic_router=None, topic_serializers=None, default_serializer=None, **kwargs)
Initializes the KafkaEgress with topic and connection settings.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| bootstrap_servers | str | Comma-separated list of Kafka brokers. | required |
| telemetry_topic | str | Destination topic for telemetry stream. | 'sim-telemetry' |
| event_topic | str | Destination topic for event stream. | 'sim-events' |
| topic_router | Optional[Callable[[dict], str]] | Optional callable to dynamically route payloads to specific topics. | None |
| topic_serializers | Optional[Dict[str, MessageSerializer]] | Mapping of target topics to their specific serializers. | None |
| default_serializer | Optional[MessageSerializer] | The fallback serializer to use if a topic lacks a specific mapping. Defaults to JsonSerializer. | None |
| **kwargs | Any | Additional overrides for the AIOKafkaProducer configuration. | {} |
Source code in src/dynamic_des/connectors/egress/kafka.py
run(egress_queue)
async
The main execution loop that consumes the egress queue and publishes to Kafka.
This method maintains a persistent connection to Kafka. If the connection
is lost, it implements an exponential backoff retry strategy. It polls
the internal egress_queue for batches of data, determines the target
topic based on the 'stream_type', and performs asynchronous sends using
the configured serializers.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| egress_queue | Queue | A thread-safe queue containing lists of dictionaries or Pydantic models generated by the EgressMixIn. | required |
Note
The loop exits gracefully upon receiving an asyncio.CancelledError,
ensuring the Kafka producer is stopped correctly.
Source code in src/dynamic_des/connectors/egress/kafka.py
ConsoleEgress
Bases: BaseEgress
Local egress provider that outputs simulation data to the standard console.
This connector serves as a debugging and local development tool. It processes batched data from the simulation and prints formatted entries to stdout, distinguishing between telemetry metrics and discrete events via prefixes.
Source code in src/dynamic_des/connectors/egress/local.py
Functions
run(egress_queue)
async
Continuously polls the egress queue and prints formatted results.
This method implements an asynchronous polling loop. It extracts batches from the internal queue, identifies the data stream type, and formats the output with specific prefixes:

- [TEL]: Telemetry data (e.g., resource utilization, queue lengths).
- [EVT]: Event data (e.g., task lifecycle transitions).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| egress_queue | Queue | A thread-safe queue containing lists of dictionaries (batches) generated by the EgressMixIn. | required |
Note
If the queue is empty, the method yields control to the asyncio event loop for a short duration to prevent CPU pinning.
Source code in src/dynamic_des/connectors/egress/local.py
PostgresEgress
Bases: BaseEgress
Asynchronous egress provider for persisting simulation data to PostgreSQL.
This connector is intended to handle long-term storage of simulation results, allowing for historical analysis and complex SQL queries on telemetry and event data.
Source code in src/dynamic_des/connectors/egress/postgres.py
Functions
__init__(connection_dsn, table_name='simulation_data', **kwargs)
Initializes the PostgresEgress with connection details.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| connection_dsn | str | PostgreSQL connection string (DSN). | required |
| table_name | str | Target table for simulation records. | 'simulation_data' |
| **kwargs | Any | Additional connection pool arguments. | {} |
Source code in src/dynamic_des/connectors/egress/postgres.py
run(egress_queue)
async
Main execution loop for PostgreSQL data persistence.
Should implement a resilient connection using an async driver (like asyncpg), performing bulk inserts of batched data from the egress queue.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| egress_queue | Queue | A thread-safe queue containing batches of simulation data. | required |

Raises:

| Type | Description |
|---|---|
| NotImplementedError | This connector is currently a placeholder. |
Source code in src/dynamic_des/connectors/egress/postgres.py
RedisEgress
Bases: BaseEgress
Asynchronous egress provider for streaming simulation data to Redis.
Designed for real-time dashboarding or inter-process communication, publishing simulation updates to Redis Pub/Sub channels or Streams.
Source code in src/dynamic_des/connectors/egress/redis.py
Functions
__init__(host='localhost', port=6379, channel='sim_stream', **kwargs)
Initializes the RedisEgress with connection and routing details.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| host | str | Redis server hostname. | 'localhost' |
| port | int | Redis server port. | 6379 |
| channel | str | The target Pub/Sub channel or Stream key. | 'sim_stream' |
| **kwargs | Any | Additional redis-py configuration. | {} |
Source code in src/dynamic_des/connectors/egress/redis.py
run(egress_queue)
async
Main execution loop for Redis data streaming.
Should implement an async client (like redis-py's async mode) to publish or add entries to Redis based on the simulation egress queue.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| egress_queue | Queue | A thread-safe queue containing batches of simulation data. | required |

Raises:

| Type | Description |
|---|---|
| NotImplementedError | This connector is currently a placeholder. |
Source code in src/dynamic_des/connectors/egress/redis.py
Serializers
JsonSerializer
Default fallback serializer providing backward compatibility via orjson.
This serializer converts dictionaries or Pydantic models into standard JSON byte strings.
Source code in src/dynamic_des/connectors/egress/kafka.py
Functions
serialize(topic, data)
Serializes the given data to a JSON byte string.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The target Kafka topic (unused by this serializer but required by protocol). | required |
| data | Any | The payload to serialize (dictionary or Pydantic model). | required |

Returns:

| Name | Type | Description |
|---|---|---|
| bytes | bytes | The JSON-encoded byte string. |
Source code in src/dynamic_des/connectors/egress/kafka.py
ConfluentAvroSerializer
Lazy-loaded serializer for Confluent Schema Registry using confluent-kafka.
This class converts simulation data into Avro-encoded byte strings, automatically fetching and validating against the schema from a Confluent-compatible Schema Registry.
Source code in src/dynamic_des/connectors/egress/kafka.py
Functions
__init__(registry_url, schema_str, **registry_kwargs)
Initializes the Confluent Avro serializer.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| registry_url | str | The base URL of the Confluent Schema Registry. | required |
| schema_str | str | The Avro schema defined as a JSON string (typically generated directly from a Pydantic model). | required |
| **registry_kwargs | Any | Additional configuration arguments passed directly to the underlying Schema Registry client. | {} |

Raises:

| Type | Description |
|---|---|
| ImportError | If the 'confluent-kafka' package is not installed. |
Source code in src/dynamic_des/connectors/egress/kafka.py
serialize(topic, data)
Serializes the given data into an Avro byte string.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The target Kafka topic, used for schema subject naming. | required |
| data | Any | The payload to serialize (dictionary or Pydantic model). | required |

Returns:

| Name | Type | Description |
|---|---|---|
| bytes | bytes | The Avro-encoded byte string. |
Source code in src/dynamic_des/connectors/egress/kafka.py
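A wiring sketch that routes the event stream through Avro while telemetry falls back to JSON; the schema string below is illustrative, not the library's canonical schema, and JsonSerializer is assumed to take no constructor arguments.

```python
# Sketch: route events through Avro while telemetry stays JSON.
from dynamic_des.connectors.egress.kafka import (
    KafkaEgress,
    ConfluentAvroSerializer,
    JsonSerializer,
)

# Illustrative schema loosely mirroring the documented EventPayload fields.
event_schema = """{
  "type": "record",
  "name": "EventPayload",
  "fields": [
    {"name": "stream_type", "type": "string"},
    {"name": "sim_ts", "type": "double"},
    {"name": "timestamp", "type": "string"},
    {"name": "key", "type": "string"},
    {"name": "value", "type": {"type": "map", "values": "string"}}
  ]
}"""

avro = ConfluentAvroSerializer(
    registry_url="http://localhost:8081",
    schema_str=event_schema,
)

egress = KafkaEgress(
    bootstrap_servers="localhost:9092",
    topic_serializers={"sim-events": avro},
    default_serializer=JsonSerializer(),
)
```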
GlueAvroSerializer
Lazy-loaded serializer for AWS Glue Schema Registry using boto3.
This class converts simulation data into Avro-encoded byte strings, integrating directly with AWS Glue Schema Registry for schema validation.
Source code in src/dynamic_des/connectors/egress/kafka.py
Functions
__init__(registry_name, schema_str, **boto3_kwargs)
Initializes the AWS Glue Avro serializer.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| registry_name | str | The name of the AWS Glue Schema Registry. | required |
| schema_str | str | The Avro schema defined as a JSON string (typically generated directly from a Pydantic model). | required |
| **boto3_kwargs | Any | Additional arguments passed directly to the underlying boto3 client. | {} |

Raises:

| Type | Description |
|---|---|
| ImportError | If the 'aws-glue-schema-registry' or 'boto3' packages are not installed. |
Source code in src/dynamic_des/connectors/egress/kafka.py
serialize(topic, data)
Serializes the given data into an Avro byte string using AWS Glue.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The target Kafka topic. | required |
| data | Any | The payload to serialize (dictionary or Pydantic model). | required |

Returns:

| Name | Type | Description |
|---|---|---|
| bytes | bytes | The Avro-encoded byte string. |