Building on our exploration of stream processing, we now transition from Kafka’s native library to Apache Flink, a powerful, general-purpose distributed processing engine. In this post, we’ll dive into Flink’s foundational DataStream API. We will tackle the same supplier statistics problem - analyzing a stream of Avro-formatted order events - but this time using Flink’s robust features for stateful computation. This example will highlight Flink’s sophisticated event-time processing with watermarks and its elegant, built-in mechanisms for handling late-arriving data through side outputs.

We develop a Flink DataStream application designed for scalable, real-time event processing. The application:

  • Consumes Avro-formatted order events from a Kafka topic.
  • Assigns event-time timestamps and watermarks to handle out-of-order data.
  • Aggregates order data into 5-second tumbling windows to calculate total price and order counts for each supplier.
  • Leverages Flink’s side-output mechanism to gracefully handle and route late-arriving records to a separate topic.
  • Serializes the resulting supplier statistics and late records back to Kafka, using Avro and JSON respectively.

The source code for the application discussed in this post can be found in the orders-stats-flink folder of this GitHub repository.

The Build Configuration

The build.gradle.kts file sets up the project, its dependencies, and packaging. It is shared between the DataStream and Table API applications; the Flink application that uses the Table API will be covered in the next post.

  • Plugins:
    • kotlin("jvm"): Enables Kotlin language support.
    • com.github.davidmc24.gradle.plugin.avro: Compiles Avro schemas into Java classes.
    • com.github.johnrengelman.shadow: Creates an executable “fat JAR” with all dependencies.
    • application: Configures the project to be runnable via Gradle.
  • Dependencies:
    • Flink Core & APIs: flink-streaming-java, flink-clients.
    • Flink Connectors: flink-connector-kafka for Kafka integration.
    • Flink Formats: flink-avro and flink-avro-confluent-registry for handling Avro data with Confluent Schema Registry.
    • Note on Dependency Scope: The Flink dependencies are declared with implementation. This allows the application to be run directly with ./gradlew run. For production deployments on a Flink cluster (where the Flink runtime is already provided), these dependencies should be changed to compileOnly to significantly reduce the size of the final JAR (see the sketch after the build file below).
  • Application Configuration:
    • The application block sets the mainClass and passes necessary JVM arguments for Flink’s runtime. The run task is configured with environment variables to specify Kafka and Schema Registry connection details.
  • Avro & Shadow JAR:
    • The avro block configures code generation.
    • The shadowJar task configures the output JAR name and merges service files, which is crucial for Flink connectors to work correctly.
plugins {
    kotlin("jvm") version "2.1.20"
    id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
    id("com.github.johnrengelman.shadow") version "8.1.1"
    application
}

group = "me.jaehyeon"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
    maven("https://packages.confluent.io/maven")
}

dependencies {
    // Flink Core and APIs
    implementation("org.apache.flink:flink-streaming-java:1.20.1")
    implementation("org.apache.flink:flink-table-api-java:1.20.1")
    implementation("org.apache.flink:flink-table-api-java-bridge:1.20.1")
    implementation("org.apache.flink:flink-table-planner-loader:1.20.1")
    implementation("org.apache.flink:flink-table-runtime:1.20.1")
    implementation("org.apache.flink:flink-clients:1.20.1")
    implementation("org.apache.flink:flink-connector-base:1.20.1")
    // Flink Kafka and Avro
    implementation("org.apache.flink:flink-connector-kafka:3.4.0-1.20")
    implementation("org.apache.flink:flink-avro:1.20.1")
    implementation("org.apache.flink:flink-avro-confluent-registry:1.20.1")
    // Json
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.13.0")
    // Logging
    implementation("io.github.microutils:kotlin-logging-jvm:3.0.5")
    implementation("ch.qos.logback:logback-classic:1.5.13")
    // Kotlin test
    testImplementation(kotlin("test"))
}

kotlin {
    jvmToolchain(17)
}

application {
    mainClass.set("me.jaehyeon.MainKt")
    applicationDefaultJvmArgs =
        listOf(
            "--add-opens=java.base/java.util=ALL-UNNAMED",
        )
}

avro {
    setCreateSetters(true)
    setFieldVisibility("PUBLIC")
}

tasks.named("compileKotlin") {
    dependsOn("generateAvroJava")
}

sourceSets {
    named("main") {
        java.srcDirs("build/generated/avro/main")
        kotlin.srcDirs("src/main/kotlin")
    }
}

tasks.withType<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar> {
    archiveBaseName.set("orders-stats-flink")
    archiveClassifier.set("")
    archiveVersion.set("1.0")
    mergeServiceFiles()
}

tasks.named("build") {
    dependsOn("shadowJar")
}

tasks.named<JavaExec>("run") {
    environment("TO_SKIP_PRINT", "false")
    environment("BOOTSTRAP", "localhost:9092")
    environment("REGISTRY_URL", "http://localhost:8081")
}

tasks.test {
    useJUnitPlatform()
}
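
As noted in the dependency-scope bullet above, deploying to a Flink cluster changes which dependencies belong in the fat JAR. The following is a minimal sketch (not the project's actual build file) of how the scopes might be split in that scenario; it assumes the same artifact versions as above and that the cluster provides the Flink runtime but not the Kafka connector or Avro formats:

// Sketch only: dependency scopes for a cluster deployment.
dependencies {
    // Provided by the Flink cluster at runtime - compile against them only.
    compileOnly("org.apache.flink:flink-streaming-java:1.20.1")
    compileOnly("org.apache.flink:flink-clients:1.20.1")
    // Connectors and formats are usually not on the cluster classpath,
    // so they remain bundled in the shadow JAR.
    implementation("org.apache.flink:flink-connector-kafka:3.4.0-1.20")
    implementation("org.apache.flink:flink-avro:1.20.1")
    implementation("org.apache.flink:flink-avro-confluent-registry:1.20.1")
}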

Avro Schema for Supplier Statistics

The SupplierStats.avsc file defines the structure for the aggregated output data. This schema is used by the Flink Kafka sink to serialize the SupplierStats objects into Avro format, ensuring type safety and enabling schema evolution for downstream consumers.

  • Type: A record named SupplierStats in the me.jaehyeon.avro namespace.
  • Fields:
    • window_start and window_end (string): The start and end times of the aggregation window.
    • supplier (string): The supplier being aggregated.
    • total_price (double): The sum of order prices within the window.
    • count (long): The total number of orders within the window.
{
  "type": "record",
  "name": "SupplierStats",
  "namespace": "me.jaehyeon.avro",
  "fields": [
    { "name": "window_start", "type": "string" },
    { "name": "window_end", "type": "string" },
    { "name": "supplier", "type": "string" },
    { "name": "total_price", "type": "double" },
    { "name": "count", "type": "long" }
  ]
}

Shared Utilities

These utility files provide common functionality used by both the DataStream and Table API applications.

Kafka Admin Utilities

This file provides two key helper functions for interacting with the Kafka ecosystem:

  • createTopicIfNotExists(...): Uses Kafka’s AdminClient to programmatically create topics. It’s designed to be idempotent, safely handling cases where the topic already exists to prevent application startup failures.
  • getLatestSchema(...): Connects to the Confluent Schema Registry using CachedSchemaRegistryClient to fetch the latest Avro schema for a given subject. This is essential for the Flink source to correctly deserialize incoming Avro records without hardcoding the schema in the application.
package me.jaehyeon.kafka

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.errors.TopicExistsException
import java.util.Properties
import java.util.concurrent.ExecutionException
import kotlin.use

private val logger = KotlinLogging.logger { }

fun createTopicIfNotExists(
    topicName: String,
    bootstrapAddress: String,
    numPartitions: Int,
    replicationFactor: Short,
) {
    val props =
        Properties().apply {
            put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
            put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000")
            put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000")
            put(AdminClientConfig.RETRIES_CONFIG, "1")
        }

    AdminClient.create(props).use { client ->
        val newTopic = NewTopic(topicName, numPartitions, replicationFactor)
        val result = client.createTopics(listOf(newTopic))

        try {
            logger.info { "Attempting to create topic '$topicName'..." }
            result.all().get()
            logger.info { "Topic '$topicName' created successfully!" }
        } catch (e: ExecutionException) {
            if (e.cause is TopicExistsException) {
                logger.warn { "Topic '$topicName' was created concurrently or already existed. Continuing..." }
            } else {
                throw RuntimeException("Unrecoverable error while creating a topic '$topicName'.", e)
            }
        }
    }
}

fun getLatestSchema(
    schemaSubject: String,
    registryUrl: String,
    registryConfig: Map<String, String>,
): Schema {
    val schemaRegistryClient =
        CachedSchemaRegistryClient(
            registryUrl,
            100,
            registryConfig,
        )
    logger.info { "Fetching latest schema for subject '$schemaSubject' from $registryUrl" }
    try {
        val latestSchemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(schemaSubject)
        logger.info {
            "Successfully fetched schema ID ${latestSchemaMetadata.id} version ${latestSchemaMetadata.version} for subject '$schemaSubject'"
        }
        return Schema.Parser().parse(latestSchemaMetadata.schema)
    } catch (e: Exception) {
        logger.error(e) { "Failed to retrieve schema for subject '$schemaSubject' from registry $registryUrl" }
        throw RuntimeException("Failed to retrieve schema for subject '$schemaSubject'", e)
    }
}

Kafka Connector Utilities

This file centralizes the creation of Flink’s Kafka sources and sinks.

  • createOrdersSource(...): Configures a KafkaSource to consume GenericRecord Avro data. It uses ConfluentRegistryAvroDeserializationSchema to automatically deserialize messages using the schema from Confluent Schema Registry.
  • createStatsSink(...): Configures a KafkaSink for the aggregated SupplierStats. It uses ConfluentRegistryAvroSerializationSchema to serialize the specific SupplierStats type and sets the Kafka message key to the supplier’s name.
  • createSkippedSink(...): Creates a generic KafkaSink for late records, which are handled as simple key-value string pairs.
package me.jaehyeon.kafka

import me.jaehyeon.avro.SupplierStats
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaSink
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import java.nio.charset.StandardCharsets
import java.util.Properties

fun createOrdersSource(
    topic: String,
    groupId: String,
    bootstrapAddress: String,
    registryUrl: String,
    registryConfig: Map<String, String>,
    schema: Schema,
): KafkaSource<GenericRecord> =
    KafkaSource
        .builder<GenericRecord>()
        .setBootstrapServers(bootstrapAddress)
        .setTopics(topic)
        .setGroupId(groupId)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(
            ConfluentRegistryAvroDeserializationSchema.forGeneric(
                schema,
                registryUrl,
                registryConfig,
            ),
        ).setProperties(
            Properties().apply {
                put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500")
            },
        ).build()

fun createStatsSink(
    topic: String,
    bootstrapAddress: String,
    registryUrl: String,
    registryConfig: Map<String, String>,
    outputSubject: String,
): KafkaSink<SupplierStats> =
    KafkaSink
        .builder<SupplierStats>()
        .setBootstrapServers(bootstrapAddress)
        .setKafkaProducerConfig(
            Properties().apply {
                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
                setProperty(ProducerConfig.LINGER_MS_CONFIG, "100")
                setProperty(ProducerConfig.BATCH_SIZE_CONFIG, (64 * 1024).toString())
                setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
            },
        ).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setRecordSerializer(
            KafkaRecordSerializationSchema
                .builder<SupplierStats>()
                .setTopic(topic)
                .setKeySerializationSchema { value: SupplierStats ->
                    value.supplier.toByteArray(StandardCharsets.UTF_8)
                }.setValueSerializationSchema(
                    ConfluentRegistryAvroSerializationSchema.forSpecific<SupplierStats>(
                        SupplierStats::class.java,
                        outputSubject,
                        registryUrl,
                        registryConfig,
                    ),
                ).build(),
        ).build()

fun createSkippedSink(
    topic: String,
    bootstrapAddress: String,
): KafkaSink<Pair<String?, String>> =
    KafkaSink
        .builder<Pair<String?, String>>()
        .setBootstrapServers(bootstrapAddress)
        .setKafkaProducerConfig(
            Properties().apply {
                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
                setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
                setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
                setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
            },
        ).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setRecordSerializer(
            KafkaRecordSerializationSchema
                .builder<Pair<String?, String>>()
                .setTopic(topic)
                .setKeySerializationSchema { pair: Pair<String?, String> ->
                    pair.first?.toByteArray(StandardCharsets.UTF_8)
                }.setValueSerializationSchema { pair: Pair<String?, String> ->
                    pair.second.toByteArray(StandardCharsets.UTF_8)
                }.build(),
        ).build()

DataStream Processing Logic

The following files contain the core logic specific to the DataStream API implementation.

Timestamp and Watermark Strategy

For event-time processing, Flink needs to know each event’s timestamp and how to handle out-of-order data. This WatermarkStrategy extracts the timestamp from the bid_time field of the incoming RecordMap. It uses forBoundedOutOfOrderness with a 5-second duration, telling Flink to tolerate events that arrive up to 5 seconds out of order before the watermark advances past them.

package me.jaehyeon.flink.watermark

import me.jaehyeon.flink.processing.RecordMap
import mu.KotlinLogging
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import java.time.Duration
import java.time.LocalDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter

private val logger = KotlinLogging.logger {}

object SupplierWatermarkStrategy {
    val strategy: WatermarkStrategy<RecordMap> =
        WatermarkStrategy
            .forBoundedOutOfOrderness<RecordMap>(Duration.ofSeconds(5)) // Operates on RecordMap
            .withTimestampAssigner { recordMap, _ ->
                val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                try {
                    val bidTimeString = recordMap["bid_time"]?.toString()
                    if (bidTimeString != null) {
                        val ldt = LocalDateTime.parse(bidTimeString, formatter)
                        ldt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
                    } else {
                        logger.warn { "Missing 'bid_time' field in RecordMap: $recordMap. Using processing time." }
                        System.currentTimeMillis()
                    }
                } catch (e: Exception) {
                    logger.error(e) { "Error parsing 'bid_time' from RecordMap: $recordMap. Using processing time." }
                    System.currentTimeMillis()
                }
            }.withIdleness(Duration.ofSeconds(10)) // Optional: if partitions can be idle
}

Custom Aggregation and Windowing Functions

Flink’s DataStream API provides fine-grained control over windowed aggregations using a combination of an AggregateFunction and a WindowFunction.

  • SupplierStatsAggregator: This AggregateFunction performs efficient, incremental aggregation. For each record in a window, it updates an accumulator, adding the price to totalPrice and incrementing the count. This pre-aggregation keeps only the small accumulator in window state rather than buffering every record, which makes it very efficient.
  • SupplierStatsFunction: This WindowFunction is applied once the window is complete. It receives the final accumulator from the AggregateFunction and has access to the window’s metadata (key, start time, end time). It uses this information to construct the final SupplierStats Avro object.
package me.jaehyeon.flink.processing

import org.apache.flink.api.common.functions.AggregateFunction

typealias RecordMap = Map<String, Any?>

data class SupplierStatsAccumulator(
    var totalPrice: Double = 0.0,
    var count: Long = 0L,
)

class SupplierStatsAggregator : AggregateFunction<RecordMap, SupplierStatsAccumulator, SupplierStatsAccumulator> {
    override fun createAccumulator(): SupplierStatsAccumulator = SupplierStatsAccumulator()

    override fun add(
        value: RecordMap,
        accumulator: SupplierStatsAccumulator,
    ): SupplierStatsAccumulator =
        SupplierStatsAccumulator(
            accumulator.totalPrice + value["price"] as Double,
            accumulator.count + 1,
        )

    override fun getResult(accumulator: SupplierStatsAccumulator): SupplierStatsAccumulator = accumulator

    override fun merge(
        a: SupplierStatsAccumulator,
        b: SupplierStatsAccumulator,
    ): SupplierStatsAccumulator =
        SupplierStatsAccumulator(
            totalPrice = a.totalPrice + b.totalPrice,
            count = a.count + b.count,
        )
}
package me.jaehyeon.flink.processing

import me.jaehyeon.avro.SupplierStats
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.time.Instant
import java.time.ZoneId
import java.time.format.DateTimeFormatter

class SupplierStatsFunction : WindowFunction<SupplierStatsAccumulator, SupplierStats, String, TimeWindow> {
    companion object {
        private val formatter: DateTimeFormatter =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault())
    }

    override fun apply(
        supplierKey: String,
        window: TimeWindow,
        input: Iterable<SupplierStatsAccumulator>,
        out: Collector<SupplierStats>,
    ) {
        val accumulator = input.firstOrNull() ?: return
        val windowStartStr = formatter.format(Instant.ofEpochMilli(window.start))
        val windowEndStr = formatter.format(Instant.ofEpochMilli(window.end))

        out.collect(
            SupplierStats
                .newBuilder()
                .setWindowStart(windowStartStr)
                .setWindowEnd(windowEndStr)
                .setSupplier(supplierKey)
                .setTotalPrice(String.format("%.2f", accumulator.totalPrice).toDouble())
                .setCount(accumulator.count)
                .build(),
        )
    }
}

Not Applicable Source Code

RowWatermarkStrategy and LateDataRouter are used exclusively by the Flink Table API application and are not relevant to this DataStream implementation. The DataStream API handles late data using the built-in .sideOutputLateData() method, making a custom router unnecessary.

Core DataStream Application

This is the main driver for the DataStream application. It defines and executes the Flink job topology.

  1. Environment Setup: It initializes the StreamExecutionEnvironment and creates the necessary output Kafka topics.
  2. Source and Transformation: It creates a Kafka source for Avro GenericRecords and then maps them to a more convenient DataStream<RecordMap>.
  3. Timestamping and Windowing:
    • assignTimestampsAndWatermarks applies the custom SupplierWatermarkStrategy.
    • The stream is keyed by the supplier field.
    • A TumblingEventTimeWindows of 5 seconds is defined.
    • allowedLateness is set to 5 seconds, keeping the window state for an additional 5 seconds after the watermark passes the end of the window so that late-but-not-too-late events can still update the result.
  4. Late Data Handling: sideOutputLateData is a key feature. It directs any records arriving after the allowedLateness period to a separate stream identified by an OutputTag.
  5. Aggregation: The .aggregate() call combines the efficient SupplierStatsAggregator with the final SupplierStatsFunction to produce the statistics.
  6. Sinking:
    • The main statsStream is sent to the statsSink.
    • The late data stream, retrieved via getSideOutput, is processed (converted to JSON with a “late” flag) and sent to the skippedSink.
  7. Execution: env.execute() starts the Flink job.
package me.jaehyeon

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import me.jaehyeon.avro.SupplierStats
import me.jaehyeon.flink.processing.RecordMap
import me.jaehyeon.flink.processing.SupplierStatsAggregator
import me.jaehyeon.flink.processing.SupplierStatsFunction
import me.jaehyeon.flink.watermark.SupplierWatermarkStrategy
import me.jaehyeon.kafka.createOrdersSource
import me.jaehyeon.kafka.createSkippedSink
import me.jaehyeon.kafka.createStatsSink
import me.jaehyeon.kafka.createTopicIfNotExists
import me.jaehyeon.kafka.getLatestSchema
import mu.KotlinLogging
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.typeinfo.TypeHint
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.util.OutputTag
import java.time.Duration

object DataStreamApp {
    private val toSkipPrint = System.getenv("TO_SKIP_PRINT")?.toBoolean() ?: true
    private val bootstrapAddress = System.getenv("BOOTSTRAP") ?: "kafka-1:19092"
    private val inputTopicName = System.getenv("TOPIC") ?: "orders-avro"
    private val registryUrl = System.getenv("REGISTRY_URL") ?: "http://schema:8081"
    private val registryConfig =
        mapOf(
            "basic.auth.credentials.source" to "USER_INFO",
            "basic.auth.user.info" to "admin:admin",
        )
    private const val INPUT_SCHEMA_SUBJECT = "orders-avro-value"
    private const val NUM_PARTITIONS = 3
    private const val REPLICATION_FACTOR: Short = 3
    private val logger = KotlinLogging.logger {}

    // ObjectMapper for converting late data Map to JSON
    private val objectMapper: ObjectMapper by lazy {
        ObjectMapper().registerKotlinModule()
    }

    fun run() {
        // Create output topics if not existing
        val outputTopicName = "$inputTopicName-kds-stats"
        val skippedTopicName = "$inputTopicName-kds-skipped"
        listOf(outputTopicName, skippedTopicName).forEach { name ->
            createTopicIfNotExists(
                name,
                bootstrapAddress,
                NUM_PARTITIONS,
                REPLICATION_FACTOR,
            )
        }

        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        env.parallelism = 3

        val inputAvroSchema = getLatestSchema(INPUT_SCHEMA_SUBJECT, registryUrl, registryConfig)
        val ordersGenericRecordSource =
            createOrdersSource(
                topic = inputTopicName,
                groupId = "$inputTopicName-flink-ds",
                bootstrapAddress = bootstrapAddress,
                registryUrl = registryUrl,
                registryConfig = registryConfig,
                schema = inputAvroSchema,
            )

        // 1. Stream of GenericRecords from Kafka
        val genericRecordStream: DataStream<GenericRecord> =
            env
                .fromSource(ordersGenericRecordSource, WatermarkStrategy.noWatermarks(), "KafkaGenericRecordSource")

        // 2. Convert GenericRecord to Map<String, Any?> (RecordMap)
        val recordMapStream: DataStream<RecordMap> =
            genericRecordStream
                .map { genericRecord ->
                    val map = mutableMapOf<String, Any?>()
                    genericRecord.schema.fields.forEach { field ->
                        val value = genericRecord.get(field.name())
                        map[field.name()] = if (value is org.apache.avro.util.Utf8) value.toString() else value
                    }
                    map as RecordMap // Cast to type alias
                }.name("GenericRecordToMapConverter")
                .returns(TypeInformation.of(object : TypeHint<RecordMap>() {}))

        // 3. Define OutputTag for late data (now carrying RecordMap)
        val lateMapOutputTag =
            OutputTag(
                "late-order-records",
                TypeInformation.of(object : TypeHint<RecordMap>() {}),
            )

        // 4. Process the RecordMap stream
        val statsStreamOperator: SingleOutputStreamOperator<SupplierStats> =
            recordMapStream
                .assignTimestampsAndWatermarks(SupplierWatermarkStrategy.strategy)
                .keyBy { recordMap -> recordMap["supplier"].toString() }
                .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
                .allowedLateness(Duration.ofSeconds(5))
                .sideOutputLateData(lateMapOutputTag)
                .aggregate(SupplierStatsAggregator(), SupplierStatsFunction())
        val statsStream: DataStream<SupplierStats> = statsStreamOperator

        // 5. Handle late data as a pair of key and value
        val lateDataMapStream: DataStream<RecordMap> = statsStreamOperator.getSideOutput(lateMapOutputTag)
        val lateKeyPairStream: DataStream<Pair<String?, String>> =
            lateDataMapStream
                .map { recordMap ->
                    val mutableMap = recordMap.toMutableMap()
                    mutableMap["late"] = true
                    val orderId = mutableMap["order_id"] as? String
                    try {
                        val value = objectMapper.writeValueAsString(mutableMap)
                        Pair(orderId, value)
                    } catch (e: Exception) {
                        logger.error(e) { "Error serializing late RecordMap to JSON: $mutableMap" }
                        val errorJson = "{ \"error\": \"json_serialization_failed\", \"data_keys\": \"${
                            mutableMap.keys.joinToString(
                                ",",
                            )}\" }"
                        Pair(orderId, errorJson)
                    }
                }.returns(TypeInformation.of(object : TypeHint<Pair<String?, String>>() {}))

        if (!toSkipPrint) {
            statsStream
                .print()
                .name("SupplierStatsPrint")
            lateKeyPairStream
                .map { it.second }
                .print()
                .name("LateDataPrint")
        }

        val statsSink =
            createStatsSink(
                topic = outputTopicName,
                bootstrapAddress = bootstrapAddress,
                registryUrl = registryUrl,
                registryConfig = registryConfig,
                outputSubject = "$outputTopicName-value",
            )

        val skippedSink =
            createSkippedSink(
                topic = skippedTopicName,
                bootstrapAddress = bootstrapAddress,
            )

        statsStream.sinkTo(statsSink).name("SupplierStatsSink")
        lateKeyPairStream.sinkTo(skippedSink).name("LateDataSink")
        env.execute("SupplierStats")
    }
}

Application Entry Point

The Main.kt file serves as the entry point for the application. It parses a command-line argument (datastream or table) to determine which Flink application to run. A try-catch block ensures that any fatal error during execution is logged before the application exits.

package me.jaehyeon

import mu.KotlinLogging
import kotlin.system.exitProcess

private val logger = KotlinLogging.logger {}

fun main(args: Array<String>) {
    try {
        when (args.getOrNull(0)?.lowercase()) {
            "datastream" -> DataStreamApp.run()
            "table" -> TableApp.run()
            else -> println("Usage: <datastream | table>")
        }
    } catch (e: Exception) {
        logger.error(e) { "Fatal error in ${args.getOrNull(0) ?: "app"}. Shutting down." }
        exitProcess(1)
    }
}

To observe our Flink DataStream application in action, we’ll follow the essential steps: setting up a local Kafka environment, generating a stream of test data, and then executing the Flink job.

Factor House Local Setup

A local Kafka environment is a prerequisite. If you don’t have one running, use the Factor House Local project to quickly get started:

  1. Clone the repository:
    git clone https://github.com/factorhouse/factorhouse-local.git
    cd factorhouse-local
    
  2. Configure your Kpow community license as detailed in the project’s README.
  3. Start the Docker services:
    docker compose -f compose-kpow-community.yml up -d
    

Once running, the Kpow UI at http://localhost:3000 will provide visibility into your Kafka cluster.

Start the Kafka Order Producer

Our Flink application is designed to consume order data from the orders-avro topic. We’ll use the Kafka producer developed in Part 2 of this series to generate this data. To properly test Flink’s event-time windowing, we’ll configure the producer to add a randomized delay (up to 30 seconds) to the bid_time field.

Navigate to the directory of the producer application (orders-avro-clients from the GitHub repository) and run:

# Assuming you are in the root of the 'orders-avro-clients' project
DELAY_SECONDS=30 ./gradlew run --args="producer"

This will start populating the orders-avro topic with Avro-encoded order messages. You can inspect these messages in Kpow. Ensure Kpow is configured with Key Deserializer: String, Value Deserializer: AVRO, and Schema Registry: Local Schema Registry.

Run the Flink DataStream Application

With a steady stream of order events being produced, we can now launch our orders-stats-flink application. Navigate to its project directory. The application’s entry point is designed to run different jobs based on a command-line argument; for this post, we’ll use datastream.

The application can be run in two main ways:

  1. With Gradle (Development Mode):
    ./gradlew run --args="datastream"
    
  2. Running the Shadow JAR (Deployment Mode):
    # First, build the fat JAR
    ./gradlew shadowJar

    # Then run it
    java --add-opens=java.base/java.util=ALL-UNNAMED \
      -jar build/libs/orders-stats-flink-1.0.jar datastream
    

💡 To build and run the application locally, ensure that JDK 17 and a recent version of Gradle (e.g., 7.6+ or 8.x) are installed.
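
If the job were later submitted to a standalone Flink cluster instead of run locally (with the Flink runtime dependencies switched to compileOnly as discussed in the build configuration), the fat JAR would typically be handed to the Flink CLI. A hedged example, assuming the flink command is on the PATH and points at a running cluster:

# Sketch: submit the shadow JAR to an existing Flink cluster
flink run \
  -c me.jaehyeon.MainKt \
  build/libs/orders-stats-flink-1.0.jar datastream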

For this demonstration, we’ll use Gradle to run the application in development mode. Upon starting, you’ll see logs indicating the Flink application has initialized and is processing records from the orders-avro topic.

Observing the Output

Our Flink DataStream job writes its results to two distinct Kafka topics:

  • orders-avro-kds-stats: Contains the aggregated supplier statistics as Avro records.
  • orders-avro-kds-skipped: Contains records identified as “late,” serialized as JSON.

1. Supplier Statistics (orders-avro-kds-stats):

In Kpow, navigate to the orders-avro-kds-stats topic. Configure Kpow to view these messages:

  • Key Deserializer: String
  • Value Deserializer: AVRO
  • Schema Registry: Local Schema Registry

You should see SupplierStats messages, each representing the total price and count of orders for a supplier within a 5-second window. Notice the window_start and window_end fields.

2. Skipped (Late) Records (orders-avro-kds-skipped):

Next, inspect the orders-avro-kds-skipped topic in Kpow. Configure Kpow as follows:

  • Key Deserializer: String
  • Value Deserializer: JSON

These records are the ones that arrived too late to be included in their windows, even after the allowedLateness period. They were captured by Flink’s .sideOutputLateData() operator and converted to JSON with a "late": true field so they are easy to identify downstream.
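
For illustration only, a late record in this topic might look roughly like the following JSON. The field names shown are the ones referenced in this post (order_id, bid_time, price, supplier, plus the added late flag); the values are made up, and the actual record will carry the remaining order fields as well:

{
  "order_id": "d3f1a7c2-0000-0000-0000-000000000000",
  "bid_time": "2025-06-20 10:15:02",
  "price": 34.5,
  "supplier": "Acme Corp",
  "late": true
}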

Conclusion

In this post, we’ve built a powerful, real-time analytics job using Apache Flink’s DataStream API. We demonstrated how to implement a complete stateful pipeline in Kotlin, from consuming Avro records to performing windowed aggregations with a custom AggregateFunction and WindowFunction. We saw how Flink’s WatermarkStrategy provides a robust foundation for event-time processing and how the .sideOutputLateData() operator offers a clean, first-class solution for isolating late records. This approach showcases the fine-grained control and high performance the DataStream API offers for complex stream processing challenges. Next, we will see how to solve the same problem with a much more declarative approach using Flink’s Table API.