The value of data can be maximised when it is used without delay. With Apache Flink, we can build streaming analytics applications that incorporate the latest events with low latency. In this lab, we will create a Pyflink application that writes accumulated taxi ride data into an OpenSearch cluster. It aggregates the number of trips/passengers and trip durations by vendor ID for a window of 5 seconds. The data is then used to create a chart that monitors the status of taxi rides in the OpenSearch Dashboard.
- Introduction
- Lab 1 Produce data to Kafka using Lambda
- Lab 2 Write data to Kafka from S3 using Flink
- Lab 3 Transform and write data to S3 from Kafka using Flink
- Lab 4 Clean, Aggregate, and Enrich Events with Flink (this post)
- Lab 5 Write data to DynamoDB using Kafka Connect
- Lab 6 Consume data from Kafka using Lambda
Architecture
Fake taxi ride data is sent to a Kafka topic by the Kafka producer application that is discussed in Lab 1. The Pyflink app aggregates the number of trips/passengers and trip durations by vendor ID for a window of 5 seconds and sends the accumulated records into an OpenSearch cluster. The data is then used to create a chart that monitors the status of taxi rides in the OpenSearch Dashboard.
Infrastructure
The AWS infrastructure is created using Terraform, and the source can be found in the GitHub repository of this post. See this earlier post for details about how to create the resources. The key resources include a VPC, a VPN server, an MSK cluster, and a Python Lambda producer app.
OpenSearch Cluster
For this lab, an OpenSearch cluster is additionally created, and it is deployed with the m5.large.search instance type in private subnets. For simplicity, anonymous authentication is enabled so that we don’t have to specify user credentials when making an HTTP request. Overall, only network-level security is enforced on the OpenSearch domain. Note that the cluster is created only when the opensearch_to_create variable is set to true.
1# infra/variables.tf
2variable "opensearch_to_create" {
3 description = "Flag to indicate whether to create OpenSearch cluster"
4 type = bool
5 default = false
6}
7
8...
9
10locals {
11
12 ...
13
14 opensearch = {
15 to_create = var.opensearch_to_create
16 engine_version = "2.7"
17 instance_type = "m5.large.search"
18 instance_count = 2
19 }
20
21 ...
22
23}
24
25# infra/opensearch.tf
26resource "aws_opensearch_domain" "opensearch" {
27 count = local.opensearch.to_create ? 1 : 0
28
29 domain_name = local.name
30 engine_version = "OpenSearch_${local.opensearch.engine_version}"
31
32 cluster_config {
33 dedicated_master_enabled = false
34 instance_type = local.opensearch.instance_type # m5.large.search
35 instance_count = local.opensearch.instance_count # 2
36 zone_awareness_enabled = true
37 }
38
39 advanced_security_options {
40 enabled = false
41 anonymous_auth_enabled = true
42 internal_user_database_enabled = true
43 }
44
45 domain_endpoint_options {
46 enforce_https = true
47 tls_security_policy = "Policy-Min-TLS-1-2-2019-07"
48 custom_endpoint_enabled = false
49 }
50
51 ebs_options {
52 ebs_enabled = true
53 volume_size = 10
54 }
55
56 log_publishing_options {
57 cloudwatch_log_group_arn = aws_cloudwatch_log_group.opensearch_log_group_index_slow_logs[0].arn
58 log_type = "INDEX_SLOW_LOGS"
59 }
60
61 vpc_options {
62 subnet_ids = slice(module.vpc.private_subnets, 0, local.opensearch.instance_count)
63 security_group_ids = [aws_security_group.opensearch[0].id]
64 }
65
66 access_policies = jsonencode({
67 Version = "2012-10-17"
68 Statement = [
69 {
70 Action = "es:*",
71 Principal = "*",
72 Effect = "Allow",
73 Resource = "arn:aws:es:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:domain/${local.name}/*"
74 }
75 ]
76 })
77}
OpenSearch Security Group
The security group of the OpenSearch domain has inbound rules that allow connections from the security group of the VPN server. It is important to configure those rules because the Pyflink application will be executed on the developer machine, and access to the OpenSearch cluster will be made through the VPN server. Only ports 443 and 9200 are open for accessing the OpenSearch Dashboard and making HTTP requests.
1# infra/opensearch.tf
2resource "aws_security_group" "opensearch" {
3 count = local.opensearch.to_create ? 1 : 0
4
5 name = "${local.name}-opensearch-sg"
6 vpc_id = module.vpc.vpc_id
7
8 lifecycle {
9 create_before_destroy = true
10 }
11
12 tags = local.tags
13}
14
15...
16
17resource "aws_security_group_rule" "opensearch_vpn_inbound_https" {
18 count = local.vpn.to_create && local.opensearch.to_create ? 1 : 0
19 type = "ingress"
20 description = "Allow inbound traffic for OpenSearch Dashboard from VPN"
21 security_group_id = aws_security_group.opensearch[0].id
22 protocol = "tcp"
23 from_port = 443
24 to_port = 443
25 source_security_group_id = aws_security_group.vpn[0].id
26}
27
28resource "aws_security_group_rule" "opensearch_vpn_inbound_rest" {
29 count = local.vpn.to_create && local.opensearch.to_create ? 1 : 0
30 type = "ingress"
31 description = "Allow inbound traffic for OpenSearch REST API from VPN"
32 security_group_id = aws_security_group.opensearch[0].id
33 protocol = "tcp"
34 from_port = 9200
35 to_port = 9200
36 source_security_group_id = aws_security_group.vpn[0].id
37}
The infrastructure can be deployed (as well as destroyed) using Terraform CLI as shown below.
1# initialize
2terraform init
3# create an execution plan
4terraform plan -var 'producer_to_create=true' -var 'opensearch_to_create=true'
5# execute the actions proposed in a Terraform plan
6terraform apply -auto-approve=true -var 'producer_to_create=true' -var 'opensearch_to_create=true'
7
8# destroy all remote objects
9# terraform destroy -auto-approve=true -var 'producer_to_create=true' -var 'opensearch_to_create=true'
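The connection details that are needed later (e.g. the MSK bootstrap servers and the OpenSearch endpoint) can be read from the Terraform outputs. Below is a small helper sketch, not part of the repository, that simply prints all outputs as a flat dictionary; the exact output names depend on how they are defined in the Terraform source.
# print_outputs.py - illustrative helper (not part of the repository) that prints
# the Terraform outputs as a flat dictionary; run it from the infra folder.
import json
import subprocess

# 'terraform output -json' returns {name: {"value": ..., "type": ..., "sensitive": ...}}
raw = subprocess.run(
    ["terraform", "output", "-json"], capture_output=True, text=True, check=True
).stdout
print({key: item["value"] for key, item in json.loads(raw).items()})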
Once the resources are deployed, we can check the OpenSearch cluster on the AWS Console as shown below.
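Since anonymous authentication is enabled, the domain can also be queried from the developer machine without credentials once the VPN connection is established. Below is a minimal sketch, assuming the OPENSEARCH_HOSTS environment variable (also used later when running the app) holds the full https endpoint of the domain.
# domain_check.py - minimal connectivity check for the OpenSearch domain, assuming
# the VPN connection is active and OPENSEARCH_HOSTS holds the full https endpoint.
import json
import os
import urllib.request

endpoint = os.environ["OPENSEARCH_HOSTS"]

# anonymous authentication is enabled, so no credentials are attached
with urllib.request.urlopen(f"{endpoint}/_cluster/health") as resp:
    print(json.dumps(json.loads(resp.read()), indent=2))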
Local OpenSearch Cluster on Docker (Optional)
As discussed later, we can use a local Kafka cluster deployed on Docker instead of one on Amazon MSK. For this option, we also need to deploy a local OpenSearch cluster on Docker, and the following Docker Compose file defines a single-node OpenSearch cluster and an OpenSearch Dashboards service.
1# compose-extra.yml
2version: "3.5"
3
4services:
5 opensearch:
6 image: opensearchproject/opensearch:2.7.0
7 container_name: opensearch
8 environment:
9 - discovery.type=single-node
10 - node.name=opensearch
11 - DISABLE_SECURITY_PLUGIN=true
12 - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
13 volumes:
14 - opensearch_data:/usr/share/opensearch/data
15 ports:
16 - 9200:9200
17 - 9600:9600
18 networks:
19 - appnet
20 opensearch-dashboards:
21 image: opensearchproject/opensearch-dashboards:2.7.0
22 container_name: opensearch-dashboards
23 ports:
24 - 5601:5601
25 expose:
26 - "5601"
27 environment:
28 OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
29 DISABLE_SECURITY_DASHBOARDS_PLUGIN: true
30 networks:
31 - appnet
32
33networks:
34 appnet:
35 external: true
36 name: app-network
37
38volumes:
39 opensearch_data:
40 driver: local
41 name: opensearch_data
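Once the services are up, we can quickly confirm that both the local cluster and the dashboard respond without credentials, as the security plugins are disabled in the Compose file. The following is only a convenience sketch that assumes the default port mappings shown above.
# local_check.py - sanity check of the local OpenSearch services, assuming the
# Compose file above is running with the default port mappings.
import json
import urllib.request

# the security plugin is disabled, so no credentials are required
with urllib.request.urlopen("http://localhost:9200") as resp:
    print(json.loads(resp.read())["version"])  # cluster version info

# OpenSearch Dashboards exposes a status endpoint on port 5601
with urllib.request.urlopen("http://localhost:5601/api/status") as resp:
    print(json.loads(resp.read())["status"]["overall"]["state"])  # e.g. "green"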
Flink Cluster on Docker Compose
There are two Docker Compose files that deploy a Flink cluster locally. The first one (compose-msk.yml) relies on the Kafka cluster on Amazon MSK, while the second one (compose-local-kafka.yml) creates a local Kafka cluster together with a Flink cluster - see Lab 2 and Lab 3 respectively for details. Note that, if we use local Kafka and Flink clusters, we don’t have to deploy the AWS resources. Instead, we can use a local OpenSearch cluster, which can be deployed using compose-extra.yml.
The Docker Compose services can be deployed as shown below.
1## flink cluster with msk cluster
2$ docker-compose -f compose-msk.yml up -d
3
4## local kafka/flink cluster
5$ docker-compose -f compose-local-kafka.yml up -d
6# opensearch cluster
7$ docker-compose -f compose-extra.yml up -d
Pyflink Application
Flink Pipeline Jar
The application has multiple dependencies, and a single Jar file is created so that it can be specified in the --jarfile option. Note that we have to build the Jar file based on the Apache Kafka Connector instead of the Apache Kafka SQL Connector because the MSK IAM Auth library is not compatible with the latter due to shade relocation. The dependencies are grouped as shown below.
- Kafka Connector
- flink-connector-base
- flink-connector-kafka
- aws-msk-iam-auth (for IAM authentication)
- OpenSearch Connector
- flink-connector-opensearch
- opensearch
- opensearch-rest-high-level-client
- org.apache.httpcomponents
- httpcore-nio
A single Jar file is created for this lab because the app may potentially be deployed via Amazon Managed Flink. If we do not need to deploy on it, however, the dependencies can be added to the Flink library folder (/opt/flink/lib) separately.
The single Uber Jar file is created by the following POM file, and it is named lab4-pipeline-1.0.0.jar.
1<!-- package/lab4-pipeline/pom.xml -->
2<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4 <modelVersion>4.0.0</modelVersion>
5
6 <groupId>com.amazonaws.services.kinesisanalytics</groupId>
7 <artifactId>lab4-pipeline</artifactId>
8 <version>1.0.0</version>
9 <packaging>jar</packaging>
10
11 <name>Uber Jar for Lab 4</name>
12
13 <properties>
14 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15 <flink.version>1.17.1</flink.version>
16 <jdk.version>11</jdk.version>
17 <kafka.clients.version>3.2.3</kafka.clients.version>
18 <log4j.version>2.17.1</log4j.version>
19 <aws-msk-iam-auth.version>1.1.7</aws-msk-iam-auth.version>
20 <opensearch.connector.version>1.0.1-1.17</opensearch.connector.version>
21 <opensearch.version>2.7.0</opensearch.version>
22 <httpcore-nio.version>4.4.12</httpcore-nio.version>
23 </properties>
24
25 <repositories>
26 <repository>
27 <id>apache.snapshots</id>
28 <name>Apache Development Snapshot Repository</name>
29 <url>https://repository.apache.org/content/repositories/snapshots/</url>
30 <releases>
31 <enabled>false</enabled>
32 </releases>
33 <snapshots>
34 <enabled>true</enabled>
35 </snapshots>
36 </repository>
37 </repositories>
38
39 <dependencies>
40
41 <!-- Kafka Connector -->
42
43 <dependency>
44 <groupId>org.apache.flink</groupId>
45 <artifactId>flink-connector-base</artifactId>
46 <version>${flink.version}</version>
47 </dependency>
48
49 <dependency>
50 <groupId>org.apache.flink</groupId>
51 <artifactId>flink-connector-kafka</artifactId>
52 <version>${flink.version}</version>
53 </dependency>
54
55 <dependency>
56 <groupId>org.apache.kafka</groupId>
57 <artifactId>kafka-clients</artifactId>
58 <version>${kafka.clients.version}</version>
59 </dependency>
60
61 <dependency>
62 <groupId>software.amazon.msk</groupId>
63 <artifactId>aws-msk-iam-auth</artifactId>
64 <version>${aws-msk-iam-auth.version}</version>
65 </dependency>
66
67 <!-- OpenSearch -->
68
69 <dependency>
70 <groupId>org.apache.flink</groupId>
71 <artifactId>flink-connector-opensearch</artifactId>
72 <version>${opensearch.connector.version}</version>
73 </dependency>
74
75 <dependency>
76 <groupId>org.opensearch</groupId>
77 <artifactId>opensearch</artifactId>
78 <version>${opensearch.version}</version>
79 </dependency>
80
81 <dependency>
82 <groupId>org.opensearch.client</groupId>
83 <artifactId>opensearch-rest-high-level-client</artifactId>
84 <version>${opensearch.version}</version>
85 <exclusions>
86 <exclusion>
87 <groupId>org.apache.httpcomponents</groupId>
88 <artifactId>httpcore-nio</artifactId>
89 </exclusion>
90 </exclusions>
91 </dependency>
92
93 <!-- We need to include httpcore-nio again in the correct version due to the exclusion above -->
94 <dependency>
95 <groupId>org.apache.httpcomponents</groupId>
96 <artifactId>httpcore-nio</artifactId>
97 <version>${httpcore-nio.version}</version>
98 </dependency>
99
100 <!-- Add logging framework, to produce console output when running in the IDE. -->
101 <!-- These dependencies are excluded from the application JAR by default. -->
102 <dependency>
103 <groupId>org.apache.logging.log4j</groupId>
104 <artifactId>log4j-slf4j-impl</artifactId>
105 <version>${log4j.version}</version>
106 <scope>runtime</scope>
107 </dependency>
108 <dependency>
109 <groupId>org.apache.logging.log4j</groupId>
110 <artifactId>log4j-api</artifactId>
111 <version>${log4j.version}</version>
112 <scope>runtime</scope>
113 </dependency>
114 <dependency>
115 <groupId>org.apache.logging.log4j</groupId>
116 <artifactId>log4j-core</artifactId>
117 <version>${log4j.version}</version>
118 <scope>runtime</scope>
119 </dependency>
120 </dependencies>
121
122 <build>
123 <plugins>
124
125 <!-- Java Compiler -->
126 <plugin>
127 <groupId>org.apache.maven.plugins</groupId>
128 <artifactId>maven-compiler-plugin</artifactId>
129 <version>3.8.0</version>
130 <configuration>
131 <source>${jdk.version}</source>
132 <target>${jdk.version}</target>
133 </configuration>
134 </plugin>
135
136 <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
137 <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
138 <plugin>
139 <groupId>org.apache.maven.plugins</groupId>
140 <artifactId>maven-shade-plugin</artifactId>
141 <version>3.4.1</version>
142 <executions>
143 <!-- Run shade goal on package phase -->
144 <execution>
145 <phase>package</phase>
146 <goals>
147 <goal>shade</goal>
148 </goals>
149 <configuration>
150 <artifactSet>
151 <excludes>
152 <exclude>org.apache.flink:force-shading</exclude>
153 <exclude>com.google.code.findbugs:jsr305</exclude>
154 <exclude>org.slf4j:*</exclude>
155 <exclude>org.apache.logging.log4j:*</exclude>
156 </excludes>
157 </artifactSet>
158 <filters>
159 <filter>
160 <!-- Do not copy the signatures in the META-INF folder.
161 Otherwise, this might cause SecurityExceptions when using the JAR. -->
162 <artifact>*:*</artifact>
163 <excludes>
164 <exclude>META-INF/*.SF</exclude>
165 <exclude>META-INF/*.DSA</exclude>
166 <exclude>META-INF/*.RSA</exclude>
167 </excludes>
168 </filter>
169 </filters>
170 <transformers>
171 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
172 </transformers>
173 </configuration>
174 </execution>
175 </executions>
176 </plugin>
177 </plugins>
178
179 <pluginManagement>
180 <plugins>
181
182 <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
183 <plugin>
184 <groupId>org.eclipse.m2e</groupId>
185 <artifactId>lifecycle-mapping</artifactId>
186 <version>1.0.0</version>
187 <configuration>
188 <lifecycleMappingMetadata>
189 <pluginExecutions>
190 <pluginExecution>
191 <pluginExecutionFilter>
192 <groupId>org.apache.maven.plugins</groupId>
193 <artifactId>maven-shade-plugin</artifactId>
194 <versionRange>[3.1.1,)</versionRange>
195 <goals>
196 <goal>shade</goal>
197 </goals>
198 </pluginExecutionFilter>
199 <action>
200 <ignore/>
201 </action>
202 </pluginExecution>
203 <pluginExecution>
204 <pluginExecutionFilter>
205 <groupId>org.apache.maven.plugins</groupId>
206 <artifactId>maven-compiler-plugin</artifactId>
207 <versionRange>[3.1,)</versionRange>
208 <goals>
209 <goal>testCompile</goal>
210 <goal>compile</goal>
211 </goals>
212 </pluginExecutionFilter>
213 <action>
214 <ignore/>
215 </action>
216 </pluginExecution>
217 </pluginExecutions>
218 </lifecycleMappingMetadata>
219 </configuration>
220 </plugin>
221 </plugins>
222 </pluginManagement>
223 </build>
224</project>
The Uber Jar file can be built using the following script (build.sh).
1# build.sh
2#!/usr/bin/env bash
3SCRIPT_DIR="$(cd $(dirname "$0"); pwd)"
4SRC_PATH=$SCRIPT_DIR/package
5
6# remove contents under $SRC_PATH (except for the folders beginning with lab)
7shopt -s extglob
8rm -rf $SRC_PATH/!(lab*)
9
10## Generate Uber jar file for individual labs
11echo "generate Uber jar for PyFlink app..."
12mkdir $SRC_PATH/lib
13
14...
15
16mvn clean install -f $SRC_PATH/lab4-pipeline/pom.xml \
17 && mv $SRC_PATH/lab4-pipeline/target/lab4-pipeline-1.0.0.jar $SRC_PATH/lib \
18 && rm -rf $SRC_PATH/lab4-pipeline/target
Application Source
A source table is created to read messages from the taxi-rides topic using the Kafka Connector. The source records are bucketed into windows of 5 seconds using the TUMBLE windowing table-valued function, and the records within each window are aggregated by vendor ID, window start, and window end. They are then inserted into the sink table, which ingests them into an OpenSearch index named trip_stats using the OpenSearch connector.
In the main method, we create the source and sink tables after mapping the relevant application properties, which are read from application_properties.json (shown after the app source below). Then the output records are written to the OpenSearch index. Note that, when the app runs locally, the output records are additionally printed in the terminal so that they can be checked easily.
1# forwarder/processor.py
2import os
3import re
4import json
5
6from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
7from pyflink.table import StreamTableEnvironment
8
9RUNTIME_ENV = os.environ.get("RUNTIME_ENV", "LOCAL") # LOCAL or DOCKER
10BOOTSTRAP_SERVERS = os.environ.get("BOOTSTRAP_SERVERS") # overwrite app config
11OPENSEARCH_HOSTS = os.environ.get("OPENSEARCH_HOSTS") # overwrite app config
12
13env = StreamExecutionEnvironment.get_execution_environment()
14env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
15env.enable_checkpointing(5000)
16
17if RUNTIME_ENV == "LOCAL":
18 CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
19 PARENT_DIR = os.path.dirname(CURRENT_DIR)
20 APPLICATION_PROPERTIES_FILE_PATH = os.path.join(
21 CURRENT_DIR, "application_properties.json"
22 )
23 JAR_FILES = ["lab4-pipeline-1.0.0.jar"]
24 JAR_PATHS = tuple(
25 [f"file://{os.path.join(PARENT_DIR, 'jars', name)}" for name in JAR_FILES]
26 )
27 print(JAR_PATHS)
28 env.add_jars(*JAR_PATHS)
29else:
30 APPLICATION_PROPERTIES_FILE_PATH = (
31 "/etc/flink/forwarder/application_properties.json"
32 )
33
34table_env = StreamTableEnvironment.create(stream_execution_environment=env)
35
36
37def get_application_properties():
38 if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
39 with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
40 contents = file.read()
41 properties = json.loads(contents)
42 return properties
43 else:
44 raise RuntimeError(
45 f"A file at '{APPLICATION_PROPERTIES_FILE_PATH}' was not found"
46 )
47
48
49def property_map(props: dict, property_group_id: str):
50 for prop in props:
51 if prop["PropertyGroupId"] == property_group_id:
52 return prop["PropertyMap"]
53
54
55def inject_security_opts(opts: dict, bootstrap_servers: str):
56 if re.search("9098$", bootstrap_servers):
57 opts = {
58 **opts,
59 **{
60 "properties.security.protocol": "SASL_SSL",
61 "properties.sasl.mechanism": "AWS_MSK_IAM",
62 "properties.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
63 "properties.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
64 },
65 }
66 return ", ".join({f"'{k}' = '{v}'" for k, v in opts.items()})
67
68
69def create_source_table(table_name: str, topic_name: str, bootstrap_servers: str):
70 opts = {
71 "connector": "kafka",
72 "topic": topic_name,
73 "properties.bootstrap.servers": bootstrap_servers,
74 "properties.group.id": "soruce-group",
75 "format": "json",
76 "scan.startup.mode": "latest-offset",
77 }
78
79 stmt = f"""
80 CREATE TABLE {table_name} (
81 id VARCHAR,
82 vendor_id INT,
83 pickup_date VARCHAR,
84 dropoff_date VARCHAR,
85 passenger_count INT,
86 pickup_longitude VARCHAR,
87 pickup_latitude VARCHAR,
88 dropoff_longitude VARCHAR,
89 dropoff_latitude VARCHAR,
90 store_and_fwd_flag VARCHAR,
91 gc_distance INT,
92 trip_duration INT,
93 google_distance INT,
94 google_duration INT,
95 process_time AS PROCTIME()
96 ) WITH (
97 {inject_security_opts(opts, bootstrap_servers)}
98 )
99 """
100 print(stmt)
101 return stmt
102
103
104def create_sink_table(table_name: str, os_hosts: str, os_index: str):
105 stmt = f"""
106 CREATE TABLE {table_name} (
107 vendor_id VARCHAR,
108 trip_count BIGINT NOT NULL,
109 passenger_count INT,
110 trip_duration INT,
111 window_start TIMESTAMP(3) NOT NULL,
112 window_end TIMESTAMP(3) NOT NULL
113 ) WITH (
114 'connector'= 'opensearch',
115 'hosts' = '{os_hosts}',
116 'index' = '{os_index}'
117 )
118 """
119 print(stmt)
120 return stmt
121
122
123def create_print_table(table_name: str):
124 stmt = f"""
125 CREATE TABLE {table_name} (
126 vendor_id VARCHAR,
127 trip_count BIGINT NOT NULL,
128 passenger_count INT,
129 trip_duration INT,
130 window_start TIMESTAMP(3) NOT NULL,
131 window_end TIMESTAMP(3) NOT NULL
132 ) WITH (
133 'connector'= 'print'
134 )
135 """
136 print(stmt)
137 return stmt
138
139
140def set_insert_sql(source_table_name: str, sink_table_name: str):
141 stmt = f"""
142 INSERT INTO {sink_table_name}
143 SELECT
144 CAST(vendor_id AS STRING) AS vendor_id,
145 COUNT(id) AS trip_count,
146 SUM(passenger_count) AS passenger_count,
147 SUM(trip_duration) AS trip_duration,
148 window_start,
149 window_end
150 FROM TABLE(
151 TUMBLE(TABLE {source_table_name}, DESCRIPTOR(process_time), INTERVAL '5' SECONDS))
152 GROUP BY vendor_id, window_start, window_end
153 """
154 print(stmt)
155 return stmt
156
157
158def main():
159 #### map source/sink properties
160 props = get_application_properties()
161 ## source
162 source_property_group_key = "source.config.0"
163 source_properties = property_map(props, source_property_group_key)
164 print(">> source properties")
165 print(source_properties)
166 source_table_name = source_properties["table.name"]
167 source_topic_name = source_properties["topic.name"]
168 source_bootstrap_servers = (
169 BOOTSTRAP_SERVERS or source_properties["bootstrap.servers"]
170 )
171 ## sink
172 sink_property_group_key = "sink.config.0"
173 sink_properties = property_map(props, sink_property_group_key)
174 print(">> sink properties")
175 print(sink_properties)
176 sink_table_name = sink_properties["table.name"]
177 sink_os_hosts = OPENSEARCH_HOSTS or sink_properties["os_hosts"]
178 sink_os_index = sink_properties["os_index"]
179 ## print
180 print_table_name = "sink_print"
181 #### create tables
182 table_env.execute_sql(
183 create_source_table(
184 source_table_name, source_topic_name, source_bootstrap_servers
185 )
186 )
187 table_env.execute_sql(
188 create_sink_table(sink_table_name, sink_os_hosts, sink_os_index)
189 )
190 table_env.execute_sql(create_print_table(print_table_name))
191 #### insert into sink tables
192 if RUNTIME_ENV == "LOCAL":
193 statement_set = table_env.create_statement_set()
194 statement_set.add_insert_sql(set_insert_sql(source_table_name, sink_table_name))
195 statement_set.add_insert_sql(set_insert_sql(source_table_name, print_table_name))
196 statement_set.execute().wait()
197 else:
198 table_result = table_env.execute_sql(
199 set_insert_sql(source_table_name, sink_table_name)
200 )
201 print(table_result.get_job_client().get_job_status())
202
203
204if __name__ == "__main__":
205 main()
1// forwarder/application_properties.json
2[
3 {
4 "PropertyGroupId": "kinesis.analytics.flink.run.options",
5 "PropertyMap": {
6 "python": "processor.py",
7 "jarfile": "package/lib/lab4-pipeline-1.0.0.jar"
8 }
9 },
10 {
11 "PropertyGroupId": "source.config.0",
12 "PropertyMap": {
13 "table.name": "taxi_rides_src",
14 "topic.name": "taxi-rides",
15 "bootstrap.servers": "localhost:29092"
16 }
17 },
18 {
19 "PropertyGroupId": "sink.config.0",
20 "PropertyMap": {
21 "table.name": "trip_stats_sink",
22 "os_hosts": "http://opensearch:9200",
23 "os_index": "trip_stats"
24 }
25 }
26]
Run Application
Execute on Local Flink Cluster
We can run the application in the Flink cluster on Docker, and the steps are shown below. Either the Kafka cluster on Amazon MSK or a local Kafka cluster can be used depending on which Docker Compose file we choose. Either way, we can check the job details on the Flink web UI at localhost:8081. Note that, if we use the local Kafka cluster option, we have to start the producer application in a separate terminal as well as deploy a local OpenSearch cluster.
1## set aws credentials environment variables
2export AWS_ACCESS_KEY_ID=<aws-access-key-id>
3export AWS_SECRET_ACCESS_KEY=<aws-secret-access-key>
4
5# set additional environment variables when using AWS services (compose-msk.yml)
6# values can be obtained in Terraform outputs or AWS Console
7export BOOTSTRAP_SERVERS=<bootstrap-servers>
8export OPENSEARCH_HOSTS=<opensearch-hosts>
9
10## run docker compose service
11# or with msk cluster
12docker-compose -f compose-msk.yml up -d
13
14# # with local kafka and opensearch cluster
15# docker-compose -f compose-local-kafka.yml up -d
16# docker-compose -f compose-extra.yml up -d
17
18## run the producer application in another terminal if using a local cluster
19# python producer/app.py
20
21## submit pyflink application
22docker exec jobmanager /opt/flink/bin/flink run \
23 --python /etc/flink/forwarder/processor.py \
24 --jarfile /etc/flink/package/lib/lab4-pipeline-1.0.0.jar \
25 -d
Once the Pyflink application is submitted, we can check its details on the Flink UI as shown below.
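Besides the web UI, the job status can also be queried through the JobManager's REST API on the same port. The sketch below is a minimal example, assuming the Docker Compose services expose the JobManager at localhost:8081.
# job_check.py - list Flink jobs via the JobManager's REST API, assuming it is
# exposed at localhost:8081 by the Docker Compose services.
import json
import urllib.request

with urllib.request.urlopen("http://localhost:8081/jobs/overview") as resp:
    for job in json.loads(resp.read())["jobs"]:
        print(job["jid"], job["name"], job["state"])  # prints job id, name and state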
Application Result
Kafka Topic
We can see that the topic (taxi-rides) is created, and the details of the topic can be found on the Topics menu at localhost:3000.
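If we prefer checking from code rather than the UI, the topic list can also be pulled with a Kafka client. The sketch below uses the kafka-python package against the local cluster option (broker on localhost:29092); for the MSK cluster, IAM authentication would additionally be required.
# topic_check.py - list topics with kafka-python, assuming the local Kafka
# cluster option with the broker exposed on localhost:29092.
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers="localhost:29092")
print([t for t in admin.list_topics() if not t.startswith("__")])  # e.g. ['taxi-rides']
admin.close()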
OpenSearch Index
The ingested data can be checked easily using the Query Workbench as shown below.
To monitor the status of taxi rides, a horizontal bar chart is created in the OpenSearch Dashboard. The average trip duration is selected as the metric, and the records are grouped by vendor ID. We can see the values change as new records arrive.
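The figures behind the chart can also be reproduced with a plain search request against the index. The sketch below runs a terms aggregation on the vendor ID with an average of the trip duration; it assumes the local cluster endpoint and the keyword subfield that OpenSearch's default dynamic mapping creates for string fields.
# chart_query.py - reproduce the dashboard metric with an aggregation query,
# assuming the local cluster at localhost:9200 and the default dynamic mapping
# (vendor_id indexed as text with a .keyword subfield).
import json
import urllib.request

query = {
    "size": 0,
    "aggs": {
        "by_vendor": {
            "terms": {"field": "vendor_id.keyword"},
            "aggs": {"avg_trip_duration": {"avg": {"field": "trip_duration"}}},
        }
    },
}
req = urllib.request.Request(
    "http://localhost:9200/trip_stats/_search",
    data=json.dumps(query).encode(),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    for bucket in json.loads(resp.read())["aggregations"]["by_vendor"]["buckets"]:
        print(bucket["key"], round(bucket["avg_trip_duration"]["value"], 2))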
Summary
In this lab, we created a Pyflink application that writes accumulated taxi ride data into an OpenSearch cluster. It aggregated the number of trips/passengers and trip durations by vendor ID for a window of 5 seconds. The data was then used to create a chart that monitors the status of taxi rides in the OpenSearch Dashboard.