OpenSearch is a popular search and analytics engine whose use cases cover log analytics, real-time application monitoring, and clickstream analysis. OpenSearch can be deployed on its own or via Amazon OpenSearch Service. Apache Kafka is a distributed event store and stream-processing platform that aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. On AWS, Apache Kafka can be deployed via Amazon Managed Streaming for Apache Kafka (MSK).

When it comes to ingesting data from Apache Kafka into OpenSearch, OpenSearch has a tool called Data Prepper, and Amazon OpenSearch Service has a feature called Amazon OpenSearch Ingestion. Alternatively, we can use Kafka Connect, a tool for scalably and reliably streaming data between Apache Kafka and other systems. On AWS, we can run a fully managed Kafka Connect workload using Amazon MSK Connect.

In this post, we will discuss how to develop a data pipeline from Apache Kafka into OpenSearch locally using Docker; the pipeline will be deployed on AWS in the next post. Fake impressions and clicks data will be pushed into Kafka topics using a Kafka source connector, and those records will be ingested into OpenSearch indexes using a sink connector for near real-time analytics.

Architecture

Fake impressions and clicks events are generated by the Amazon MSK Data Generator and pushed into the corresponding Kafka topics. The topic records are ingested into OpenSearch indexes with the same names for near real-time analysis using Aiven's OpenSearch Connector for Apache Kafka. The infrastructure is created using Docker, and the source can be found in the GitHub repository of this post.

OpenSearch Cluster

We can create OpenSearch and OpenSearch Dashboards using Docker Compose as illustrated in the OpenSearch Quickstart documentation. The OpenSearch stack consists of two cluster nodes and a single dashboard service. Note that you should disable memory paging and swapping on the host for performance before creating those services - see the documentation for details.
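
As a rough sketch, on a Linux host those settings typically come down to the following commands - the exact steps are described in the OpenSearch important settings documentation, so treat this as illustrative.

$ sudo swapoff -a                          # disable swapping for the current session
$ sudo sysctl -w vm.max_map_count=262144   # raise the memory map limit required by OpenSearch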

# docker-compose.yml
version: "3.5"

services:

  ...

  opensearch-node1:
    image: opensearchproject/opensearch:2.9.0
    container_name: opensearch-node1
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node1
      - discovery.seed_hosts=opensearch-node1,opensearch-node2
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - opensearch-node1-data:/usr/share/opensearch/data
    ports:
      - 9200:9200
      - 9600:9600
    networks:
      - service-net
  opensearch-node2:
    image: opensearchproject/opensearch:2.9.0
    container_name: opensearch-node2
    environment:
      - cluster.name=opensearch-cluster
      - node.name=opensearch-node2
      - discovery.seed_hosts=opensearch-node1,opensearch-node2
      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
      - bootstrap.memory_lock=true
      - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      # each node mounts its own data volume
      - opensearch-node2-data:/usr/share/opensearch/data
    networks:
      - service-net
  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:2.9.0
    container_name: opensearch-dashboards
    ports:
      - 5601:5601
    expose:
      - "5601"
    environment:
      OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]'
    networks:
      - service-net

networks:
  service-net:
    name: service-net

volumes:

  ...

  opensearch-node1-data:
    driver: local
    name: opensearch-node1-data
  opensearch-node2-data:
    driver: local
    name: opensearch-node2-data
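
Once the containers are up, a quick way to confirm that the two nodes have formed a single cluster is to query the cluster health endpoint with the default admin credentials; the response should report two nodes and a green or yellow status.

$ curl -ku admin:admin "https://localhost:9200/_cluster/health?pretty"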

Kafka Cluster

We will create a Kafka cluster with a single broker and a single ZooKeeper node using the Bitnami container images. Kafka 2.8.1 is used as it is the Kafka version recommended by Amazon MSK. The cluster communicates on port 9092 internally, and the bootstrap server of the broker becomes kafka-0:9092 for clients that run in the same Docker network.

# docker-compose.yml
version: "3.5"

services:
  zookeeper:
    image: bitnami/zookeeper:3.5
    container_name: zookeeper
    ports:
      - "2181"
    networks:
      - service-net
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - zookeeper-data:/bitnami/zookeeper
  kafka-0:
    image: bitnami/kafka:2.8.1
    container_name: kafka-0
    expose:
      - 9092
    ports:
      - "29092:29092"
    networks:
      - service-net
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-0:9092,EXTERNAL://localhost:29092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_NUM_PARTITIONS=2
      - KAFKA_CFG_DEFAULT_REPLICATION_FACTOR=1
    volumes:
      - kafka-0-data:/bitnami/kafka
    depends_on:
      - zookeeper

  ...

volumes:
  zookeeper-data:
    driver: local
    name: zookeeper-data
  kafka-0-data:
    driver: local
    name: kafka-0-data

  ...
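
To verify that the broker is reachable on the internal listener, we can list topics from inside the container using the Kafka CLI bundled in the Bitnami image - a simple smoke test that should return without errors, listing any existing topics.

$ docker exec kafka-0 /opt/bitnami/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka-0:9092 --list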

Kafka Connect

We can use the same Docker image because Kafka Connect is part of the Kafka distribution. The Kafka Connect server runs in distributed mode so that multiple connectors can be deployed to the same server. Note the Kafka bootstrap server and plugin path configuration values in the Connect configuration file (connect-distributed.properties), listed below. As mentioned earlier, the bootstrap server can be accessed at kafka-0:9092 as the worker is deployed in the same network. Also, we will place the connector source files in subdirectories of the plugin path.

  • bootstrap.servers=kafka-0:9092
  • plugin.path=/opt/connectors
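
For reference, a minimal connect-distributed.properties along those lines might look as follows - this is an illustrative sketch based on the stock configuration file shipped with Kafka rather than the exact file in the repository.

# configs/connect-distributed.properties (illustrative)
bootstrap.servers=kafka-0:9092
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# internal topics that store connector offsets, configs and status
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1

plugin.path=/opt/connectors

The Compose service for the Connect worker is shown below.
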
# docker-compose.yml
version: "3.5"

services:

  ...

  kafka-connect:
    image: bitnami/kafka:2.8.1
    container_name: connect
    command: >
      /opt/bitnami/kafka/bin/connect-distributed.sh
      /opt/bitnami/kafka/config/connect-distributed.properties
    ports:
      - "8083:8083"
    networks:
      - service-net
    volumes:
      - "./configs/connect-distributed.properties:/opt/bitnami/kafka/config/connect-distributed.properties"
      - "./connectors/opensearch-connector:/opt/connectors/opensearch-connector"
      - "./connectors/msk-datagen:/opt/connectors/msk-datagen"
    depends_on:
      - zookeeper
      - kafka-0

  ...

Download Connectors

The connector sources need to be downloaded into the ./connectors path so that they can be volume-mapped to the container's plugin path (/opt/connectors). The MSK Data Generator is a single JAR file and can be kept as is. The Aiven OpenSearch sink connector, on the other hand, is an archive file and needs to be decompressed. Note that the zip file will be used to create a custom plugin for MSK Connect in the next post. The following script downloads the connector sources into the host path.

# download.sh
#!/usr/bin/env bash
shopt -s extglob

SCRIPT_DIR="$(cd $(dirname "$0"); pwd)"

SRC_PATH=${SCRIPT_DIR}/connectors
rm -rf ${SRC_PATH} && mkdir ${SRC_PATH}

## Aiven OpenSearch sink connector
echo "downloading opensearch sink connector..."
DOWNLOAD_URL=https://github.com/Aiven-Open/opensearch-connector-for-apache-kafka/releases/download/v3.1.0/opensearch-connector-for-apache-kafka-3.1.0.zip

curl -L -o ${SRC_PATH}/tmp.zip ${DOWNLOAD_URL} \
  && unzip -qq ${SRC_PATH}/tmp.zip -d ${SRC_PATH} \
  && rm -rf $SRC_PATH/!(opensearch-connector-for-apache-kafka-3.1.0) \
  && mv $SRC_PATH/opensearch-connector-for-apache-kafka-3.1.0 $SRC_PATH/opensearch-connector \
  && cd $SRC_PATH/opensearch-connector \
  && zip ../opensearch-connector.zip *

## MSK Data Generator source connector
echo "downloading msk data generator..."
DOWNLOAD_URL=https://github.com/awslabs/amazon-msk-data-generator/releases/download/v0.4.0/msk-data-generator-0.4-jar-with-dependencies.jar

mkdir ${SRC_PATH}/msk-datagen \
  && curl -L -o ${SRC_PATH}/msk-datagen/msk-data-generator.jar ${DOWNLOAD_URL}

The folder structure after the connectors are downloaded successfully is shown below.

$ tree connectors/
connectors/
├── msk-datagen
│   └── msk-data-generator.jar
├── opensearch-connector

...

│   ├── opensearch-2.6.0.jar
│   ├── opensearch-cli-2.6.0.jar
│   ├── opensearch-common-2.6.0.jar
│   ├── opensearch-connector-for-apache-kafka-3.1.0.jar
│   ├── opensearch-core-2.6.0.jar

...

└── opensearch-connector.zip

2 directories, 56 files
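
With the plugin folders in place and mounted into the Connect container, we can confirm that the worker has picked them up by querying the connector plugins endpoint of the Connect REST API; both the data generator and the OpenSearch sink connector classes should appear in the response.

$ curl http://localhost:8083/connector-plugins | json_pp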

Kafka Management App

A Kafka management app can be a good companion for development as it helps monitor and manage resources via an easy-to-use user interface. We'll use Kpow Community Edition in this post, which allows us to link a single Kafka cluster, Kafka Connect server and schema registry. Note that the community edition is valid for 12 months and the license can be requested on this page. Once requested, the license details will be emailed, and they can be added as an environment file (env_file).

# docker-compose.yml
version: "3.5"

services:

  ...

  kpow:
    image: factorhouse/kpow-ce:91.5.1
    container_name: kpow
    ports:
      - "3000:3000"
    networks:
      - service-net
    environment:
      BOOTSTRAP: kafka-0:9092
      CONNECT_REST_URL: http://kafka-connect:8083
    env_file: # https://kpow.io/get-started/#individual
      - ./kpow.env

  ...
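
With all services defined, the whole stack can be started in one go - assuming the services above are combined into a single docker-compose.yml in the project root.

$ docker-compose up -d
$ docker-compose ps   # all containers should eventually be in a running state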

Data Ingestion to Kafka Topic

Create Index Mappings

The topic messages include a timestamp field (created_at), but its type is not identified correctly via dynamic mapping. Instead, indexes are created explicitly as shown below.

# configs/create-index-mappings.sh
#!/usr/bin/env bash
echo "Create impressions index and field mapping"
curl -X PUT "https://localhost:9200/impressions" -ku admin:admin -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "bid_id": {
        "type": "text"
      },
      "created_at": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "campaign_id": {
        "type": "text"
      },
      "creative_details": {
        "type": "keyword"
      },
      "country_code": {
        "type": "keyword"
      }
    }
  }
}'

echo
echo "Create clicks index and field mapping"
curl -X PUT "https://localhost:9200/clicks" -ku admin:admin -H 'Content-Type: application/json' -d'
{
  "mappings": {
    "properties": {
      "correlation_id": {
        "type": "text"
      },
      "created_at": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss"
      },
      "tracker": {
        "type": "text"
      }
    }
  }
}'

echo

Once created, we can check them in the OpenSearch Dashboards on localhost:5601.
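
They can also be verified from the command line by querying the mapping API, for example:

$ curl -ku admin:admin "https://localhost:9200/impressions/_mapping?pretty"
$ curl -ku admin:admin "https://localhost:9200/clicks/_mapping?pretty"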

Source Connector Creation

We can create the source connector programmatically using the Kafka Connect REST API. The endpoint requires a JSON payload that includes the connector name and configuration.

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @configs/source.json

The source connector configuration file is shown below.

// configs/source.json
{
  "name": "ad-tech-source",
  "config": {
    "connector.class": "com.amazonaws.mskdatagen.GeneratorSourceConnector",
    "tasks.max": "2",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,

    "genkp.impressions.with": "#{Code.isbn10}",
    "genv.impressions.bid_id.with": "#{Code.isbn10}",
    "genv.impressions.campaign_id.with": "#{Code.isbn10}",
    "genv.impressions.creative_details.with": "#{Color.name}",
    "genv.impressions.country_code.with": "#{Address.countryCode}",

    "genkp.clicks.with": "#{Code.isbn10}",
    "genv.clicks.correlation_id.sometimes.matching": "impressions.value.bid_id",
    "genv.clicks.correlation_id.sometimes.with": "NA",
    "genv.clicks.tracker.with": "#{Lorem.characters '15'}",

    "global.throttle.ms": "500",
    "global.history.records.max": "1000"
  }
}

The first six attributes relate to general connector configuration. The connector class (connector.class) is required for any connector, and I set it to the MSK Data Generator's source connector class. Also, two tasks are allocated to it (tasks.max). The message key is set to be converted into a string (key.converter) while the value is converted into JSON (value.converter). The former is because the keys are configured to have string primitive values (genkp) by the source connector. Finally, schemas are not enabled for either the key or the value.

The remaining attributes are for the MSK Data Generator. Two topics named impressions and clicks will be created, and the message attributes are generated by the Java faker library. Interestingly, the bid ID of an impression message and the correlation ID of a click message sometimes share the same value. This is because, in practice, only a fraction of impressions results in clicks.

Once created successfully, we can check the connector status as follows.

$ curl http://localhost:8083/connectors/ad-tech-source/status | json_pp
{
   "connector" : {
      "state" : "RUNNING",
      "worker_id" : "172.19.0.8:8083"
   },
   "name" : "ad-tech-source",
   "tasks" : [
      {
         "id" : 0,
         "state" : "RUNNING",
         "worker_id" : "172.19.0.8:8083"
      },
      {
         "id" : 1,
         "state" : "RUNNING",
         "worker_id" : "172.19.0.8:8083"
      }
   ],
   "type" : "source"
}

Source Data

We can check that messages are ingested into the impressions and clicks topics in Kpow on localhost:3000.

As mentioned earlier, only a fraction of the correlation IDs of the click messages have actual values, and we can confirm that by inspecting the messages.
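
If Kpow is not at hand, the same can be verified with the console consumer that ships with the Kafka image - for example, by peeking at a handful of click records.

$ docker exec kafka-0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-0:9092 --topic clicks --from-beginning --max-messages 5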

Data Ingestion to OpenSearch

Sink Connector Creation

Similar to the source connector, we can create the sink connector using the REST API.

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d @configs/sink.json

The sink connector configuration file is shown below.

// configs/sink.json
{
  "name": "ad-tech-sink",
  "config": {
    "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
    "tasks.max": "2",
    "topics": "impressions,clicks",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,

    "connection.url": "https://opensearch-node1:9200,https://opensearch-node2:9200",
    "connection.username": "admin",
    "connection.password": "admin",
    "schema.ignore": true,
    "key.ignore": true,
    "type.name": "_doc",
    "behavior.on.malformed.documents": "fail",
    "behavior.on.null.values": "ignore",
    "behavior.on.version.conflict": "ignore",

    "errors.deadletterqueue.topic.name": "ad-tech-dl",
    "errors.tolerance": "all",
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.deadletterqueue.topic.replication.factor": 1,

    "transforms": "insertTS,formatTS",
    "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertTS.timestamp.field": "created_at",
    "transforms.formatTS.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.formatTS.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.formatTS.field": "created_at",
    "transforms.formatTS.target.type": "string"
  }
}

The connector is configured to write messages from the impressions and clicks topics into the OpenSearch indexes created earlier. It uses the same key and value converters as the source connector, and schemas are not enabled for either the key or the value.

The cluster request URLs are added to the connection URL attribute (connection.url), and the default username and password are specified accordingly. These credentials are necessary to make HTTP requests to the cluster, as shown earlier when we created the indexes with explicit mappings. Also, as the topics are append-only logs, we can set the document ID to [topic-name].[partition].[offset] by setting key.ignore to true. See the connector configuration documentation for more details.

Having an event timestamp attribute can be useful for performing temporal analysis. As I couldn't find a convenient way to set it up in the source connector, a new field called created_at is added using single message transforms (SMTs). Specifically, I added two transforms - insertTS and formatTS. As the names suggest, the former inserts the system timestamp value while the latter formats it into yyyy-MM-dd HH:mm:ss.

Once created successfully, we can check the connector status as follows.

$ curl http://localhost:8083/connectors/ad-tech-sink/status | json_pp
{
   "connector" : {
      "state" : "RUNNING",
      "worker_id" : "172.20.0.8:8083"
   },
   "name" : "ad-tech-sink",
   "tasks" : [
      {
         "id" : 0,
         "state" : "RUNNING",
         "worker_id" : "172.20.0.8:8083"
      },
      {
         "id" : 1,
         "state" : "RUNNING",
         "worker_id" : "172.20.0.8:8083"
      }
   ],
   "type" : "sink"
}
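
In addition to the status check, we can spot-check a single document to confirm that records are being indexed, that the document IDs follow the topic.partition.offset pattern, and that the created_at field added by the transforms is present - for example:

$ curl -ku admin:admin "https://localhost:9200/impressions/_search?size=1&pretty"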

OpenSearch Destination

In OpenSearch Dashboards, we can search for clicks that are associated with impressions. As expected, only a small portion of the clicks is returned.
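
The same check can also be made outside the Dashboards with a search query that filters out the NA placeholder values - a rough equivalent along the following lines.

$ curl -ku admin:admin "https://localhost:9200/clicks/_search?pretty" \
  -H 'Content-Type: application/json' -d'
{
  "query": {
    "bool": {
      "must_not": { "match": { "correlation_id": "NA" } }
    }
  }
}'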

Moreover, we can quickly join correlated impressions and clicks using the Query Workbench, for example with a simple SQL query that joins impressions with their associated clicks created after a certain time point.
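
The exact query is not reproduced here, but a join along the following lines can be run in the Query Workbench or submitted directly to the SQL endpoint - note that the field names come from the index mappings above and the timestamp literal is only an example.

$ curl -ku admin:admin -X POST "https://localhost:9200/_plugins/_sql" \
  -H 'Content-Type: application/json' -d @- <<'EOF'
{
  "query": "SELECT i.campaign_id, i.country_code, c.tracker, c.created_at FROM impressions i JOIN clicks c ON i.bid_id = c.correlation_id WHERE c.created_at > '2023-08-01 00:00:00'"
}
EOF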

Summary

Kafka Connect can be an effective tool for ingesting data from Apache Kafka into OpenSearch. In this post, we discussed how to develop a data pipeline from Apache Kafka into OpenSearch locally using Docker. Fake impressions and clicks data was pushed into Kafka topics using a Kafka source connector, and those records were ingested into OpenSearch indexes using a sink connector for near real-time analytics. In the next post, the pipeline will be deployed on AWS using Amazon OpenSearch Service, Amazon MSK and Amazon MSK Connect.