The Flink SQL Cookbook by Ververica is a hands-on, example-rich guide to mastering Apache Flink SQL for real-time stream processing. It offers a wide range of self-contained recipes, from basic queries and table operations to more advanced use cases like windowed aggregations, complex joins, user-defined functions (UDFs), and pattern detection. These examples are designed to be run on the Ververica Platform, and as such, the cookbook doesn’t include instructions for setting up a Flink cluster.

To help you run these recipes locally and explore Flink SQL without external dependencies, this post walks through setting up a fully functional local Flink cluster using Docker Compose. With this setup, you can experiment with the cookbook examples right on your machine.

The cookbook generates sample data using the Flink SQL Faker Connector, which allows for realistic, randomized record generation. To streamline the setup, we use a custom Docker image where the connector’s JAR file is downloaded into the /opt/flink/lib/ directory. This approach eliminates the need to manually register the connector each time we launch the Flink SQL client, making it easier to jump straight into experimenting with the cookbook’s examples. The source for this post is available in this GitHub repository.

FROM flink:1.20.1

# add faker connector
RUN wget -P /opt/flink/lib/ \
  https://github.com/knaufk/flink-faker/releases/download/v0.5.3/flink-faker-0.5.3.jar

We deploy a local Apache Flink cluster using Docker Compose. The Compose file defines one JobManager and three TaskManagers, all using the custom image. The JobManager handles coordination and exposes the Flink web UI on port 8081, while each TaskManager provides 10 task slots, giving 30 slots in total for parallel processing. All components share a custom network and use a filesystem-based state backend, with checkpoint and savepoint directories configured for local testing. A health check ensures the JobManager is ready before the TaskManagers start.

version: "3"

services:
  jobmanager:
    image: flink-sql-cookbook
    build: .
    command: jobmanager
    container_name: jobmanager
    ports:
      - "8081:8081"
    networks:
      - cookbook
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        state.backend: filesystem
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        state.savepoints.dir: file:///tmp/flink-savepoints
        heartbeat.interval: 1000
        heartbeat.timeout: 5000
        rest.flamegraph.enabled: true
        web.backpressure.refresh-interval: 10000
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8081/config"]
      interval: 5s
      timeout: 5s
      retries: 5

  taskmanager-1:
    image: flink-sql-cookbook
    build: .
    command: taskmanager
    container_name: taskmanager-1
    networks:
      - cookbook
    depends_on:
      jobmanager:
        condition: service_healthy
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 10
        state.backend: filesystem
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        state.savepoints.dir: file:///tmp/flink-savepoints
        heartbeat.interval: 1000
        heartbeat.timeout: 5000

  taskmanager-2:
    image: flink-sql-cookbook
    build: .
    command: taskmanager
    container_name: taskmanager-2
    networks:
      - cookbook
    depends_on:
      jobmanager:
        condition: service_healthy
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 10
        state.backend: filesystem
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        state.savepoints.dir: file:///tmp/flink-savepoints
        heartbeat.interval: 1000
        heartbeat.timeout: 5000

  taskmanager-3:
    image: flink-sql-cookbook
    build: .
    command: taskmanager
    container_name: taskmanager-3
    networks:
      - cookbook
    depends_on:
      jobmanager:
        condition: service_healthy
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 10
        state.backend: filesystem
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        state.savepoints.dir: file:///tmp/flink-savepoints
        heartbeat.interval: 1000
        heartbeat.timeout: 5000

networks:
  cookbook:
    name: flink-sql-cookbook

The Flink cluster can be deployed as follows.

# start containers
$ docker compose up -d

# list containers
$ docker compose ps
# NAME                COMMAND                  SERVICE             STATUS              PORTS
# jobmanager          "/docker-entrypoint.…"   jobmanager          running (healthy)   6123/tcp, 0.0.0.0:8081->8081/tcp, :::8081->8081/tcp
# taskmanager-1       "/docker-entrypoint.…"   taskmanager-1       running             6123/tcp, 8081/tcp
# taskmanager-2       "/docker-entrypoint.…"   taskmanager-2       running             6123/tcp, 8081/tcp
# taskmanager-3       "/docker-entrypoint.…"   taskmanager-3       running             6123/tcp, 8081/tcp

We can start the SQL client from the JobManager container as shown below.

$ docker exec -it jobmanager /opt/flink/bin/sql-client.sh

On the SQL shell, we can execute Flink SQL statements.

-- create a temporary table
CREATE TEMPORARY TABLE heros (
  `name` STRING,
  `power` STRING,
  `age` INT
) WITH (
  'connector' = 'faker',
  'fields.name.expression' = '#{superhero.name}',
  'fields.power.expression' = '#{superhero.power}',
  'fields.power.null-rate' = '0.05',
  'fields.age.expression' = '#{number.numberBetween ''0'',''1000''}'
);
-- [INFO] Execute statement succeeded.

-- list tables
SHOW TABLES;
-- +------------+
-- | table name |
-- +------------+
-- |      heros |
-- +------------+
-- 1 row in set

-- query records from the heros table
-- hit 'q' to exit the record view
SELECT * FROM heros;

-- quit sql shell
quit;

The Flink job associated with the SELECT query can be found on the Flink Web UI at http://localhost:8081.

Caveat

Some examples in the cookbook rely on an older version of the Faker connector. Certain directives used in those queries are no longer supported in the latest version, which leads to runtime errors. For instance, the following query fails because the #{Internet.userAgentAny} directive has been removed. To resolve this, you can either remove the user_agent field from the query or replace the outdated directive with a supported one, for example by using regexify to generate similar values.

CREATE TABLE server_logs (
    client_ip STRING,
    client_identity STRING,
    userid STRING,
    user_agent STRING,
    log_time TIMESTAMP(3),
    request_line STRING,
    status_code STRING,
    size INT
) WITH (
  'connector' = 'faker',
  'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
  'fields.client_identity.expression' =  '-',
  'fields.userid.expression' =  '-',
  'fields.user_agent.expression' = '#{Internet.userAgentAny}',
  'fields.log_time.expression' =  '#{date.past ''15'',''5'',''SECONDS''}',
  'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
  'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
  'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
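
As a rough sketch of the regexify workaround, the table below replaces the removed directive with a fixed set of alternatives, following the same pattern the query already uses for status_code. The table name server_logs_ua and the three user-agent strings are made up for illustration and are not taken from the cookbook; the table is trimmed to two columns to keep the example short.

```sql
-- Hypothetical trimmed-down table; the name and the user-agent values are illustrative assumptions
CREATE TEMPORARY TABLE server_logs_ua (
    client_ip STRING,
    user_agent STRING
) WITH (
  'connector' = 'faker',
  'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
  -- regexify picks one of the listed alternatives for each generated row
  'fields.user_agent.expression' = '#{regexify ''(Mozilla/5\.0|Opera/9\.80|curl/7\.68\.0){1}''}'
);

-- inspect the generated values; hit 'q' to exit the record view
SELECT * FROM server_logs_ua;
```

If the generated values look reasonable, the same 'fields.user_agent.expression' line can be swapped into the full server_logs definition in place of the #{Internet.userAgentAny} directive.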