Building on our exploration of stream processing, we now transition from Kafka’s native library to Apache Flink, a powerful, general-purpose distributed processing engine. In this post, we’ll dive into Flink’s foundational DataStream API. We will tackle the same supplier statistics problem - analyzing a stream of Avro-formatted order events - but this time using Flink’s robust features for stateful computation. This example will highlight Flink’s sophisticated event-time processing with watermarks and its elegant, built-in mechanisms for handling late-arriving data through side outputs.
- Kafka Clients with JSON - Producing and Consuming Order Events
- Kafka Clients with Avro - Schema Registry and Order Events
- Kafka Streams - Lightweight Real-Time Processing for Supplier Stats
- Flink DataStream API - Scalable Event Processing for Supplier Stats (this post)
- Flink Table API - Declarative Analytics for Supplier Stats in Real Time
Flink DataStream Application
We develop a Flink DataStream application designed for scalable, real-time event processing. The application:
- Consumes Avro-formatted order events from a Kafka topic.
- Assigns event-time timestamps and watermarks to handle out-of-order data.
- Aggregates order data into 5-second tumbling windows to calculate total price and order counts for each supplier.
- Leverages Flink’s side-output mechanism to gracefully handle and route late-arriving records to a separate topic.
- Serializes the resulting supplier statistics and late records back to Kafka, using Avro and JSON respectively.
The source code for the application discussed in this post can be found in the orders-stats-flink folder of this GitHub repository.
The Build Configuration
The build.gradle.kts file sets up the project, its dependencies, and packaging. It is shared between the DataStream and Table API applications; the Table API application will be covered in the next post.
- Plugins:
  - kotlin("jvm"): Enables Kotlin language support.
  - com.github.davidmc24.gradle.plugin.avro: Compiles Avro schemas into Java classes.
  - com.github.johnrengelman.shadow: Creates an executable “fat JAR” with all dependencies.
  - application: Configures the project to be runnable via Gradle.
- Dependencies:
  - Flink Core & APIs: flink-streaming-java, flink-clients.
  - Flink Connectors: flink-connector-kafka for Kafka integration.
  - Flink Formats: flink-avro and flink-avro-confluent-registry for handling Avro data with Confluent Schema Registry.
  - Note on Dependency Scope: The Flink dependencies are declared with implementation. This allows the application to be run directly with ./gradlew run. For production deployments on a Flink cluster (where the Flink runtime is already provided), these dependencies should be changed to compileOnly to significantly reduce the size of the final JAR (a sketch of this change appears after the build file below).
- Application Configuration:
  - The application block sets the mainClass and passes necessary JVM arguments for Flink’s runtime.
  - The run task is configured with environment variables to specify Kafka and Schema Registry connection details.
- Avro & Shadow JAR:
  - The avro block configures code generation.
  - The shadowJar task configures the output JAR name and merges service files, which is crucial for Flink connectors to work correctly.
```kotlin
plugins {
    kotlin("jvm") version "2.1.20"
    id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
    id("com.github.johnrengelman.shadow") version "8.1.1"
    application
}

group = "me.jaehyeon"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
    maven("https://packages.confluent.io/maven")
}

dependencies {
    // Flink Core and APIs
    implementation("org.apache.flink:flink-streaming-java:1.20.1")
    implementation("org.apache.flink:flink-table-api-java:1.20.1")
    implementation("org.apache.flink:flink-table-api-java-bridge:1.20.1")
    implementation("org.apache.flink:flink-table-planner-loader:1.20.1")
    implementation("org.apache.flink:flink-table-runtime:1.20.1")
    implementation("org.apache.flink:flink-clients:1.20.1")
    implementation("org.apache.flink:flink-connector-base:1.20.1")
    // Flink Kafka and Avro
    implementation("org.apache.flink:flink-connector-kafka:3.4.0-1.20")
    implementation("org.apache.flink:flink-avro:1.20.1")
    implementation("org.apache.flink:flink-avro-confluent-registry:1.20.1")
    // Json
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.13.0")
    // Logging
    implementation("io.github.microutils:kotlin-logging-jvm:3.0.5")
    implementation("ch.qos.logback:logback-classic:1.5.13")
    // Kotlin test
    testImplementation(kotlin("test"))
}

kotlin {
    jvmToolchain(17)
}

application {
    mainClass.set("me.jaehyeon.MainKt")
    applicationDefaultJvmArgs =
        listOf(
            "--add-opens=java.base/java.util=ALL-UNNAMED",
        )
}

avro {
    setCreateSetters(true)
    setFieldVisibility("PUBLIC")
}

tasks.named("compileKotlin") {
    dependsOn("generateAvroJava")
}

sourceSets {
    named("main") {
        java.srcDirs("build/generated/avro/main")
        kotlin.srcDirs("src/main/kotlin")
    }
}

tasks.withType<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar> {
    archiveBaseName.set("orders-stats-flink")
    archiveClassifier.set("")
    archiveVersion.set("1.0")
    mergeServiceFiles()
}

tasks.named("build") {
    dependsOn("shadowJar")
}

tasks.named<JavaExec>("run") {
    environment("TO_SKIP_PRINT", "false")
    environment("BOOTSTRAP", "localhost:9092")
    environment("REGISTRY_URL", "http://localhost:8081")
}

tasks.test {
    useJUnitPlatform()
}
```
Avro Schema for Supplier Statistics
The SupplierStats.avsc file defines the structure for the aggregated output data. This schema is used by the Flink Kafka sink to serialize the SupplierStats objects into Avro format, ensuring type safety and enabling schema evolution for downstream consumers.
- Type: A record named SupplierStats in the me.jaehyeon.avro namespace.
- Fields:
  - window_start and window_end (string): The start and end times of the aggregation window.
  - supplier (string): The supplier being aggregated.
  - total_price (double): The sum of order prices within the window.
  - count (long): The total number of orders within the window.
```json
{
  "type": "record",
  "name": "SupplierStats",
  "namespace": "me.jaehyeon.avro",
  "fields": [
    { "name": "window_start", "type": "string" },
    { "name": "window_end", "type": "string" },
    { "name": "supplier", "type": "string" },
    { "name": "total_price", "type": "double" },
    { "name": "count", "type": "long" }
  ]
}
```
Shared Utilities
These utility files provide common functionality used by both the DataStream and Table API applications.
Kafka Admin Utilities
This file provides two key helper functions for interacting with the Kafka ecosystem:
- createTopicIfNotExists(...): Uses Kafka’s AdminClient to programmatically create topics. It’s designed to be idempotent, safely handling cases where the topic already exists to prevent application startup failures.
- getLatestSchema(...): Connects to the Confluent Schema Registry using CachedSchemaRegistryClient to fetch the latest Avro schema for a given subject. This is essential for the Flink source to correctly deserialize incoming Avro records without hardcoding the schema in the application.
```kotlin
package me.jaehyeon.kafka

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import mu.KotlinLogging
import org.apache.avro.Schema
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.errors.TopicExistsException
import java.util.Properties
import java.util.concurrent.ExecutionException
import kotlin.use

private val logger = KotlinLogging.logger { }

fun createTopicIfNotExists(
    topicName: String,
    bootstrapAddress: String,
    numPartitions: Int,
    replicationFactor: Short,
) {
    val props =
        Properties().apply {
            put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
            put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000")
            put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000")
            put(AdminClientConfig.RETRIES_CONFIG, "1")
        }

    AdminClient.create(props).use { client ->
        val newTopic = NewTopic(topicName, numPartitions, replicationFactor)
        val result = client.createTopics(listOf(newTopic))

        try {
            logger.info { "Attempting to create topic '$topicName'..." }
            result.all().get()
            logger.info { "Topic '$topicName' created successfully!" }
        } catch (e: ExecutionException) {
            if (e.cause is TopicExistsException) {
                logger.warn { "Topic '$topicName' was created concurrently or already existed. Continuing..." }
            } else {
                throw RuntimeException("Unrecoverable error while creating a topic '$topicName'.", e)
            }
        }
    }
}

fun getLatestSchema(
    schemaSubject: String,
    registryUrl: String,
    registryConfig: Map<String, String>,
): Schema {
    val schemaRegistryClient =
        CachedSchemaRegistryClient(
            registryUrl,
            100,
            registryConfig,
        )
    logger.info { "Fetching latest schema for subject '$schemaSubject' from $registryUrl" }
    try {
        val latestSchemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(schemaSubject)
        logger.info {
            "Successfully fetched schema ID ${latestSchemaMetadata.id} version ${latestSchemaMetadata.version} for subject '$schemaSubject'"
        }
        return Schema.Parser().parse(latestSchemaMetadata.schema)
    } catch (e: Exception) {
        logger.error(e) { "Failed to retrieve schema for subject '$schemaSubject' from registry $registryUrl" }
        throw RuntimeException("Failed to retrieve schema for subject '$schemaSubject'", e)
    }
}
```
Flink Kafka Connectors
This file centralizes the creation of Flink’s Kafka sources and sinks.
- createOrdersSource(...): Configures a KafkaSource to consume GenericRecord Avro data. It uses ConfluentRegistryAvroDeserializationSchema to automatically deserialize messages using the schema from Confluent Schema Registry.
- createStatsSink(...): Configures a KafkaSink for the aggregated SupplierStats. It uses ConfluentRegistryAvroSerializationSchema to serialize the specific SupplierStats type and sets the Kafka message key to the supplier’s name.
- createSkippedSink(...): Creates a generic KafkaSink for late records, which are handled as simple key-value string pairs.
```kotlin
package me.jaehyeon.kafka

import me.jaehyeon.avro.SupplierStats
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
import org.apache.flink.connector.kafka.sink.KafkaSink
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import java.nio.charset.StandardCharsets
import java.util.Properties

fun createOrdersSource(
    topic: String,
    groupId: String,
    bootstrapAddress: String,
    registryUrl: String,
    registryConfig: Map<String, String>,
    schema: Schema,
): KafkaSource<GenericRecord> =
    KafkaSource
        .builder<GenericRecord>()
        .setBootstrapServers(bootstrapAddress)
        .setTopics(topic)
        .setGroupId(groupId)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(
            ConfluentRegistryAvroDeserializationSchema.forGeneric(
                schema,
                registryUrl,
                registryConfig,
            ),
        ).setProperties(
            Properties().apply {
                put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500")
            },
        ).build()

fun createStatsSink(
    topic: String,
    bootstrapAddress: String,
    registryUrl: String,
    registryConfig: Map<String, String>,
    outputSubject: String,
): KafkaSink<SupplierStats> =
    KafkaSink
        .builder<SupplierStats>()
        .setBootstrapServers(bootstrapAddress)
        .setKafkaProducerConfig(
            Properties().apply {
                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
                setProperty(ProducerConfig.LINGER_MS_CONFIG, "100")
                setProperty(ProducerConfig.BATCH_SIZE_CONFIG, (64 * 1024).toString())
                setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
            },
        ).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setRecordSerializer(
            KafkaRecordSerializationSchema
                .builder<SupplierStats>()
                .setTopic(topic)
                .setKeySerializationSchema { value: SupplierStats ->
                    value.supplier.toByteArray(StandardCharsets.UTF_8)
                }.setValueSerializationSchema(
                    ConfluentRegistryAvroSerializationSchema.forSpecific<SupplierStats>(
                        SupplierStats::class.java,
                        outputSubject,
                        registryUrl,
                        registryConfig,
                    ),
                ).build(),
        ).build()

fun createSkippedSink(
    topic: String,
    bootstrapAddress: String,
): KafkaSink<Pair<String?, String>> =
    KafkaSink
        .builder<Pair<String?, String>>()
        .setBootstrapServers(bootstrapAddress)
        .setKafkaProducerConfig(
            Properties().apply {
                setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
                setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
                setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
                setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
            },
        ).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setRecordSerializer(
            KafkaRecordSerializationSchema
                .builder<Pair<String?, String>>()
                .setTopic(topic)
                .setKeySerializationSchema { pair: Pair<String?, String> ->
                    pair.first?.toByteArray(StandardCharsets.UTF_8)
                }.setValueSerializationSchema { pair: Pair<String?, String> ->
                    pair.second.toByteArray(StandardCharsets.UTF_8)
                }.build(),
        ).build()
```
DataStream Processing Logic
The following files contain the core logic specific to the DataStream API implementation.
Timestamp and Watermark Strategy
For event-time processing, Flink needs to know each event’s timestamp and how to handle out-of-order data. This WatermarkStrategy extracts the timestamp from the bid_time field of the incoming RecordMap. It uses forBoundedOutOfOrderness with a 5-second duration, telling Flink to expect records to be at most 5 seconds late.
```kotlin
package me.jaehyeon.flink.watermark

import me.jaehyeon.flink.processing.RecordMap
import mu.KotlinLogging
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import java.time.Duration
import java.time.LocalDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter

private val logger = KotlinLogging.logger {}

object SupplierWatermarkStrategy {
    val strategy: WatermarkStrategy<RecordMap> =
        WatermarkStrategy
            .forBoundedOutOfOrderness<RecordMap>(Duration.ofSeconds(5)) // Operates on RecordMap
            .withTimestampAssigner { recordMap, _ ->
                val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                try {
                    val bidTimeString = recordMap["bid_time"]?.toString()
                    if (bidTimeString != null) {
                        val ldt = LocalDateTime.parse(bidTimeString, formatter)
                        ldt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
                    } else {
                        logger.warn { "Missing 'bid_time' field in RecordMap: $recordMap. Using processing time." }
                        System.currentTimeMillis()
                    }
                } catch (e: Exception) {
                    logger.error(e) { "Error parsing 'bid_time' from RecordMap: $recordMap. Using processing time." }
                    System.currentTimeMillis()
                }
            }.withIdleness(Duration.ofSeconds(10)) // Optional: if partitions can be idle
}
```
Custom Aggregation and Windowing Functions
Flink’s DataStream API provides fine-grained control over windowed aggregations using a combination of an AggregateFunction and a WindowFunction.
- SupplierStatsAggregator: This AggregateFunction performs efficient, incremental aggregation. For each record in a window, it updates an accumulator, adding the price to totalPrice and incrementing the count. This pre-aggregation is highly optimized as it doesn’t need to store all records in the window.
- SupplierStatsFunction: This WindowFunction is applied once the window is complete. It receives the final accumulator from the AggregateFunction and has access to the window’s metadata (key, start time, end time). It uses this information to construct the final SupplierStats Avro object.
```kotlin
package me.jaehyeon.flink.processing

import org.apache.flink.api.common.functions.AggregateFunction

typealias RecordMap = Map<String, Any?>

data class SupplierStatsAccumulator(
    var totalPrice: Double = 0.0,
    var count: Long = 0L,
)

class SupplierStatsAggregator : AggregateFunction<RecordMap, SupplierStatsAccumulator, SupplierStatsAccumulator> {
    override fun createAccumulator(): SupplierStatsAccumulator = SupplierStatsAccumulator()

    override fun add(
        value: RecordMap,
        accumulator: SupplierStatsAccumulator,
    ): SupplierStatsAccumulator =
        SupplierStatsAccumulator(
            accumulator.totalPrice + value["price"] as Double,
            accumulator.count + 1,
        )

    override fun getResult(accumulator: SupplierStatsAccumulator): SupplierStatsAccumulator = accumulator

    override fun merge(
        a: SupplierStatsAccumulator,
        b: SupplierStatsAccumulator,
    ): SupplierStatsAccumulator =
        SupplierStatsAccumulator(
            totalPrice = a.totalPrice + b.totalPrice,
            count = a.count + b.count,
        )
}
```
```kotlin
package me.jaehyeon.flink.processing

import me.jaehyeon.avro.SupplierStats
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.time.Instant
import java.time.ZoneId
import java.time.format.DateTimeFormatter

class SupplierStatsFunction : WindowFunction<SupplierStatsAccumulator, SupplierStats, String, TimeWindow> {
    companion object {
        private val formatter: DateTimeFormatter =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault())
    }

    override fun apply(
        supplierKey: String,
        window: TimeWindow,
        input: Iterable<SupplierStatsAccumulator>,
        out: Collector<SupplierStats>,
    ) {
        val accumulator = input.firstOrNull() ?: return
        val windowStartStr = formatter.format(Instant.ofEpochMilli(window.start))
        val windowEndStr = formatter.format(Instant.ofEpochMilli(window.end))

        out.collect(
            SupplierStats
                .newBuilder()
                .setWindowStart(windowStartStr)
                .setWindowEnd(windowEndStr)
                .setSupplier(supplierKey)
                .setTotalPrice(String.format("%.2f", accumulator.totalPrice).toDouble())
                .setCount(accumulator.count)
                .build(),
        )
    }
}
```
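Because the aggregator is a plain class with no Flink runtime dependencies, its accumulator logic is easy to sanity-check in isolation. The snippet below is a small sketch (not part of the repository) that feeds two orders into SupplierStatsAggregator and then merges a partial accumulator, mirroring what Flink does incrementally per window:

```kotlin
import me.jaehyeon.flink.processing.SupplierStatsAccumulator
import me.jaehyeon.flink.processing.SupplierStatsAggregator

fun main() {
    val aggregator = SupplierStatsAggregator()

    // Incrementally fold two orders for the same supplier into one accumulator
    var acc = aggregator.createAccumulator()
    acc = aggregator.add(mapOf("supplier" to "ACME", "price" to 10.0), acc)
    acc = aggregator.add(mapOf("supplier" to "ACME", "price" to 5.5), acc)
    println(acc) // SupplierStatsAccumulator(totalPrice=15.5, count=2)

    // Merge a partial accumulator, as Flink would when combining window panes
    val merged = aggregator.merge(acc, SupplierStatsAccumulator(totalPrice = 4.5, count = 1L))
    println(merged) // SupplierStatsAccumulator(totalPrice=20.0, count=3)
}
```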
Not Applicable Source Code
RowWatermarkStrategy and LateDataRouter are used exclusively by the Flink Table API application and are not relevant to this DataStream implementation. The DataStream API handles late data using the built-in .sideOutputLateData() method, making a custom router unnecessary.
Core DataStream Application
This is the main driver for the DataStream application. It defines and executes the Flink job topology.
- Environment Setup: It initializes the StreamExecutionEnvironment and creates the necessary output Kafka topics.
- Source and Transformation: It creates a Kafka source for Avro GenericRecords and then maps them to a more convenient DataStream<RecordMap>.
- Timestamping and Windowing:
  - assignTimestampsAndWatermarks applies the custom SupplierWatermarkStrategy.
  - The stream is keyed by the supplier field.
  - A TumblingEventTimeWindows of 5 seconds is defined.
  - allowedLateness is set to 5 seconds, allowing the window state to be kept for an additional 5 seconds after the watermark passes to accommodate late-but-not-too-late events.
- Late Data Handling: sideOutputLateData is a key feature. It directs any records arriving after the allowedLateness period to a separate stream identified by an OutputTag.
- Aggregation: The .aggregate() call combines the efficient SupplierStatsAggregator with the final SupplierStatsFunction to produce the statistics.
- Sinking:
  - The main statsStream is sent to the statsSink.
  - The late data stream, retrieved via getSideOutput, is processed (converted to JSON with a “late” flag) and sent to the skippedSink.
- Execution: env.execute() starts the Flink job.
```kotlin
package me.jaehyeon

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import me.jaehyeon.avro.SupplierStats
import me.jaehyeon.flink.processing.RecordMap
import me.jaehyeon.flink.processing.SupplierStatsAggregator
import me.jaehyeon.flink.processing.SupplierStatsFunction
import me.jaehyeon.flink.watermark.SupplierWatermarkStrategy
import me.jaehyeon.kafka.createOrdersSource
import me.jaehyeon.kafka.createSkippedSink
import me.jaehyeon.kafka.createStatsSink
import me.jaehyeon.kafka.createTopicIfNotExists
import me.jaehyeon.kafka.getLatestSchema
import mu.KotlinLogging
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.typeinfo.TypeHint
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.util.OutputTag
import java.time.Duration

object DataStreamApp {
    private val toSkipPrint = System.getenv("TO_SKIP_PRINT")?.toBoolean() ?: true
    private val bootstrapAddress = System.getenv("BOOTSTRAP") ?: "kafka-1:19092"
    private val inputTopicName = System.getenv("TOPIC") ?: "orders-avro"
    private val registryUrl = System.getenv("REGISTRY_URL") ?: "http://schema:8081"
    private val registryConfig =
        mapOf(
            "basic.auth.credentials.source" to "USER_INFO",
            "basic.auth.user.info" to "admin:admin",
        )
    private const val INPUT_SCHEMA_SUBJECT = "orders-avro-value"
    private const val NUM_PARTITIONS = 3
    private const val REPLICATION_FACTOR: Short = 3
    private val logger = KotlinLogging.logger {}

    // ObjectMapper for converting late data Map to JSON
    private val objectMapper: ObjectMapper by lazy {
        ObjectMapper().registerKotlinModule()
    }

    fun run() {
        // Create output topics if not existing
        val outputTopicName = "$inputTopicName-kds-stats"
        val skippedTopicName = "$inputTopicName-kds-skipped"
        listOf(outputTopicName, skippedTopicName).forEach { name ->
            createTopicIfNotExists(
                name,
                bootstrapAddress,
                NUM_PARTITIONS,
                REPLICATION_FACTOR,
            )
        }

        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        env.parallelism = 3

        val inputAvroSchema = getLatestSchema(INPUT_SCHEMA_SUBJECT, registryUrl, registryConfig)
        val ordersGenericRecordSource =
            createOrdersSource(
                topic = inputTopicName,
                groupId = "$inputTopicName-flink-ds",
                bootstrapAddress = bootstrapAddress,
                registryUrl = registryUrl,
                registryConfig = registryConfig,
                schema = inputAvroSchema,
            )

        // 1. Stream of GenericRecords from Kafka
        val genericRecordStream: DataStream<GenericRecord> =
            env
                .fromSource(ordersGenericRecordSource, WatermarkStrategy.noWatermarks(), "KafkaGenericRecordSource")

        // 2. Convert GenericRecord to Map<String, Any?> (RecordMap)
        val recordMapStream: DataStream<RecordMap> =
            genericRecordStream
                .map { genericRecord ->
                    val map = mutableMapOf<String, Any?>()
                    genericRecord.schema.fields.forEach { field ->
                        val value = genericRecord.get(field.name())
                        map[field.name()] = if (value is org.apache.avro.util.Utf8) value.toString() else value
                    }
                    map as RecordMap // Cast to type alias
                }.name("GenericRecordToMapConverter")
                .returns(TypeInformation.of(object : TypeHint<RecordMap>() {}))

        // 3. Define OutputTag for late data (now carrying RecordMap)
        val lateMapOutputTag =
            OutputTag(
                "late-order-records",
                TypeInformation.of(object : TypeHint<RecordMap>() {}),
            )

        // 4. Process the RecordMap stream
        val statsStreamOperator: SingleOutputStreamOperator<SupplierStats> =
            recordMapStream
                .assignTimestampsAndWatermarks(SupplierWatermarkStrategy.strategy)
                .keyBy { recordMap -> recordMap["supplier"].toString() }
                .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
                .allowedLateness(Duration.ofSeconds(5))
                .sideOutputLateData(lateMapOutputTag)
                .aggregate(SupplierStatsAggregator(), SupplierStatsFunction())
        val statsStream: DataStream<SupplierStats> = statsStreamOperator

        // 5. Handle late data as a pair of key and value
        val lateDataMapStream: DataStream<RecordMap> = statsStreamOperator.getSideOutput(lateMapOutputTag)
        val lateKeyPairStream: DataStream<Pair<String?, String>> =
            lateDataMapStream
                .map { recordMap ->
                    val mutableMap = recordMap.toMutableMap()
                    mutableMap["late"] = true
                    val orderId = mutableMap["order_id"] as? String
                    try {
                        val value = objectMapper.writeValueAsString(mutableMap)
                        Pair(orderId, value)
                    } catch (e: Exception) {
                        logger.error(e) { "Error serializing late RecordMap to JSON: $mutableMap" }
                        val errorJson = "{ \"error\": \"json_serialization_failed\", \"data_keys\": \"${
                            mutableMap.keys.joinToString(
                                ",",
                            )}\" }"
                        Pair(orderId, errorJson)
                    }
                }.returns(TypeInformation.of(object : TypeHint<Pair<String?, String>>() {}))

        if (!toSkipPrint) {
            statsStream
                .print()
                .name("SupplierStatsPrint")
            lateKeyPairStream
                .map { it.second }
                .print()
                .name("LateDataPrint")
        }

        val statsSink =
            createStatsSink(
                topic = outputTopicName,
                bootstrapAddress = bootstrapAddress,
                registryUrl = registryUrl,
                registryConfig = registryConfig,
                outputSubject = "$outputTopicName-value",
            )

        val skippedSink =
            createSkippedSink(
                topic = skippedTopicName,
                bootstrapAddress = bootstrapAddress,
            )

        statsStream.sinkTo(statsSink).name("SupplierStatsSink")
        lateKeyPairStream.sinkTo(skippedSink).name("LateDataSink")
        env.execute("SupplierStats")
    }
}
```
Application Entry Point
The Main.kt file serves as the entry point for the application. It parses a command-line argument (datastream or table) to determine which Flink application to run. A try-catch block ensures that any fatal error during execution is logged before the application exits.
```kotlin
package me.jaehyeon

import mu.KotlinLogging
import kotlin.system.exitProcess

private val logger = KotlinLogging.logger {}

fun main(args: Array<String>) {
    try {
        when (args.getOrNull(0)?.lowercase()) {
            "datastream" -> DataStreamApp.run()
            "table" -> TableApp.run()
            else -> println("Usage: <datastream | table>")
        }
    } catch (e: Exception) {
        logger.error(e) { "Fatal error in ${args.getOrNull(0) ?: "app"}. Shutting down." }
        exitProcess(1)
    }
}
```
Run Flink Application
To observe our Flink DataStream application in action, we’ll follow the essential steps: setting up a local Kafka environment, generating a stream of test data, and then executing the Flink job.
Factor House Local Setup
A local Kafka environment is a prerequisite. If you don’t have one running, use the Factor House Local project to quickly get started:
- Clone the repository:
```bash
git clone https://github.com/factorhouse/factorhouse-local.git
cd factorhouse-local
```
- Configure your Kpow community license as detailed in the project’s README.
- Start the Docker services:
```bash
docker compose -f compose-kpow-community.yml up -d
```
Once running, the Kpow UI at http://localhost:3000 will provide visibility into your Kafka cluster.
Start the Kafka Order Producer
Our Flink application is designed to consume order data from the orders-avro topic. We’ll use the Kafka producer developed in Part 2 of this series to generate this data. To properly test Flink’s event-time windowing, we’ll configure the producer to add a randomized delay (up to 30 seconds) to the bid_time field.
Navigate to the directory of the producer application (orders-avro-clients from the GitHub repository) and run:
```bash
# Assuming you are in the root of the 'orders-avro-clients' project
DELAY_SECONDS=30 ./gradlew run --args="producer"
```
This will start populating the orders-avro topic with Avro-encoded order messages. You can inspect these messages in Kpow. Ensure Kpow is configured with Key Deserializer: String, Value Deserializer: AVRO, and Schema Registry: Local Schema Registry.
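The producer itself was covered in Part 2 and lives in the orders-avro-clients project, so it is not reproduced here. Purely as an illustration of the idea, and under the assumption that DELAY_SECONDS bounds a random offset subtracted from the event time, the delayed bid_time could be generated roughly like this (names and structure are hypothetical, not the actual producer code):

```kotlin
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import kotlin.random.Random

// Hypothetical sketch: subtract a bounded random delay so some events fall behind the watermark.
fun delayedBidTime(maxDelaySeconds: Long): String {
    val delay = Random.nextLong(0, maxDelaySeconds + 1) // 0..maxDelaySeconds inclusive
    return LocalDateTime
        .now()
        .minusSeconds(delay)
        .format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
}

fun main() {
    val maxDelay = System.getenv("DELAY_SECONDS")?.toLongOrNull() ?: 0L
    repeat(3) { println(delayedBidTime(maxDelay)) }
}
```

Events delayed beyond the 5-second out-of-orderness bound plus the 5-second allowed lateness are exactly the ones that will end up in the side output later on.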
Launch the Flink Application
With a steady stream of order events being produced, we can now launch our orders-stats-flink application. Navigate to its project directory. The application’s entry point is designed to run different jobs based on a command-line argument; for this post, we’ll use datastream.
The application can be run in two main ways:
- With Gradle (Development Mode):
```bash
./gradlew run --args="datastream"
```
- Running the Shadow JAR (Deployment Mode):
```bash
# First, build the fat JAR
./gradlew shadowJar

# Then run it
java --add-opens=java.base/java.util=ALL-UNNAMED \
  -jar build/libs/orders-stats-flink-1.0.jar datastream
```
💡 To build and run the application locally, ensure that JDK 17 and a recent version of Gradle (e.g., 7.6+ or 8.x) are installed.
For this demonstration, we’ll use Gradle to run the application in development mode. Upon starting, you’ll see logs indicating the Flink application has initialized and is processing records from the orders-avro topic.
Observing the Output
Our Flink DataStream job writes its results to two distinct Kafka topics:
- orders-avro-kds-stats: Contains the aggregated supplier statistics as Avro records.
- orders-avro-kds-skipped: Contains records identified as “late,” serialized as JSON.
1. Supplier Statistics (orders-avro-kds-stats):
In Kpow, navigate to the orders-avro-kds-stats topic. Configure Kpow to view these messages:
- Key Deserializer: String
- Value Deserializer: AVRO
- Schema Registry: Local Schema Registry
You should see SupplierStats messages, each representing the total price and count of orders for a supplier within a 5-second window. Notice the window_start and window_end fields.
2. Skipped (Late) Records (orders-avro-kds-skipped):
Next, inspect the orders-avro-kds-skipped topic in Kpow. Configure Kpow as follows:
- Key Deserializer: String
- Value Deserializer: JSON
These records are the ones that arrived too late to be included in their windows, even after the allowedLateness period. They were captured using Flink’s powerful .sideOutputLateData() function and then converted to JSON with a "late": true field for confirmation.
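For illustration only, a late record on this topic takes roughly the shape below; the values are hypothetical, and any other fields from the input order schema would appear alongside the ones shown:

```json
{
  "order_id": "d94f2a1c-...",
  "bid_time": "2025-05-27 10:15:02",
  "price": 34.99,
  "supplier": "Acme Corp",
  "late": true
}
```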
Conclusion
In this post, we’ve built a powerful, real-time analytics job using Apache Flink’s DataStream API. We demonstrated how to implement a complete stateful pipeline in Kotlin, from consuming Avro records to performing windowed aggregations with a custom AggregateFunction and WindowFunction. We saw how Flink’s WatermarkStrategy provides a robust foundation for event-time processing and how the .sideOutputLateData() operator offers a clean, first-class solution for isolating late records. This approach showcases the fine-grained control and high performance the DataStream API offers for complex stream processing challenges. Next, we will see how to solve the same problem with a much more declarative approach using Flink’s Table API.