The value of data can be maximised when it is used without delay. With Apache Flink, we can build streaming analytics applications that incorporate the latest events with low latency. In this lab, we will create a PyFlink application that writes accumulated taxi ride data into an OpenSearch cluster. It aggregates the number of trips/passengers and trip durations by vendor ID over a window of 5 seconds. The data is then used to create a chart that monitors the status of taxi rides in OpenSearch Dashboards.

Architecture

Fake taxi ride data is sent to a Kafka topic by the Kafka producer application discussed in Lab 1. The PyFlink app aggregates the number of trips/passengers and trip durations by vendor ID over a window of 5 seconds and sends the accumulated records into an OpenSearch cluster. The data is then used to create a chart that monitors the status of taxi rides in OpenSearch Dashboards.

Infrastructure

The AWS infrastructure is created using Terraform, and the source can be found in the GitHub repository of this post. See this earlier post for details about how to create the resources. The key resources are a VPC, a VPN server, an MSK cluster and a Python Lambda producer app.

OpenSearch Cluster

For this lab, an OpenSearch cluster is created additionally, deployed with the m5.large.search instance type in private subnets. For simplicity, anonymous authentication is enabled so that we don't have to specify user credentials when making HTTP requests; overall, only network-level security is enforced on the OpenSearch domain. Note that the cluster is created only when the opensearch_to_create variable is set to true.

# infra/variables.tf
variable "opensearch_to_create" {
  description = "Flag to indicate whether to create OpenSearch cluster"
  type        = bool
  default     = false
}

...

locals {

  ...

  opensearch = {
    to_create      = var.opensearch_to_create
    engine_version = "2.7"
    instance_type  = "m5.large.search"
    instance_count = 2
  }

  ...

}

# infra/opensearch.tf
resource "aws_opensearch_domain" "opensearch" {
  count = local.opensearch.to_create ? 1 : 0

  domain_name    = local.name
  engine_version = "OpenSearch_${local.opensearch.engine_version}"

  cluster_config {
    dedicated_master_enabled = false
    instance_type            = local.opensearch.instance_type  # m5.large.search
    instance_count           = local.opensearch.instance_count # 2
    zone_awareness_enabled   = true
  }

  advanced_security_options {
    enabled                        = false
    anonymous_auth_enabled         = true
    internal_user_database_enabled = true
  }

  domain_endpoint_options {
    enforce_https           = true
    tls_security_policy     = "Policy-Min-TLS-1-2-2019-07"
    custom_endpoint_enabled = false
  }

  ebs_options {
    ebs_enabled = true
    volume_size = 10
  }

  log_publishing_options {
    cloudwatch_log_group_arn = aws_cloudwatch_log_group.opensearch_log_group_index_slow_logs[0].arn
    log_type                 = "INDEX_SLOW_LOGS"
  }

  vpc_options {
    subnet_ids         = slice(module.vpc.private_subnets, 0, local.opensearch.instance_count)
    security_group_ids = [aws_security_group.opensearch[0].id]
  }

  access_policies = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action    = "es:*",
        Principal = "*",
        Effect    = "Allow",
        Resource  = "arn:aws:es:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:domain/${local.name}/*"
      }
    ]
  })
}

OpenSearch Security Group

The security group of the OpenSearch domain has inbound rules that allow connections from the security group of the VPN server. It is important to configure those rules because the PyFlink application will be executed on the developer machine, and access to the OpenSearch cluster is made through the VPN server. Only ports 443 and 9200 are open, for accessing OpenSearch Dashboards and making HTTP requests respectively.

# infra/opensearch.tf
resource "aws_security_group" "opensearch" {
  count = local.opensearch.to_create ? 1 : 0

  name   = "${local.name}-opensearch-sg"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

...

resource "aws_security_group_rule" "opensearch_vpn_inbound_https" {
  count                    = local.vpn.to_create && local.opensearch.to_create ? 1 : 0
  type                     = "ingress"
  description              = "Allow inbound traffic for OpenSearch Dashboard from VPN"
  security_group_id        = aws_security_group.opensearch[0].id
  protocol                 = "tcp"
  from_port                = 443
  to_port                  = 443
  source_security_group_id = aws_security_group.vpn[0].id
}

resource "aws_security_group_rule" "opensearch_vpn_inbound_rest" {
  count                    = local.vpn.to_create && local.opensearch.to_create ? 1 : 0
  type                     = "ingress"
  description              = "Allow inbound traffic for OpenSearch REST API from VPN"
  security_group_id        = aws_security_group.opensearch[0].id
  protocol                 = "tcp"
  from_port                = 9200
  to_port                  = 9200
  source_security_group_id = aws_security_group.vpn[0].id
}

The infrastructure can be deployed (as well as destroyed) using the Terraform CLI as shown below.

# initialize
terraform init
# create an execution plan
terraform plan -var 'producer_to_create=true' -var 'opensearch_to_create=true'
# execute the actions proposed in a Terraform plan
terraform apply -auto-approve=true -var 'producer_to_create=true' -var 'opensearch_to_create=true'

# destroy all remote objects
# terraform destroy -auto-approve=true -var 'producer_to_create=true' -var 'opensearch_to_create=true'

Once the resources are deployed, we can check the OpenSearch cluster on AWS Console as shown below.
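
We can also verify the domain status from the developer machine with the AWS SDK for Python. The snippet below is a minimal sketch, not part of the lab source; it assumes boto3 is installed, AWS credentials are configured, and the placeholder domain name is replaced with the value of local.name used in the Terraform configuration.

# check_domain.py - optional status check of the OpenSearch domain
# assumes boto3 and AWS credentials; replace <domain-name> with the value of local.name
import boto3

client = boto3.client("opensearch")
status = client.describe_domain(DomainName="<domain-name>")["DomainStatus"]

print("processing:", status["Processing"])  # False once the domain is fully provisioned
print("vpc endpoint:", status.get("Endpoints", {}).get("vpc"))  # endpoint the PyFlink app connects to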

Local OpenSearch Cluster on Docker (Optional)

As discussed later, we can use a local Kafka cluster deployed on Docker instead of one on Amazon MSK. For this option, we also need to deploy a local OpenSearch cluster on Docker, and the following Docker Compose file defines a single-node OpenSearch cluster and an OpenSearch Dashboards service.

# compose-extra.yml
version: "3.5"

services:
  opensearch:
    image: opensearchproject/opensearch:2.7.0
    container_name: opensearch
    environment:
      - discovery.type=single-node
      - node.name=opensearch
      - DISABLE_SECURITY_PLUGIN=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - opensearch_data:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - appnet
  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:2.7.0
    container_name: opensearch-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
      DISABLE_SECURITY_DASHBOARDS_PLUGIN: true
    networks:
      - appnet

networks:
  appnet:
    external: true
    name: app-network

volumes:
  opensearch_data:
    driver: local
    name: opensearch_data

There are two Docker Compose files that deploy a Flink cluster locally. The first one (compose-msk.yml) relies on the Kafka cluster on Amazon MSK, while a local Kafka cluster is created together with a Flink cluster in the second file (compose-local-kafka.yml) - see Lab 2 and Lab 3 respectively for details. Note that, if we use local Kafka and Flink clusters, we don't have to deploy the AWS resources. Instead, we can use a local OpenSearch cluster, which can be deployed using compose-extra.yml.

The Docker Compose services can be deployed as shown below.

## flink cluster with msk cluster
$ docker-compose -f compose-msk.yml up -d

## local kafka/flink cluster
$ docker-compose -f compose-local-kafka.yml up -d
# opensearch cluster
$ docker-compose -f compose-extra.yml up -d
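
Once the services are up, we can quickly confirm that the local OpenSearch cluster is reachable. The following is a small optional check, assuming the compose-extra.yml services above are running with the security plugin disabled.

# check_local_opensearch.py - optional check that the local cluster is up
import json
import urllib.request

# security is disabled in compose-extra.yml, so no credentials are required
with urllib.request.urlopen("http://localhost:9200/_cluster/health") as resp:
    health = json.load(resp)

print(health["cluster_name"], health["status"])  # a single-node cluster typically reports green or yellow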

The application has multiple dependencies and a single Jar file is created so that it can be specified in the --jarfile option. Note that we have to build the Jar file based on the Apache Kafka Connector instead of the Apache Kafka SQL Connector because the MSK IAM Auth library is not compatible with the latter due to shade relocation. The dependencies are grouped as shown below.

  • Kafka Connector
    • flink-connector-base
    • flink-connector-kafka
    • aws-msk-iam-auth (for IAM authentication)
  • OpenSearch Connector
    • flink-connector-opensearch
    • opensearch
    • opensearch-rest-high-level-client
    • org.apache.httpcomponents
    • httpcore-nio

A single Jar file is created for this lab because the app may potentially be deployed via Amazon Managed Flink. If we do not need to deploy on it, however, the dependencies can be added to the Flink library folder (/opt/flink/lib) separately.

The Uber Jar file is created by the following POM file and is named lab4-pipeline-1.0.0.jar.

<!-- package/lab4-pipeline/pom.xml -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.amazonaws.services.kinesisanalytics</groupId>
	<artifactId>lab4-pipeline</artifactId>
	<version>1.0.0</version>
	<packaging>jar</packaging>

	<name>Uber Jar for Lab 4</name>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<flink.version>1.17.1</flink.version>
		<jdk.version>11</jdk.version>
		<kafka.clients.version>3.2.3</kafka.clients.version>
		<log4j.version>2.17.1</log4j.version>
		<aws-msk-iam-auth.version>1.1.7</aws-msk-iam-auth.version>
		<opensearch.connector.version>1.0.1-1.17</opensearch.connector.version>
		<opensearch.version>2.7.0</opensearch.version>
		<httpcore-nio.version>4.4.12</httpcore-nio.version>
	</properties>

	<repositories>
		<repository>
			<id>apache.snapshots</id>
			<name>Apache Development Snapshot Repository</name>
			<url>https://repository.apache.org/content/repositories/snapshots/</url>
			<releases>
				<enabled>false</enabled>
			</releases>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
	</repositories>

	<dependencies>

		<!-- Kafka Connector -->

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-base</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>${kafka.clients.version}</version>
		</dependency>

		<dependency>
			<groupId>software.amazon.msk</groupId>
			<artifactId>aws-msk-iam-auth</artifactId>
			<version>${aws-msk-iam-auth.version}</version>
		</dependency>

		<!-- OpenSearch -->

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-opensearch</artifactId>
			<version>${opensearch.connector.version}</version>
		</dependency>

		<dependency>
			<groupId>org.opensearch</groupId>
			<artifactId>opensearch</artifactId>
			<version>${opensearch.version}</version>
		</dependency>

		<dependency>
			<groupId>org.opensearch.client</groupId>
			<artifactId>opensearch-rest-high-level-client</artifactId>
			<version>${opensearch.version}</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.httpcomponents</groupId>
					<artifactId>httpcore-nio</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<!-- We need to include httpcore-nio again in the correct version due to the exclusion above -->
		<dependency>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpcore-nio</artifactId>
			<version>${httpcore-nio.version}</version>
		</dependency>

		<!-- Add logging framework, to produce console output when running in the IDE. -->
		<!-- These dependencies are excluded from the application JAR by default. -->
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>

			<!-- Java Compiler -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.8.0</version>
				<configuration>
					<source>${jdk.version}</source>
					<target>${jdk.version}</target>
				</configuration>
			</plugin>

			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.4.1</version>
				<executions>
					<!-- Run shade goal on package phase -->
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<excludes>
									<exclude>org.apache.flink:force-shading</exclude>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>org.apache.logging.log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<!-- Do not copy the signatures in the META-INF folder.
									Otherwise, this might cause SecurityExceptions when using the JAR. -->
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>

		<pluginManagement>
			<plugins>

				<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
				<plugin>
					<groupId>org.eclipse.m2e</groupId>
					<artifactId>lifecycle-mapping</artifactId>
					<version>1.0.0</version>
					<configuration>
						<lifecycleMappingMetadata>
							<pluginExecutions>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-shade-plugin</artifactId>
										<versionRange>[3.1.1,)</versionRange>
										<goals>
											<goal>shade</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
								<pluginExecution>
									<pluginExecutionFilter>
										<groupId>org.apache.maven.plugins</groupId>
										<artifactId>maven-compiler-plugin</artifactId>
										<versionRange>[3.1,)</versionRange>
										<goals>
											<goal>testCompile</goal>
											<goal>compile</goal>
										</goals>
									</pluginExecutionFilter>
									<action>
										<ignore/>
									</action>
								</pluginExecution>
							</pluginExecutions>
						</lifecycleMappingMetadata>
					</configuration>
				</plugin>
			</plugins>
		</pluginManagement>
	</build>
</project>

The Uber Jar file can be built using the following script (build.sh).

#!/usr/bin/env bash
# build.sh
SCRIPT_DIR="$(cd $(dirname "$0"); pwd)"
SRC_PATH=$SCRIPT_DIR/package

# remove contents under $SRC_PATH (except for the folders beginning with lab)
shopt -s extglob
rm -rf $SRC_PATH/!(lab*)

## Generate Uber jar file for individual labs
echo "generate Uber jar for PyFlink app..."
mkdir $SRC_PATH/lib

...

mvn clean install -f $SRC_PATH/lab4-pipeline/pom.xml \
  && mv $SRC_PATH/lab4-pipeline/target/lab4-pipeline-1.0.0.jar $SRC_PATH/lib \
  && rm -rf $SRC_PATH/lab4-pipeline/target

Application Source

A source table is created to read messages from the taxi-rides topic using the Kafka connector. The source records are bucketed into windows of 5 seconds using the TUMBLE windowing table-valued function, and the records within each window are aggregated by vendor ID, window start and window end. They are then inserted into the sink table, which ingests them into an OpenSearch index named trip_stats using the OpenSearch connector.

In the main method, we create the source and sink tables after mapping the relevant application properties. Then the output records are written to the OpenSearch index. Note that, when the app runs locally, the output records are also printed to the terminal so that they can be checked easily.

# forwarder/processor.py
import os
import re
import json

from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.table import StreamTableEnvironment

RUNTIME_ENV = os.environ.get("RUNTIME_ENV", "LOCAL")  # LOCAL or DOCKER
BOOTSTRAP_SERVERS = os.environ.get("BOOTSTRAP_SERVERS")  # overwrite app config
OPENSEARCH_HOSTS = os.environ.get("OPENSEARCH_HOSTS")  # overwrite app config

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.enable_checkpointing(5000)

if RUNTIME_ENV == "LOCAL":
    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    PARENT_DIR = os.path.dirname(CURRENT_DIR)
    APPLICATION_PROPERTIES_FILE_PATH = os.path.join(
        CURRENT_DIR, "application_properties.json"
    )
    JAR_FILES = ["lab4-pipeline-1.0.0.jar"]
    JAR_PATHS = tuple(
        [f"file://{os.path.join(PARENT_DIR, 'jars', name)}" for name in JAR_FILES]
    )
    print(JAR_PATHS)
    env.add_jars(*JAR_PATHS)
else:
    APPLICATION_PROPERTIES_FILE_PATH = (
        "/etc/flink/forwarder/application_properties.json"
    )

table_env = StreamTableEnvironment.create(stream_execution_environment=env)


def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        raise RuntimeError(
            f"A file at '{APPLICATION_PROPERTIES_FILE_PATH}' was not found"
        )


def property_map(props: dict, property_group_id: str):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]


def inject_security_opts(opts: dict, bootstrap_servers: str):
    if re.search("9098$", bootstrap_servers):
        opts = {
            **opts,
            **{
                "properties.security.protocol": "SASL_SSL",
                "properties.sasl.mechanism": "AWS_MSK_IAM",
                "properties.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
                "properties.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
            },
        }
    return ", ".join({f"'{k}' = '{v}'" for k, v in opts.items()})


def create_source_table(table_name: str, topic_name: str, bootstrap_servers: str):
    opts = {
        "connector": "kafka",
        "topic": topic_name,
        "properties.bootstrap.servers": bootstrap_servers,
        "properties.group.id": "source-group",
        "format": "json",
        "scan.startup.mode": "latest-offset",
    }

    stmt = f"""
    CREATE TABLE {table_name} (
        id                  VARCHAR,
        vendor_id           INT,
        pickup_date         VARCHAR,
        dropoff_date        VARCHAR,
        passenger_count     INT,
        pickup_longitude    VARCHAR,
        pickup_latitude     VARCHAR,
        dropoff_longitude   VARCHAR,
        dropoff_latitude    VARCHAR,
        store_and_fwd_flag  VARCHAR,
        gc_distance         INT,
        trip_duration       INT,
        google_distance     INT,
        google_duration     INT,
        process_time        AS PROCTIME()
    ) WITH (
        {inject_security_opts(opts, bootstrap_servers)}
    )
    """
    print(stmt)
    return stmt


def create_sink_table(table_name: str, os_hosts: str, os_index: str):
    stmt = f"""
    CREATE TABLE {table_name} (
        vendor_id           VARCHAR,
        trip_count          BIGINT NOT NULL,
        passenger_count     INT,
        trip_duration       INT,
        window_start        TIMESTAMP(3) NOT NULL,
        window_end          TIMESTAMP(3) NOT NULL
    ) WITH (
        'connector'= 'opensearch',
        'hosts' = '{os_hosts}',
        'index' = '{os_index}'
    )
    """
    print(stmt)
    return stmt


def create_print_table(table_name: str):
    stmt = f"""
    CREATE TABLE {table_name} (
        vendor_id           VARCHAR,
        trip_count          BIGINT NOT NULL,
        passenger_count     INT,
        trip_duration       INT,
        window_start        TIMESTAMP(3) NOT NULL,
        window_end          TIMESTAMP(3) NOT NULL
    ) WITH (
        'connector'= 'print'
    )
    """
    print(stmt)
    return stmt


def set_insert_sql(source_table_name: str, sink_table_name: str):
    stmt = f"""
    INSERT INTO {sink_table_name}
    SELECT
        CAST(vendor_id AS STRING) AS vendor_id,
        COUNT(id) AS trip_count,
        SUM(passenger_count) AS passenger_count,
        SUM(trip_duration) AS trip_duration,
        window_start,
        window_end
    FROM TABLE(
    TUMBLE(TABLE {source_table_name}, DESCRIPTOR(process_time), INTERVAL '5' SECONDS))
    GROUP BY vendor_id, window_start, window_end
    """
    print(stmt)
    return stmt


def main():
    #### map source/sink properties
    props = get_application_properties()
    ## source
    source_property_group_key = "source.config.0"
    source_properties = property_map(props, source_property_group_key)
    print(">> source properties")
    print(source_properties)
    source_table_name = source_properties["table.name"]
    source_topic_name = source_properties["topic.name"]
    source_bootstrap_servers = (
        BOOTSTRAP_SERVERS or source_properties["bootstrap.servers"]
    )
    ## sink
    sink_property_group_key = "sink.config.0"
    sink_properties = property_map(props, sink_property_group_key)
    print(">> sink properties")
    print(sink_properties)
    sink_table_name = sink_properties["table.name"]
    sink_os_hosts = OPENSEARCH_HOSTS or sink_properties["os_hosts"]
    sink_os_index = sink_properties["os_index"]
    ## print
    print_table_name = "sink_print"
    #### create tables
    table_env.execute_sql(
        create_source_table(
            source_table_name, source_topic_name, source_bootstrap_servers
        )
    )
    table_env.execute_sql(
        create_sink_table(sink_table_name, sink_os_hosts, sink_os_index)
    )
    table_env.execute_sql(create_print_table(print_table_name))
    #### insert into sink tables
    if RUNTIME_ENV == "LOCAL":
        statement_set = table_env.create_statement_set()
        statement_set.add_insert_sql(set_insert_sql(source_table_name, sink_table_name))
        # also write the aggregated records to the print table for easy checking
        statement_set.add_insert_sql(set_insert_sql(source_table_name, print_table_name))
        statement_set.execute().wait()
    else:
        table_result = table_env.execute_sql(
            set_insert_sql(source_table_name, sink_table_name)
        )
        print(table_result.get_job_client().get_job_status())


if __name__ == "__main__":
    main()

// forwarder/application_properties.json
[
  {
    "PropertyGroupId": "kinesis.analytics.flink.run.options",
    "PropertyMap": {
      "python": "processor.py",
      "jarfile": "package/lib/lab4-pipeline-1.0.0.jar"
    }
  },
  {
    "PropertyGroupId": "source.config.0",
    "PropertyMap": {
      "table.name": "taxi_rides_src",
      "topic.name": "taxi-rides",
      "bootstrap.servers": "localhost:29092"
    }
  },
  {
    "PropertyGroupId": "sink.config.0",
    "PropertyMap": {
      "table.name": "trip_stats_sink",
      "os_hosts": "http://opensearch:9200",
      "os_index": "trip_stats"
    }
  }
]
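
To illustrate how the property groups above are resolved, the snippet below mimics what get_application_properties() and property_map() do for the source group. It is only an illustration and assumes it is executed from the project root.

# example only - resolving the source property group in the same way as processor.py
import json
import os

with open("forwarder/application_properties.json") as f:
    props = json.load(f)

source_props = next(
    p["PropertyMap"] for p in props if p["PropertyGroupId"] == "source.config.0"
)
# the BOOTSTRAP_SERVERS environment variable, if set, overrides the configured value
print(source_props["topic.name"])  # taxi-rides
print(os.environ.get("BOOTSTRAP_SERVERS") or source_props["bootstrap.servers"])  # localhost:29092 by default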

Run Application

We can run the application in the Flink cluster on Docker, and the steps are shown below. Either the Kafka cluster on Amazon MSK or a local Kafka cluster can be used, depending on which Docker Compose file we choose. Either way, we can check the job details on the Flink web UI at localhost:8081. Note that, if we use the local Kafka cluster option, we have to start the producer application in a separate terminal and deploy a local OpenSearch cluster.

## set aws credentials environment variables
export AWS_ACCESS_KEY_ID=<aws-access-key-id>
export AWS_SECRET_ACCESS_KEY=<aws-secret-access-key>

# set additional environment variables when using AWS services, i.e. if using compose-msk.yml
#   values can be obtained in Terraform outputs or on the AWS Console
export BOOTSTRAP_SERVERS=<bootstrap-servers>
export OPENSEARCH_HOSTS=<opensearch-hosts>

## run docker compose services
# with msk cluster
docker-compose -f compose-msk.yml up -d

# # with local kafka and opensearch clusters
# docker-compose -f compose-local-kafka.yml up -d
# docker-compose -f compose-extra.yml up -d

## run the producer application in another terminal if using a local cluster
# python producer/app.py

## submit pyflink application
docker exec jobmanager /opt/flink/bin/flink run \
    --python /etc/flink/forwarder/processor.py \
    --jarfile /etc/flink/package/lib/lab4-pipeline-1.0.0.jar \
    -d

Once the PyFlink application is submitted, we can check its details on the Flink UI as shown below.
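
The job status can also be queried from the Flink REST API, which the JobManager exposes on the same port as the web UI. Below is a minimal sketch assuming the REST endpoint is reachable at localhost:8081.

# list_jobs.py - optional check of the submitted job via the Flink REST API
import json
import urllib.request

with urllib.request.urlopen("http://localhost:8081/jobs/overview") as resp:
    jobs = json.load(resp)["jobs"]

for job in jobs:
    print(job["jid"], job["name"], job["state"])  # the PyFlink job should be in RUNNING state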

Application Result

Kafka Topic

We can see that the topic (taxi-rides) is created, and its details can be found on the Topics menu at localhost:3000.
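
We can also peek at the records from the command line. The following sketch uses the kafka-python package, which is not part of the lab dependencies, and assumes the local Kafka cluster listening on localhost:29092; the IAM-authenticated MSK listener would need additional configuration.

# peek_topic.py - optional sketch that prints a few taxi-rides records
import json

from kafka import KafkaConsumer  # assumes kafka-python is installed

consumer = KafkaConsumer(
    "taxi-rides",
    bootstrap_servers="localhost:29092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for num, record in enumerate(consumer):
    print(record.value["id"], record.value["vendor_id"], record.value["trip_duration"])
    if num >= 4:  # stop after five records
        break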

OpenSearch Index

The ingested data can be checked easily using the Query Workbench as shown below.
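
Alternatively, the index can be queried directly over the REST API. The sketch below assumes the local cluster endpoint; when using the Amazon OpenSearch domain, point it at the VPC endpoint of the domain instead.

# query_index.py - optional sketch that fetches a few documents from the trip_stats index
import json
import urllib.request

with urllib.request.urlopen("http://localhost:9200/trip_stats/_search?size=3") as resp:
    hits = json.load(resp)["hits"]["hits"]

for hit in hits:
    # each document has vendor_id, trip_count, passenger_count, trip_duration, window_start and window_end
    print(json.dumps(hit["_source"]))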

To monitor the status of taxi rides, a horizontal bar chart is created in OpenSearch Dashboards. The average trip duration is selected as the metric, and the records are grouped by vendor ID. We can see the values change as new records arrive.
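
The aggregation behind the chart can also be reproduced with a search query. The following is a rough sketch that assumes the index mapping was created dynamically by the connector, in which case the string vendor_id field typically gets a .keyword sub-field to aggregate on.

# avg_duration.py - optional sketch reproducing the chart's aggregation
import json
import urllib.request

query = {
    "size": 0,
    "aggs": {
        "by_vendor": {
            "terms": {"field": "vendor_id.keyword"},
            "aggs": {"avg_duration": {"avg": {"field": "trip_duration"}}},
        }
    },
}
req = urllib.request.Request(
    "http://localhost:9200/trip_stats/_search",
    data=json.dumps(query).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    buckets = json.load(resp)["aggregations"]["by_vendor"]["buckets"]

for bucket in buckets:
    print(bucket["key"], round(bucket["avg_duration"]["value"], 2))  # vendor id, average trip duration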

Summary

In this lab, we created a PyFlink application that writes accumulated taxi ride data into an OpenSearch cluster. It aggregated the number of trips/passengers and trip durations by vendor ID over a window of 5 seconds. The data was then used to create a chart that monitors the status of taxi rides in OpenSearch Dashboards.