OpenSearch is a popular search and analytics engine, and its use cases cover log analytics, real-time application monitoring, and clickstream analysis. OpenSearch can be deployed on its own or via Amazon OpenSearch Service. Apache Kafka is a distributed event store and stream-processing platform that aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. On AWS, Apache Kafka can be deployed via Amazon Managed Streaming for Apache Kafka (MSK).
When it comes to ingesting data from Apache Kafka into OpenSearch, OpenSearch provides a tool called Data Prepper and Amazon OpenSearch Service has a feature called Amazon OpenSearch Ingestion. Alternatively, we can use Kafka Connect, a tool for scalably and reliably streaming data between Apache Kafka and other systems. On AWS, we can run fully managed Kafka Connect workloads using Amazon MSK Connect.
In this post, we will discuss how to develop a data pipeline from Apache Kafka into OpenSearch locally using Docker; the pipeline will be deployed on AWS in the next post. Fake impressions and clicks data will be pushed into Kafka topics using a Kafka source connector, and those records will be ingested into OpenSearch indexes using a sink connector for near real-time analytics.
- Part 1 Introduction
- Part 2 Develop Camel DynamoDB Sink Connector
- Part 3 Deploy Camel DynamoDB Sink Connector
- Part 4 Develop Aiven OpenSearch Sink Connector (this post)
- Part 5 Deploy Aiven OpenSearch Sink Connector
Architecture
Fake impressions and clicks events are generated by the Amazon MSK Data Generator, and they are pushed into the corresponding Kafka topics. The topic records are ingested into OpenSearch indexes with the same names for near real-time analysis using Aiven's OpenSearch Connector for Apache Kafka. The infrastructure is created using Docker, and the source can be found in the GitHub repository of this post.
OpenSearch Cluster
We can create OpenSearch and OpenSearch Dashboards using Docker Compose as illustrated in the OpenSearch Quickstart documentation. The OpenSearch stack consists of two cluster nodes and a single dashboard service. Note that you should disable memory paging and swapping on the host for performance before creating the services - see the documentation for details.
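For reference, on a Linux host this typically means turning off swap and raising the memory map count, roughly as follows (a minimal sketch based on the OpenSearch documentation; adjust to your own environment):

# disable memory paging and swapping for the current session
sudo swapoff -a
# increase the number of memory maps available to OpenSearch
sudo sysctl -w vm.max_map_count=262144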
# docker-compose.yml
version: "3.5"

services:

  ...

  opensearch-node1:
    image: opensearchproject/opensearch:2.9.0
    container_name: opensearch-node1
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node1
      - discovery.seed_hosts=opensearch-node1,opensearch-node2
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-node1-data:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - service-net
  opensearch-node2:
    image: opensearchproject/opensearch:2.9.0
    container_name: opensearch-node2
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node2
      - discovery.seed_hosts=opensearch-node1,opensearch-node2
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-node2-data:/usr/share/opensearch/data
    networks:
      - service-net
  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:2.9.0
    container_name: opensearch-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]'
    networks:
      - service-net

networks:
  service-net:
    name: service-net

volumes:

  ...

  opensearch-node1-data:
    driver: local
    name: opensearch-node1-data
  opensearch-node2-data:
    driver: local
    name: opensearch-node2-data
Kafka Cluster
We will create a Kafka cluster with a single broker and a single Zookeeper node using the Bitnami container images. Kafka 2.8.1 is used as it is the Kafka version recommended by Amazon MSK. The cluster communicates on port 9092 internally, and the bootstrap server of the broker becomes kafka-0:9092 for services that run in the same network.
# docker-compose.yml
version: "3.5"

services:
  zookeeper:
    image: bitnami/zookeeper:3.5
    container_name: zookeeper
    ports:
      - "2181"
    networks:
      - service-net
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - zookeeper-data:/bitnami/zookeeper
  kafka-0:
    image: bitnami/kafka:2.8.1
    container_name: kafka-0
    expose:
      - 9092
    ports:
      - "29092:29092"
    networks:
      - service-net
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-0:9092,EXTERNAL://localhost:29092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_NUM_PARTITIONS=2
      - KAFKA_CFG_DEFAULT_REPLICATION_FACTOR=1
    volumes:
      - kafka-0-data:/bitnami/kafka
    depends_on:
      - zookeeper

  ...

volumes:
  zookeeper-data:
    driver: local
    name: zookeeper-data
  kafka-0-data:
    driver: local
    name: kafka-0-data

  ...
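Once the broker is up, we can sanity-check connectivity from inside the Docker network, for example by listing topics with the CLI that ships in the Bitnami image (a quick check rather than a required step):

docker exec kafka-0 \
  /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server kafka-0:9092 --list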
Kafka Connect
We can use the same Docker image because Kafka Connect is part of the Kafka distribution. The Kafka Connect server runs in distributed mode so that multiple connectors can be deployed to the same server. Note the Kafka bootstrap server and plugin path configuration values in the connect configuration file (connect-distributed.properties) listed below. As mentioned earlier, the bootstrap server can be accessed on kafka-0:9092 as it is deployed in the same network. Also, we will locate the connector source files in subdirectories of the plugin path.
- bootstrap.servers=kafka-0:9092
- plugin.path=/opt/connectors
# docker-compose.yml
version: "3.5"

services:

  ...

  kafka-connect:
    image: bitnami/kafka:2.8.1
    container_name: connect
    command: >
      /opt/bitnami/kafka/bin/connect-distributed.sh
      /opt/bitnami/kafka/config/connect-distributed.properties
    ports:
      - "8083:8083"
    networks:
      - service-net
    volumes:
      - "./configs/connect-distributed.properties:/opt/bitnami/kafka/config/connect-distributed.properties"
      - "./connectors/opensearch-connector:/opt/connectors/opensearch-connector"
      - "./connectors/msk-datagen:/opt/connectors/msk-datagen"
    depends_on:
      - zookeeper
      - kafka-0

  ...
Download Connectors
The connector sources need to be downloaded into the ./connectors path so that they can be volume-mapped to the container's plugin path (/opt/connectors). The MSK Data Generator is a single Jar file, and it can be kept as it is. On the other hand, the Aiven OpenSearch sink connector is an archive file, and it should be decompressed. Note the zip file will be used to create a custom plugin of MSK Connect in the next post. The following script downloads the connector sources into the host path.
# download.sh
#!/usr/bin/env bash
shopt -s extglob

SCRIPT_DIR="$(cd $(dirname "$0"); pwd)"

SRC_PATH=${SCRIPT_DIR}/connectors
rm -rf ${SRC_PATH} && mkdir ${SRC_PATH}

## Aiven OpenSearch sink connector
echo "downloading opensearch sink connector..."
DOWNLOAD_URL=https://github.com/Aiven-Open/opensearch-connector-for-apache-kafka/releases/download/v3.1.0/opensearch-connector-for-apache-kafka-3.1.0.zip

curl -L -o ${SRC_PATH}/tmp.zip ${DOWNLOAD_URL} \
  && unzip -qq ${SRC_PATH}/tmp.zip -d ${SRC_PATH} \
  && rm -rf $SRC_PATH/!(opensearch-connector-for-apache-kafka-3.1.0) \
  && mv $SRC_PATH/opensearch-connector-for-apache-kafka-3.1.0 $SRC_PATH/opensearch-connector \
  && cd $SRC_PATH/opensearch-connector \
  && zip ../opensearch-connector.zip *

## MSK Data Generator source connector
echo "downloading msk data generator..."
DOWNLOAD_URL=https://github.com/awslabs/amazon-msk-data-generator/releases/download/v0.4.0/msk-data-generator-0.4-jar-with-dependencies.jar

mkdir ${SRC_PATH}/msk-datagen \
  && curl -L -o ${SRC_PATH}/msk-datagen/msk-data-generator.jar ${DOWNLOAD_URL}
Below shows the folder structure after the connectors are downloaded successfully.
$ tree connectors/
connectors/
├── msk-datagen
│   └── msk-data-generator.jar
├── opensearch-connector

...

│   ├── opensearch-2.6.0.jar
│   ├── opensearch-cli-2.6.0.jar
│   ├── opensearch-common-2.6.0.jar
│   ├── opensearch-connector-for-apache-kafka-3.1.0.jar
│   ├── opensearch-core-2.6.0.jar

...

└── opensearch-connector.zip

2 directories, 56 files
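Once the services are up, we can also check that the Connect worker has picked up both plugins from the plugin path by querying the Connect REST API (assuming it is reachable on localhost:8083):

$ curl http://localhost:8083/connector-plugins | json_pp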
Kafka Management App
A Kafka management app can be a good companion for development as it helps monitor and manage resources on an easy-to-use user interface. We'll use Kpow Community Edition in this post, which can link a single Kafka cluster, Kafka Connect server and schema registry. Note that the community edition is valid for 12 months and the license can be requested on this page. Once requested, the license details will be emailed, and they can be added as an environment file (env_file).
# docker-compose.yml
version: "3.5"

services:

  ...

  kpow:
    image: factorhouse/kpow-ce:91.5.1
    container_name: kpow
    ports:
      - "3000:3000"
    networks:
      - service-net
    environment:
      BOOTSTRAP: kafka-0:9092
      CONNECT_REST_URL: http://kafka-connect:8083
    env_file: # https://kpow.io/get-started/#individual
      - ./kpow.env

  ...
Data Ingestion to Kafka Topic
Create Index Mappings
The ingested records include a timestamp field (created_at), but its type is not identified correctly via dynamic mapping. Instead, the indexes are created explicitly with the field mappings shown below.
# configs/create-index-mappings.sh
#!/usr/bin/env bash
echo "Create impressions index and field mapping"
curl -X PUT "https://localhost:9200/impressions" -ku admin:admin -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "bid_id": {
        "type": "text"
      },
      "created_at": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "campaign_id": {
        "type": "text"
      },
      "creative_details": {
        "type": "keyword"
      },
      "country_code": {
        "type": "keyword"
      }
    }
  }
}'

echo
echo "Create clicks index and field mapping"
curl -X PUT "https://localhost:9200/clicks" -ku admin:admin -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "correlation_id": {
        "type": "text"
      },
      "created_at": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "tracker": {
        "type": "text"
      }
    }
  }
}'

echo
Once created, we can check them in OpenSearch Dashboards on localhost:5601.
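Alternatively, the index mappings can be verified from the command line using the same default admin credentials:

$ curl -ku admin:admin https://localhost:9200/impressions/_mapping | json_pp
$ curl -ku admin:admin https://localhost:9200/clicks/_mapping | json_pp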
Source Connector Creation
We can create the source connector programmatically using the Connect REST API. The REST endpoint requires a JSON payload that includes the connector name and configurations.
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @configs/source.json
Below shows the source connector configuration file.
// configs/source.json
{
  "name": "ad-tech-source",
  "config": {
    "connector.class": "com.amazonaws.mskdatagen.GeneratorSourceConnector",
    "tasks.max": "2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,

    "genkp.impressions.with": "#{Code.isbn10}",
    "genv.impressions.bid_id.with": "#{Code.isbn10}",
    "genv.impressions.campaign_id.with": "#{Code.isbn10}",
    "genv.impressions.creative_details.with": "#{Color.name}",
    "genv.impressions.country_code.with": "#{Address.countryCode}",

    "genkp.clicks.with": "#{Code.isbn10}",
    "genv.clicks.correlation_id.sometimes.matching": "impressions.value.bid_id",
    "genv.clicks.correlation_id.sometimes.with": "NA",
    "genv.clicks.tracker.with": "#{Lorem.characters '15'}",

    "global.throttle.ms": "500",
    "global.history.records.max": "1000"
  }
}
The first six attributes are general connector configurations. The connector class (connector.class) is required for any connector, and it is set to that of the MSK Data Generator. Two tasks are allocated to the connector (tasks.max). The message keys are converted into strings (key.converter) while the values are converted into JSON (value.converter) - the former because the keys are configured to hold string primitive values (genkp) by the source connector. Finally, schemas are disabled for both the key and the value.
The remaining attributes configure the MSK Data Generator. Two topics named impressions and clicks will be created, and the message attributes are generated by the Java faker library. Interestingly, the bid ID of an impression message and the correlation ID of a click message sometimes share the same value. This is because, in practice, only a fraction of impressions results in clicks.
Once created successfully, we can check the connector status as follows.
$ curl http://localhost:8083/connectors/ad-tech-source/status | json_pp
{
   "connector" : {
      "state" : "RUNNING",
      "worker_id" : "172.19.0.8:8083"
   },
   "name" : "ad-tech-source",
   "tasks" : [
      {
         "id" : 0,
         "state" : "RUNNING",
         "worker_id" : "172.19.0.8:8083"
      },
      {
         "id" : 1,
         "state" : "RUNNING",
         "worker_id" : "172.19.0.8:8083"
      }
   ],
   "type" : "source"
}
Source Data
We can check that messages are ingested into the impressions and clicks topics in Kpow on localhost:3000.
As mentioned earlier, only a fraction of the click messages have actual correlation ID values, and we can confirm that by inspecting the messages.
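A few records can also be sampled from the command line with the console consumer bundled in the Kafka image (a quick check rather than part of the pipeline):

docker exec kafka-0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-0:9092 --topic clicks --from-beginning --max-messages 5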
Data Ingestion to OpenSearch
Sink Connector Creation
Similar to the source connector, we can create the sink connector using the REST API.
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @configs/sink.json
Below shows the sink connector configuration file.
// configs/sink.json
{
  "name": "ad-tech-sink",
  "config": {
    "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
    "tasks.max": "2",
    "topics": "impressions,clicks",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,

    "connection.url": "https://opensearch-node1:9200,https://opensearch-node2:9200",
    "connection.username": "admin",
    "connection.password": "admin",
    "schema.ignore": true,
    "key.ignore": true,
    "type.name": "_doc",
    "behavior.on.malformed.documents": "fail",
    "behavior.on.null.values": "ignore",
    "behavior.on.version.conflict": "ignore",

    "errors.deadletterqueue.topic.name": "ad-tech-dl",
    "errors.tolerance": "all",
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.deadletterqueue.topic.replication.factor": 1,

    "transforms": "insertTS,formatTS",
    "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertTS.timestamp.field": "created_at",
    "transforms.formatTS.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.formatTS.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.formatTS.field": "created_at",
    "transforms.formatTS.target.type": "string"
  }
}
The connector is configured to write messages from the impressions and clicks topics into the OpenSearch indexes created earlier. It uses the same key and value converters as the source connector, and schemas are disabled for both the key and the value.
The cluster endpoints are added to the connection URL attribute (connection.url), and the default username and password are specified accordingly. These values are necessary to make HTTP requests to the cluster, as shown earlier when we created the indexes with explicit mappings. Also, as the topics are append-only logs, we can let the connector derive the document ID from the topic name, partition and offset by setting key.ignore to true. See the connector configuration documentation for more details.
Having an event timestamp attribute is useful for temporal analysis. As I couldn't find a convenient way to set it up in the source connector, a new field called created_at is added using single message transforms (SMTs). Specifically, two transforms are chained - insertTS and formatTS. As their names suggest, the former inserts the system timestamp, and the latter formats it as yyyy-MM-dd HH:mm:ss.
Once created successfully, we can check the connector status as follows.
$ curl http://localhost:8083/connectors/ad-tech-sink/status | json_pp
{
   "connector" : {
      "state" : "RUNNING",
      "worker_id" : "172.20.0.8:8083"
   },
   "name" : "ad-tech-sink",
   "tasks" : [
      {
         "id" : 0,
         "state" : "RUNNING",
         "worker_id" : "172.20.0.8:8083"
      },
      {
         "id" : 1,
         "state" : "RUNNING",
         "worker_id" : "172.20.0.8:8083"
      }
   ],
   "type" : "sink"
}
OpenSearch Destination
In OpenSearch Dashboards, we can search for clicks that are associated with impressions. As expected, only a small portion of the clicks is returned.
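The same check can be made against the OpenSearch REST API. The query below is a sketch that relies on the NA placeholder set by the data generator; it returns only clicks whose correlation ID holds an actual value, and the _id of each hit reflects the topic, partition and offset as described earlier.

curl -ku admin:admin "https://localhost:9200/clicks/_search?pretty" \
  -H 'Content-Type: application/json' -d'
{
  "size": 5,
  "query": {
    "bool": {
      "must_not": { "match": { "correlation_id": "NA" } }
    }
  }
}'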
Moreover, we can quickly join correlated impressions and clicks using the Query Workbench. Below is a simple SQL query that joins impressions with associated clicks created after a certain point in time.
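A sketch of such a query is shown below, sent to the SQL plugin's REST endpoint rather than the Workbench UI; the field names follow the index mappings above, the cut-off timestamp is arbitrary, and join support in the SQL plugin has limitations, so treat it as illustrative.

curl -ku admin:admin -X POST "https://localhost:9200/_plugins/_sql?pretty" \
  -H 'Content-Type: application/json' -d @- <<'EOF'
{
  "query": "SELECT i.campaign_id, i.country_code, c.tracker, c.created_at FROM impressions i JOIN clicks c ON i.bid_id = c.correlation_id WHERE c.created_at > '2023-12-01 00:00:00'"
}
EOF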
Summary
Kafka Connect can be effective for ingesting data from Apache Kafka into OpenSearch. In this post, we discussed how to develop a data pipeline from Apache Kafka into OpenSearch locally using Docker. Fake impressions and clicks data was pushed into Kafka topics using a Kafka source connector, and those records were ingested into OpenSearch indexes using a sink connector for near real-time analytics. In the next post, the pipeline will be deployed on AWS using Amazon OpenSearch Service, Amazon MSK and Amazon MSK Connect.