The Flink SQL Cookbook by Ververica is a hands-on, example-rich guide to mastering Apache Flink SQL for real-time stream processing. It offers a wide range of self-contained recipes, from basic queries and table operations to more advanced use cases like windowed aggregations, complex joins, user-defined functions (UDFs), and pattern detection. These examples are designed to be run on the Ververica Platform, and as such, the cookbook doesn’t include instructions for setting up a Flink cluster.
To help you run these recipes locally and explore Flink SQL without external dependencies, this post walks through setting up a fully functional local Flink cluster using Docker Compose. With this setup, you can experiment with the cookbook examples right on your machine.
Flink Cluster on Docker
The cookbook generates sample data using the Flink SQL Faker Connector, which allows for realistic, randomized record generation. To streamline the setup, we use a custom Docker image where the connector’s JAR file is downloaded into the /opt/flink/lib/ directory. This approach eliminates the need to manually register the connector each time we launch the Flink SQL client, making it easier to jump straight into experimenting with the cookbook’s examples. The source for this post is available in this GitHub repository.
FROM flink:1.20.1

# add faker connector
RUN wget -P /opt/flink/lib/ \
  https://github.com/knaufk/flink-faker/releases/download/v0.5.3/flink-faker-0.5.3.jar
We deploy a local Apache Flink cluster using Docker Compose. It defines one JobManager and three TaskManagers, all using the custom image. The JobManager handles coordination and exposes the Flink web UI on port 8081, while each TaskManager provides 10 task slots for parallel processing. All components share a custom network and use a filesystem-based state backend with checkpointing and savepoint directories configured for local testing. A health check ensures the JobManager is ready before the TaskManagers start.
version: "3"

services:
  jobmanager:
    image: flink-sql-cookbook
    build: .
    command: jobmanager
    container_name: jobmanager
    ports:
      - "8081:8081"
    networks:
      - cookbook
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        state.backend: filesystem
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        state.savepoints.dir: file:///tmp/flink-savepoints
        heartbeat.interval: 1000
        heartbeat.timeout: 5000
        rest.flamegraph.enabled: true
        web.backpressure.refresh-interval: 10000
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8081/config"]
      interval: 5s
      timeout: 5s
      retries: 5

  taskmanager-1:
    image: flink-sql-cookbook
    build: .
    command: taskmanager
    container_name: taskmanager-1
    networks:
      - cookbook
    depends_on:
      jobmanager:
        condition: service_healthy
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 10
        state.backend: filesystem
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        state.savepoints.dir: file:///tmp/flink-savepoints
        heartbeat.interval: 1000
        heartbeat.timeout: 5000

  taskmanager-2:
    image: flink-sql-cookbook
    build: .
    command: taskmanager
    container_name: taskmanager-2
    networks:
      - cookbook
    depends_on:
      jobmanager:
        condition: service_healthy
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 10
        state.backend: filesystem
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        state.savepoints.dir: file:///tmp/flink-savepoints
        heartbeat.interval: 1000
        heartbeat.timeout: 5000

  taskmanager-3:
    image: flink-sql-cookbook
    build: .
    command: taskmanager
    container_name: taskmanager-3
    networks:
      - cookbook
    depends_on:
      jobmanager:
        condition: service_healthy
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 10
        state.backend: filesystem
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        state.savepoints.dir: file:///tmp/flink-savepoints
        heartbeat.interval: 1000
        heartbeat.timeout: 5000

networks:
  cookbook:
    name: flink-sql-cookbook
The Flink cluster can be deployed as follows.
# start containers
$ docker compose up -d

# list containers
$ docker compose ps
# NAME            COMMAND                  SERVICE         STATUS              PORTS
# jobmanager      "/docker-entrypoint.…"   jobmanager      running (healthy)   6123/tcp, 0.0.0.0:8081->8081/tcp, :::8081->8081/tcp
# taskmanager-1   "/docker-entrypoint.…"   taskmanager-1   running             6123/tcp, 8081/tcp
# taskmanager-2   "/docker-entrypoint.…"   taskmanager-2   running             6123/tcp, 8081/tcp
# taskmanager-3   "/docker-entrypoint.…"   taskmanager-3   running             6123/tcp, 8081/tcp
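As a quick sanity check, three TaskManagers with 10 slots each should give 3 x 10 = 30 task slots in total. The sketch below assumes that the JobManager's REST endpoint /overview is reachable on localhost:8081 and that it returns compact JSON with taskmanagers and slots-total fields. The helper names overview_field and check_cluster are illustrative and not part of the original setup.

```shell
#!/usr/bin/env bash
# Illustrative helpers (assumptions, not part of the original setup).

# Extract a numeric field from compact JSON read on stdin.
# Assumes the form "name":123 with no space after the colon.
overview_field() {
  grep -o "\"$1\":[0-9]*" | head -n 1 | cut -d: -f2
}

# Compare the slots reported by the cluster with the expected 3 x 10 = 30.
check_cluster() {
  local overview slots expected=$((3 * 10))
  overview=$(curl -sf http://localhost:8081/overview) || {
    echo "JobManager REST API is not reachable" >&2
    return 1
  }
  slots=$(printf '%s' "$overview" | overview_field slots-total)
  echo "slots-total=${slots} expected=${expected}"
  [ "$slots" = "$expected" ]
}

# Usage, once the containers are up:
#   check_cluster
```

A non-zero exit status from check_cluster would suggest that one or more TaskManagers have not registered yet.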
Flink SQL Client
We can start the SQL client from the JobManager container as shown below.
$ docker exec -it jobmanager /opt/flink/bin/sql-client.sh
On the SQL shell, we can execute Flink SQL statements.
-- create a temporary table
CREATE TEMPORARY TABLE heros (
  `name` STRING,
  `power` STRING,
  `age` INT
) WITH (
  'connector' = 'faker',
  'fields.name.expression' = '#{superhero.name}',
  'fields.power.expression' = '#{superhero.power}',
  'fields.power.null-rate' = '0.05',
  'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
);
-- [INFO] Execute statement succeeded.

-- list tables
SHOW TABLES;
-- +------------+
-- | table name |
-- +------------+
-- |      heros |
-- +------------+
-- 1 row in set

-- query records from the heros table
-- hit 'q' to exit the record view
SELECT * FROM heros;

-- quit sql shell
quit;
The Flink job associated with the SELECT query can be found on the Flink Web UI at http://localhost:8081.
Caveat
Some examples in the cookbook rely on an older version of the Faker connector. As a result, certain directives used in the queries are no longer supported in the latest version, which leads to runtime errors. For instance, the following query fails because the #{Internet.userAgentAny} directive has been removed. To resolve this, you can either remove the user_agent field from the query or replace the outdated directive with a supported one, such as using regexify to generate similar values.
CREATE TABLE server_logs (
  client_ip STRING,
  client_identity STRING,
  userid STRING,
  user_agent STRING,
  log_time TIMESTAMP(3),
  request_line STRING,
  status_code STRING,
  size INT
) WITH (
  'connector' = 'faker',
  'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
  'fields.client_identity.expression' = '-',
  'fields.userid.expression' = '-',
  'fields.user_agent.expression' = '#{Internet.userAgentAny}',
  'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
  'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
  'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
  'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
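One possible way to apply the regexify workaround is sketched below. It writes a patched copy of the statement to a file so that it can be loaded into the SQL client. The file name server_logs_patched.sql and the user agent values in the regexify expression are assumptions chosen for illustration; they are not taken from the cookbook, and any set of plausible values would do.

```shell
#!/usr/bin/env bash
# Write a patched DDL to a file. Only the user_agent expression differs from
# the failing statement: it uses regexify with illustrative (assumed) values.
cat > server_logs_patched.sql <<'EOF'
CREATE TABLE server_logs (
  client_ip STRING,
  client_identity STRING,
  userid STRING,
  user_agent STRING,
  log_time TIMESTAMP(3),
  request_line STRING,
  status_code STRING,
  size INT
) WITH (
  'connector' = 'faker',
  'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
  'fields.client_identity.expression' = '-',
  'fields.userid.expression' = '-',
  'fields.user_agent.expression' = '#{regexify ''(Mozilla/5\.0|Opera/9\.80|curl/7\.68\.0){1}''}',
  'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
  'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
  'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
  'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
EOF

# Usage, with the cluster running: copy the file into the JobManager container
# and start the SQL client with it as an initialization file.
#   docker cp server_logs_patched.sql jobmanager:/tmp/server_logs_patched.sql
#   docker exec -it jobmanager /opt/flink/bin/sql-client.sh -i /tmp/server_logs_patched.sql
```

Loading the statement as an initialization file keeps the table available in the interactive session that follows, so the cookbook's queries against server_logs can be run right away.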