Advanced Serialization: Avro & Pydantic

By default, KafkaEgress serializes all payloads as JSON. That works well for lightweight lifecycle and telemetry events, but high-velocity ML streams (such as PredictionRequests) often require compact, schema-validated binary payloads in Avro.

Dynamic DES provides a pluggable serialization strategy: you can mix JSON and Avro streams on the same connection, with native support for Pydantic validation.

Pydantic Duck-Typing

You do not need to convert your data to dictionaries manually. KafkaEgress uses duck-typing to automatically detect and extract data from Pydantic V1 and V2 models.

You can pass strongly-typed models directly from your simulation logic:

from pydantic import BaseModel
from dynamic_des import DynamicRealtimeEnvironment

class TaskEvent(BaseModel):
    path_id: str
    status: str

def work_task(env: DynamicRealtimeEnvironment, task_id: int):
    # Pass the Pydantic model directly to the framework!
    event = TaskEvent(path_id="Line_A.lathe", status="queued")
    env.publish_event(f"task-{task_id}", event)
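The duck-typing check itself is simple in principle: prefer the Pydantic V2 API, then fall back to V1, then to plain dicts. Here is a minimal, stdlib-only sketch of how such detection could work (the helper `to_payload` is hypothetical and not part of the Dynamic DES API; the stand-in classes simulate Pydantic models so the sketch runs without pydantic installed):

```python
from typing import Any


def to_payload(obj: Any) -> dict:
    """Hypothetical sketch of Pydantic duck-typing detection."""
    if hasattr(obj, "model_dump"):  # Pydantic V2 models
        return obj.model_dump()
    if hasattr(obj, "dict"):        # Pydantic V1 models
        return obj.dict()
    if isinstance(obj, dict):       # already a plain mapping
        return obj
    raise TypeError(f"Unsupported payload type: {type(obj)!r}")


# Stand-ins mimicking the two Pydantic APIs, so this runs standalone
class V2Model:
    def model_dump(self):
        return {"path_id": "Line_A.lathe", "status": "queued"}

class V1Model:
    def dict(self):
        return {"path_id": "Line_A.lathe", "status": "queued"}

print(to_payload(V2Model()))  # {'path_id': 'Line_A.lathe', 'status': 'queued'}
```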

Pluggable Topic Routing

Now that you are publishing Pydantic models, you need a way to tell Dynamic DES which models should remain standard JSON and which should be serialized as Avro. We do this using Topic Routing.

You can configure KafkaEgress to route different events to different topics, and apply specific serializers (such as those backed by Confluent or AWS Glue schema registries) only where needed. Any topic that does not have an explicit serializer mapped will safely fall back to the default JsonSerializer.

from dynamic_des import KafkaEgress
from dynamic_des.connectors.egress.kafka import ConfluentAvroSerializer

# 1. Initialize your Avro Serializers for specific ML schemas
prediction_serializer = ConfluentAvroSerializer(
    registry_url="http://localhost:8081",
    schema_str="""{"type": "record", "name": "Prediction", "fields": [...]}"""
)

# 2. Define a topic router to split standard events from ML events
def ml_topic_router(data: dict) -> str:
    stream_type = data.get("stream_type")

    if stream_type == "telemetry":
        return "sim-telemetry"

    value = data.get("value", {})
    if value.get("event_type") == "prediction":
        return "ml-predictions"

    return "sim-events"

# 3. Configure the Egress Connector
egress = KafkaEgress(
    bootstrap_servers="localhost:9092",
    topic_router=ml_topic_router,

    # Map the ML topic to Avro.
    # 'sim-telemetry' and 'sim-events' will automatically fall back to JSON!
    topic_serializers={
        "ml-predictions": prediction_serializer,
    }
)
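To see the routing and fallback behavior together, here is a stdlib-only sketch. The serializer stand-ins and the `dict.get` fallback lookup are assumptions about the connector's internals, not the actual KafkaEgress implementation:

```python
import json

# Stand-ins so the sketch runs standalone; the real serializers live in
# dynamic_des.connectors.egress.kafka.
def json_serializer(value: dict) -> bytes:
    return json.dumps(value).encode("utf-8")

def avro_serializer(value: dict) -> bytes:
    return b"<avro-bytes>"  # placeholder for real Avro encoding

topic_serializers = {"ml-predictions": avro_serializer}

def route(data: dict) -> str:
    # Same logic as ml_topic_router above
    if data.get("stream_type") == "telemetry":
        return "sim-telemetry"
    if data.get("value", {}).get("event_type") == "prediction":
        return "ml-predictions"
    return "sim-events"

def dispatch(data: dict) -> tuple:
    """Route the event, then pick its serializer (JSON as the fallback)."""
    topic = route(data)
    serializer = topic_serializers.get(topic, json_serializer)
    return topic, serializer(data)

print(dispatch({"value": {"event_type": "prediction"}}))   # routed to Avro
print(dispatch({"stream_type": "telemetry", "value": {}})) # falls back to JSON
```

Only `ml-predictions` gets the Avro serializer; every other topic the router returns is encoded with the JSON default.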

Auto-Generating Avro Schemas

Hardcoding Avro schema strings in Python is error-prone and invites schema drift. A common best practice is to generate your Avro schema directly from your Pydantic model.

Dynamic DES officially recommends using the dataclasses-avroschema library for this.

Define your model using AvroBaseModel

from pydantic import Field
from dataclasses_avroschema.pydantic import AvroBaseModel

class MLPrediction(AvroBaseModel):
    """My high-velocity ML payload"""
    task_id: str = Field(...)
    confidence: float = Field(...)

    class Meta:
        namespace = "com.dynamic_des.ml"
        schema_name = "PredictionEvent"

# Automatically generate the Avro schema string!
schema_string = MLPrediction.avro_schema()
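Under the hood, schema generation is largely a mapping from Python type annotations to Avro types (`str` → `"string"`, `float` → `"double"`, and so on). Here is a simplified, stdlib-only illustration of that idea; it is not the dataclasses-avroschema implementation, and `simple_avro_schema` and `AVRO_TYPES` are hypothetical names covering only a tiny subset of the real library's features:

```python
import json
from typing import get_type_hints

# Simplified Python-to-Avro type mapping (illustration only)
AVRO_TYPES = {str: "string", float: "double", int: "long", bool: "boolean"}

def simple_avro_schema(cls, namespace: str) -> str:
    """Build an Avro record schema string from a class's annotations."""
    fields = [
        {"name": name, "type": AVRO_TYPES[tp]}
        for name, tp in get_type_hints(cls).items()
    ]
    return json.dumps({
        "type": "record",
        "name": cls.__name__,
        "namespace": namespace,
        "fields": fields,
    })

class Prediction:
    task_id: str
    confidence: float

print(simple_avro_schema(Prediction, "com.dynamic_des.ml"))
```

The real library additionally handles defaults, optional and nested types, logical types, and the `Meta` options shown above.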

Plug it into the Egress Connector

Both ConfluentAvroSerializer and GlueAvroSerializer accept the schema_str argument.

Once you have initialized the serializer with your auto-generated schema, the final step is to pass it into your KafkaEgress topic configuration.

from dynamic_des.connectors.egress.kafka import ConfluentAvroSerializer
from dynamic_des import KafkaEgress

# 1. Initialize with the auto-generated schema
confluent_serializer = ConfluentAvroSerializer(
    registry_url="http://localhost:8081",
    schema_str=MLPrediction.avro_schema() # Always up to date!
)

# 2. Map it to your topic router!
egress = KafkaEgress(
    bootstrap_servers="localhost:9092",
    topic_router=ml_topic_router,
    topic_serializers={
        "ml-predictions": confluent_serializer, # Specify the serializer for a topic!
    }
)