Apache Flink is an open-source, unified stream-processing and batch-processing framework. Its core is a distributed streaming data-flow engine that you can use to run real-time stream processing on high-throughput data sources. Currently, it is widely used to build applications for fraud/anomaly detection, rule-based alerting, business process monitoring, and continuous ETL to name a few. On AWS, we can deploy a Flink application via Amazon Kinesis Data Analytics (KDA), Amazon EMR and Amazon EKS. Among those, KDA is the easiest option as it provides the underlying infrastructure for your Apache Flink applications.

For those who are new to Flink (Pyflink) and KDA, AWS provides a good resource that guides how to develop a Flink application locally and deploy via KDA. The guide uses Amazon Kinesis Data Stream as a data source and demonstrates how to sink records into multiple destinations - Kinesis Data Stream, Kinesis Firehose Delivery Stream and S3. It can be found in this GitHub project.

In this series of posts, we will update one of the examples of the guide by changing the data source and sink into Apache Kafka topics. In part 1, we will discuss how to develop a Flink application that targets a local Kafka cluster. Furthermore, it will be executed in a virtual environment as well as in a local Flink cluster for improved monitoring. The Flink application will be amended to connect a Kafka cluster on Amazon MSK in part 2. The Kafka cluster will be authenticated by IAM access control, and the Flink app needs to change its configuration accordingly by creating a custom Uber Jar file. In part 3, the application will be deployed via KDA using an application package that is saved in S3. The application package is a zip file that includes the application script, custom Uber Jar file, and 3rd-party Python packages. The deployment will be made by Terraform.

[Update 2023-08-30] Amazon Kinesis Data Analytics is renamed into Amazon Managed Service for Apache Flink. In this post, Kinesis Data Analytics (KDA) and Amazon Managed Service for Apache Flink will be used interchangeably.

Architecture

The Python source data generator (producer.py) sends random stock price records into a Kafka topic. The messages in the source topic are consumed by a Flink application, and it just writes those messages into a different sink topic. This is the simplest application of the AWS guide, and you may try other examples if interested.

Infrastructure

A Kafka cluster and management app (Kpow) are created using Docker while the Python apps including the Flink app run in a virtual environment in the first trial. After that the Flink app is submitted to a local Flink cluster for improved monitoring. The Flink cluster is also created using Docker. The source can be found in the GitHub repository of this post.

Preparation

As discussed later, the Flink application needs the Apache Kafka SQL Connector artifact (flink-sql-connector-kafka-1.15.2.jar) in order to connect a Kafka cluster. Also, the kafka-python package is downloaded to check if --pyFiles option works when submitting the app to a Flink cluster or deploying via KDA. They can be downloaded by executing the following script.

 1# build.sh
 2#!/usr/bin/env bash
 3SCRIPT_DIR="$(cd $(dirname "$0"); pwd)"
 4SRC_PATH=$SCRIPT_DIR/package
 5rm -rf $SRC_PATH && mkdir -p $SRC_PATH/lib
 6
 7## Download flink sql connector kafka
 8echo "download flink sql connector kafka..."
 9VERSION=1.15.2
10FILE_NAME=flink-sql-connector-kafka-$VERSION
11DOWNLOAD_URL=https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/$VERSION/flink-sql-connector-kafka-$VERSION.jar
12curl -L -o $SRC_PATH/lib/$FILE_NAME.jar ${DOWNLOAD_URL}
13
14## Install pip packages
15echo "install and zip pip packages..."
16pip install -r requirements.txt --target $SRC_PATH/site_packages
17
18## Package pyflink app
19echo "package pyflink app"
20zip -r kda-package.zip processor.py package/lib package/site_packages

Once downloaded, the Kafka SQL artifact and python package can be found in the lib and site_packages folders respectively as shown below.

Kafka Cluster

A Kafka cluster with a single broker and zookeeper node is used in this post. The broker has two listeners and the port 9092 and 29092 are used for internal and external communication respectively. The default number of topic partitions is set to 2. Details about Kafka cluster setup can be found in this post.

The Kpow CE is used for ease of monitoring Kafka topics and related resources. The bootstrap server address is added as an environment variable. See this post for details about Kafka management apps.

The Kafka cluster can be started by docker-compose -f compose-kafka.yml up -d.

 1# compose-kafka.yml
 2version: "3.5"
 3
 4services:
 5  zookeeper:
 6    image: bitnami/zookeeper:3.5
 7    container_name: zookeeper
 8    ports:
 9      - "2181"
10    networks:
11      - kafkanet
12    environment:
13      - ALLOW_ANONYMOUS_LOGIN=yes
14    volumes:
15      - zookeeper_data:/bitnami/zookeeper
16  kafka-0:
17    image: bitnami/kafka:2.8.1
18    container_name: kafka-0
19    expose:
20      - 9092
21    ports:
22      - "29092:29092"
23    networks:
24      - kafkanet
25    environment:
26      - ALLOW_PLAINTEXT_LISTENER=yes
27      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
28      - KAFKA_CFG_BROKER_ID=0
29      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
30      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:29092
31      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-0:9092,EXTERNAL://localhost:29092
32      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
33      - KAFKA_CFG_NUM_PARTITIONS=2
34    volumes:
35      - kafka_0_data:/bitnami/kafka
36    depends_on:
37      - zookeeper
38  kpow:
39    image: factorhouse/kpow-ce:91.2.1
40    container_name: kpow
41    ports:
42      - "3000:3000"
43    networks:
44      - kafkanet
45    environment:
46      BOOTSTRAP: kafka-0:9092
47    depends_on:
48      - zookeeper
49      - kafka-0
50
51networks:
52  kafkanet:
53    name: kafka-network
54
55volumes:
56  zookeeper_data:
57    driver: local
58    name: zookeeper_data
59  kafka_0_data:
60    driver: local
61    name: kafka_0_data

In order to run a PyFlink application in a Flink cluster, we need to install Python and the apache-flink package additionally. We can create a custom Docker image based on the official Flink image. I chose the version 1.15.2 as it is the recommended Flink version by KDA. The custom image can be built by docker build -t pyflink:1.15.2-scala_2.12 ..

 1FROM flink:1.15.2-scala_2.12
 2
 3ARG PYTHON_VERSION
 4ENV PYTHON_VERSION=${PYTHON_VERSION:-3.8.10}
 5ARG FLINK_VERSION
 6ENV FLINK_VERSION=${FLINK_VERSION:-1.15.2}
 7
 8# Currently only Python 3.6, 3.7 and 3.8 are supported officially.
 9RUN apt-get update -y && \
10  apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
11  wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tgz && \
12  tar -xvf Python-${PYTHON_VERSION}.tgz && \
13  cd Python-${PYTHON_VERSION} && \
14  ./configure --without-tests --enable-shared && \
15  make -j6 && \
16  make install && \
17  ldconfig /usr/local/lib && \
18  cd .. && rm -f Python-${PYTHON_VERSION}.tgz && rm -rf Python-${PYTHON_VERSION} && \
19  ln -s /usr/local/bin/python3 /usr/local/bin/python && \
20  apt-get clean && \
21  rm -rf /var/lib/apt/lists/*
22
23# install PyFlink
24RUN pip3 install apache-flink==${FLINK_VERSION}

A Flink cluster is made up of a single Job Manger and Task Manager, and the cluster runs in the Session Mode where one or more Flink applications can be submitted/executed simultaneously. See this page for details about how to create a Flink cluster using docker-compose.

Two environment variables are configured to adjust the application behaviour. The RUNTIME_ENV is set to DOCKER, and it determines which pipeline jar and application property file to choose. Also, the BOOTSTRAP_SERVERS overrides the Kafka bootstrap server address value from the application property file. We will make use of it to configure the bootstrap server address dynamically for MSK in part 2. Finally, the current directory is volume-mapped into /etc/flink so that the application and related resources can be available in the Flink cluster.

The Flink cluster can be started by docker-compose -f compose-flink.yml up -d.

 1version: "3.5"
 2
 3services:
 4  jobmanager:
 5    image: pyflink:1.15.2-scala_2.12
 6    container_name: jobmanager
 7    command: jobmanager
 8    ports:
 9      - "8081:8081"
10    networks:
11      - kafkanet
12    environment:
13      - |
14        FLINK_PROPERTIES=
15        jobmanager.rpc.address: jobmanager
16        state.backend: filesystem
17        state.checkpoints.dir: file:///tmp/flink-checkpoints
18        heartbeat.interval: 1000
19        heartbeat.timeout: 5000
20        rest.flamegraph.enabled: true
21        web.backpressure.refresh-interval: 10000        
22      - RUNTIME_ENV=DOCKER
23      - BOOTSTRAP_SERVERS=kafka-0:9092
24    volumes:
25      - $PWD:/etc/flink
26  taskmanager:
27    image: pyflink:1.15.2-scala_2.12
28    container_name: taskmanager
29    command: taskmanager
30    networks:
31      - kafkanet
32    volumes:
33      - flink_data:/tmp/
34      - $PWD:/etc/flink
35    environment:
36      - |
37        FLINK_PROPERTIES=
38        jobmanager.rpc.address: jobmanager
39        taskmanager.numberOfTaskSlots: 3
40        state.backend: filesystem
41        state.checkpoints.dir: file:///tmp/flink-checkpoints
42        heartbeat.interval: 1000
43        heartbeat.timeout: 5000        
44      - RUNTIME_ENV=DOCKER
45      - BOOTSTRAP_SERVERS=kafka-0:9092
46    depends_on:
47      - jobmanager
48
49networks:
50  kafkanet:
51    external: true
52    name: kafka-network
53
54volumes:
55  flink_data:
56    driver: local
57    name: flink_data

Virtual Environment

As mentioned earlier, all Python apps run in a virtual environment, and we have the following pip packages. We use the version 1.15.2 of the apache-flink package because it is the recommended version by KDA. We also need the kafka-python package for source data generation. The pip packages can be installed by pip install -r requirements-dev.txt.

1# requirements.txt
2kafka-python==2.0.2
3
4# requirements-dev.txt
5-r requirements.txt
6apache-flink==1.15.2
7black==19.10b0
8pytest
9pytest-cov

Application

Source Data

A single Python script is created to generate fake stock price records. The class for the stock record has the asdict, auto and create methods. The create method generates a list of records where each element is instantiated by the auto method. Those records are sent into the relevant Kafka topic after being converted into a dictionary by the asdict method.

A Kafka producer is created as an attribute of the Producer class. The source records are sent into the relevant topic by the send method. Note that both the key and value of the messages are serialized as json.

The data generator can be started simply by python producer.py.

 1# producer.py
 2import os
 3import datetime
 4import time
 5import json
 6import typing
 7import random
 8import logging
 9import dataclasses
10
11from kafka import KafkaProducer
12
13logging.basicConfig(
14    level=logging.INFO,
15    format="%(asctime)s.%(msecs)03d:%(levelname)s:%(name)s:%(message)s",
16    datefmt="%Y-%m-%d %H:%M:%S",
17)
18
19datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
20
21
22@dataclasses.dataclass
23class Stock:
24    event_time: str
25    ticker: str
26    price: float
27
28    def asdict(self):
29        return dataclasses.asdict(self)
30
31    @classmethod
32    def auto(cls, ticker: str):
33        # event_time = datetime.datetime.now().isoformat(timespec="milliseconds")
34        event_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
35        price = round(random.random() * 100, 2)
36        return cls(event_time, ticker, price)
37
38    @staticmethod
39    def create():
40        tickers = '["AAPL", "ACN", "ADBE", "AMD", "AVGO", "CRM", "CSCO", "IBM", "INTC", "MA", "MSFT", "NVDA", "ORCL", "PYPL", "QCOM", "TXN", "V"]'
41        return [Stock.auto(ticker) for ticker in json.loads(tickers)]
42
43
44class Producer:
45    def __init__(self, bootstrap_servers: list, topic: str):
46        self.bootstrap_servers = bootstrap_servers
47        self.topic = topic
48        self.producer = self.create()
49
50    def create(self):
51        return KafkaProducer(
52            bootstrap_servers=self.bootstrap_servers,
53            key_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
54            value_serializer=lambda v: json.dumps(v, default=self.serialize).encode("utf-8"),
55        )
56
57    def send(self, stocks: typing.List[Stock]):
58        for stock in stocks:
59            try:
60                self.producer.send(self.topic, key={"ticker": stock.ticker}, value=stock.asdict())
61            except Exception as e:
62                raise RuntimeError("fails to send a message") from e
63        self.producer.flush()
64
65    def serialize(self, obj):
66        if isinstance(obj, datetime.datetime):
67            return obj.isoformat()
68        if isinstance(obj, datetime.date):
69            return str(obj)
70        return obj
71
72
73if __name__ == "__main__":
74    producer = Producer(
75        bootstrap_servers=os.getenv("BOOTSTRAP_SERVERS", "localhost:29092").split(","),
76        topic=os.getenv("TOPIC_NAME", "stocks-in"),
77    )
78    max_run = int(os.getenv("MAX_RUN", "-1"))
79    logging.info(f"max run - {max_run}")
80    current_run = 0
81    while True:
82        current_run += 1
83        logging.info(f"current run - {current_run}")
84        if current_run - max_run == 0:
85            logging.info(f"reached max run, finish")
86            producer.producer.close()
87            break
88        producer.send(Stock.create())
89        secs = random.randint(5, 10)
90        logging.info(f"messages sent... wait {secs} seconds")
91        time.sleep(secs)

Once we start the app, we can check the topic for the source data is created and messages are ingested in Kpow.

Process Data

The Flink application is built using the Table API. We have two Kafka topics - one for the source and the other for the sink. Simply put, we can manipulate the records of the topics as tables of unbounded real-time streams with the Table API. In order to read/write records from/to a Kafka topic, we need to specify the Apache Kafka SQL Connector artifact that we downloaded earlier as the pipeline jar. Note we only need to configure the connector jar when we develop the app locally as the jar file will be specified by the --jarfile option when submitting it to a Flink cluster or deploying via KDA. We also need the application properties file (application_properties.json) in order to be comparable with KDA. The file contains the Flink runtime options in KDA as well as application specific properties. All the properties should be specified when deploying via KDA and, for local development, we keep them as a json file and only the application specific properties are used.

The tables for the source and output topics can be created using SQL with options that are related to the Kafka connector. Key options cover the connector name (connector), topic name (topic), bootstrap server address (properties.bootstrap.servers) and format (format). See the connector document for more details about the connector configuration. When it comes to inserting the source records into the output table, we can use either SQL or built-in add_insert method.

In the main method, we create all the source and sink tables after mapping relevant application properties. Then the output records are inserted into the output Kafka topic. Note that the output records are printed in the terminal additionally when the app is running locally for ease of checking them.

  1# processor.py
  2import os
  3import json
  4import re
  5import logging
  6
  7import kafka  # check if --pyFiles works
  8from pyflink.table import EnvironmentSettings, TableEnvironment
  9
 10logging.basicConfig(
 11    level=logging.INFO,
 12    format="%(asctime)s.%(msecs)03d:%(levelname)s:%(name)s:%(message)s",
 13    datefmt="%Y-%m-%d %H:%M:%S",
 14)
 15
 16RUNTIME_ENV = os.environ.get("RUNTIME_ENV", "KDA")  # KDA, DOCKER, LOCAL
 17BOOTSTRAP_SERVERS = os.environ.get("BOOTSTRAP_SERVERS")  # overwrite app config
 18
 19logging.info(f"runtime environment - {RUNTIME_ENV}...")
 20
 21env_settings = EnvironmentSettings.in_streaming_mode()
 22table_env = TableEnvironment.create(env_settings)
 23
 24APPLICATION_PROPERTIES_FILE_PATH = (
 25    "/etc/flink/application_properties.json"  # on kda or docker-compose
 26    if RUNTIME_ENV != "LOCAL"
 27    else "application_properties.json"
 28)
 29
 30if RUNTIME_ENV != "KDA":
 31    # on non-KDA, multiple jar files can be passed after being delimited by a semicolon
 32    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
 33    PIPELINE_JAR = "flink-sql-connector-kafka-1.15.2.jar"
 34    table_env.get_config().set(
 35        "pipeline.jars", f"file://{os.path.join(CURRENT_DIR, 'package', 'lib', PIPELINE_JAR)}"
 36    )
 37logging.info(f"app properties file path - {APPLICATION_PROPERTIES_FILE_PATH}")
 38
 39
 40def get_application_properties():
 41    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
 42        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
 43            contents = file.read()
 44            properties = json.loads(contents)
 45            return properties
 46    else:
 47        raise RuntimeError(f"A file at '{APPLICATION_PROPERTIES_FILE_PATH}' was not found")
 48
 49
 50def property_map(props: dict, property_group_id: str):
 51    for prop in props:
 52        if prop["PropertyGroupId"] == property_group_id:
 53            return prop["PropertyMap"]
 54
 55
 56def create_source_table(
 57    table_name: str, topic_name: str, bootstrap_servers: str, startup_mode: str
 58):
 59    stmt = f"""
 60    CREATE TABLE {table_name} (
 61        event_time TIMESTAMP(3),
 62        ticker VARCHAR(6),
 63        price DOUBLE
 64    )
 65    WITH (
 66        'connector' = 'kafka',
 67        'topic' = '{topic_name}',
 68        'properties.bootstrap.servers' = '{bootstrap_servers}',
 69        'properties.group.id' = 'source-group',
 70        'format' = 'json',
 71        'scan.startup.mode' = '{startup_mode}'
 72    )
 73    """
 74    logging.info("source table statement...")
 75    logging.info(stmt)
 76    return stmt
 77
 78
 79def create_sink_table(table_name: str, topic_name: str, bootstrap_servers: str):
 80    stmt = f"""
 81    CREATE TABLE {table_name} (
 82        event_time TIMESTAMP(3),
 83        ticker VARCHAR(6),
 84        price DOUBLE
 85    )
 86    WITH (
 87        'connector' = 'kafka',
 88        'topic' = '{topic_name}',
 89        'properties.bootstrap.servers' = '{bootstrap_servers}',        
 90        'format' = 'json',
 91        'key.format' = 'json',
 92        'key.fields' = 'ticker',
 93        'properties.allow.auto.create.topics' = 'true'
 94    )
 95    """
 96    logging.info("sint table statement...")
 97    logging.info(stmt)
 98    return stmt
 99
100
101def create_print_table(table_name: str):
102    return f"""
103    CREATE TABLE {table_name} (
104        event_time TIMESTAMP(3),
105        ticker VARCHAR(6),
106        price DOUBLE
107    )
108    WITH (
109        'connector' = 'print'
110    )
111    """
112
113
114def main():
115    ## map consumer/producer properties
116    props = get_application_properties()
117    # consumer
118    consumer_property_group_key = "consumer.config.0"
119    consumer_properties = property_map(props, consumer_property_group_key)
120    consumer_table_name = consumer_properties["table.name"]
121    consumer_topic_name = consumer_properties["topic.name"]
122    consumer_bootstrap_servers = BOOTSTRAP_SERVERS or consumer_properties["bootstrap.servers"]
123    consumer_startup_mode = consumer_properties["startup.mode"]
124    # producer
125    producer_property_group_key = "producer.config.0"
126    producer_properties = property_map(props, producer_property_group_key)
127    producer_table_name = producer_properties["table.name"]
128    producer_topic_name = producer_properties["topic.name"]
129    producer_bootstrap_servers = BOOTSTRAP_SERVERS or producer_properties["bootstrap.servers"]
130    # print
131    print_table_name = "sink_print"
132    ## create a souce table
133    table_env.execute_sql(
134        create_source_table(
135            consumer_table_name,
136            consumer_topic_name,
137            consumer_bootstrap_servers,
138            consumer_startup_mode,
139        )
140    )
141    ## create sink tables
142    table_env.execute_sql(
143        create_sink_table(producer_table_name, producer_topic_name, producer_bootstrap_servers)
144    )
145    table_env.execute_sql(create_print_table("sink_print"))
146    ## insert into sink tables
147    if RUNTIME_ENV == "LOCAL":
148        source_table = table_env.from_path(consumer_table_name)
149        statement_set = table_env.create_statement_set()
150        statement_set.add_insert(producer_table_name, source_table)
151        statement_set.add_insert(print_table_name, source_table)
152        statement_set.execute().wait()
153    else:
154        table_result = table_env.execute_sql(
155            f"INSERT INTO {producer_table_name} SELECT * FROM {consumer_table_name}"
156        )
157        logging.info(table_result.get_job_client().get_job_status())
158
159
160if __name__ == "__main__":
161    main()
 1// application_properties.json
 2[
 3  {
 4    "PropertyGroupId": "kinesis.analytics.flink.run.options",
 5    "PropertyMap": {
 6      "python": "processor.py",
 7      "jarfile": "package/lib/flink-sql-connector-kinesis-1.15.2.jar",
 8      "pyFiles": "package/site_packages/"
 9    }
10  },
11  {
12    "PropertyGroupId": "consumer.config.0",
13    "PropertyMap": {
14      "table.name": "source_table",
15      "topic.name": "stocks-in",
16      "bootstrap.servers": "localhost:29092",
17      "startup.mode": "earliest-offset"
18    }
19  },
20  {
21    "PropertyGroupId": "producer.config.0",
22    "PropertyMap": {
23      "table.name": "sink_table",
24      "topic.name": "stocks-out",
25      "bootstrap.servers": "localhost:29092"
26    }
27  }
28]

Run Locally

We can run the app locally as following - RUNTIME_ENV=LOCAL python processor.py. The terminal on the right-hand side shows the output records of the Flink app while the left-hand side records logs of the producer app. We can see that the print output from the Flink app gets updated when new source records are sent into the source topic by the producer app.

We can also see details of all the topics in Kpow as shown below. The total number of messages matches between the source and output topics but not within partitions.

The execution in a terminal is limited for monitoring, and we can inspect and understand what is happening inside Flink using the Flink Web UI. For this, we need to submit the app to the Flink cluster we created earlier. Typically, a Pyflink app can be submitted using the CLI interface by specifying the main application (–python), Kafka connector artifact file (–jarfile), and 3rd-party Python packages (–pyFiles) if necessary. Once submitted, it shows the status with the job ID.

 1$ docker exec jobmanager /opt/flink/bin/flink run \
 2  --python /etc/flink/processor.py \
 3  --jarfile /etc/flink/package/lib/flink-sql-connector-kafka-1.15.2.jar \
 4  --pyFiles /etc/flink/package/site_packages/ \
 5  -d
 62023-08-08 02:07:13.220:INFO:root:runtime environment - DOCKER...
 72023-08-08 02:07:14.341:INFO:root:app properties file path - /etc/flink/application_properties.json
 82023-08-08 02:07:14.341:INFO:root:source table statement...
 92023-08-08 02:07:14.341:INFO:root:
10    CREATE TABLE source_table (
11        event_time TIMESTAMP(3),
12        ticker VARCHAR(6),
13        price DOUBLE
14    )
15    WITH (
16        'connector' = 'kafka',
17        'topic' = 'stocks-in',
18        'properties.bootstrap.servers' = 'kafka-0:9092',
19        'properties.group.id' = 'source-group',
20        'format' = 'json',
21        'scan.startup.mode' = 'earliest-offset'
22    )
23    
242023-08-08 02:07:14.439:INFO:root:sint table statement...
252023-08-08 02:07:14.439:INFO:root:
26    CREATE TABLE sink_table (
27        event_time TIMESTAMP(3),
28        ticker VARCHAR(6),
29        price DOUBLE
30    )
31    WITH (
32        'connector' = 'kafka',
33        'topic' = 'stocks-out',
34        'properties.bootstrap.servers' = 'kafka-0:9092',        
35        'format' = 'json',
36        'key.format' = 'json',
37        'key.fields' = 'ticker',
38        'properties.allow.auto.create.topics' = 'true'
39    )
40    
41WARNING: An illegal reflective access operation has occurred
42WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/flink/lib/flink-dist-1.15.4.jar) to field java.lang.String.value
43WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
44WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
45WARNING: All illegal access operations will be denied in a future release
46Job has been submitted with JobID 02d83c46d646aa498a986c0a9335e276
472023-08-08 02:07:23.010:INFO:root:java.util.concurrent.CompletableFuture@34e729a3[Not completed]

We can check the submitted job by listing all jobs as shown below.

1$ docker exec jobmanager /opt/flink/bin/flink list
2Waiting for response...
3------------------ Running/Restarting Jobs -------------------
408.08.2023 02:07:18 : 02d83c46d646aa498a986c0a9335e276 : insert-into_default_catalog.default_database.sink_table (RUNNING)
5--------------------------------------------------------------
6No scheduled jobs.

The Flink Web UI can be accessed on port 8081. In the Overview section, it shows the available task slots, running jobs and completed jobs.

We can inspect an individual job in the Jobs menu. It shows key details about a job execution in Overview, Exceptions, TimeLine, Checkpoints and Configuration tabs.

We can cancel a job on the web UI or using the CLI. Below shows how to cancel the job we submitted earlier using the CLI.

1$ docker exec jobmanager /opt/flink/bin/flink cancel 02d83c46d646aa498a986c0a9335e276
2Cancelling job 02d83c46d646aa498a986c0a9335e276.
3Cancelled job 02d83c46d646aa498a986c0a9335e276.

Summary

Apache Flink is widely used for building real-time stream processing applications. On AWS, Kinesis Data Analytics (KDA) is the easiest option to develop a Flink app as it provides the underlying infrastructure. Updating a guide from AWS, this series of posts discuss how to develop and deploy a Flink (Pyflink) application via KDA where the data source and sink are Kafka topics. In part 1, the app was developed locally targeting a Kafka cluster created by Docker. Furthermore, it was executed in a virtual environment as well as in a local Flink cluster for improved monitoring.