EMR on EKS is a deployment option for Amazon EMR that automates the provisioning and management of open-source big data frameworks on Amazon EKS. While a wide range of open-source big data components are available in EMR on EC2, only Apache Spark is available in EMR on EKS. It is more flexible in other respects, however: applications of different EMR versions can be run across multiple availability zones on either EC2 or Fargate. Other types of containerized applications can also be deployed on the same EKS cluster. Therefore, if you have or plan to have, for example, Apache Airflow, Apache Superset or Kubeflow in your analytics toolkit, it can be an effective way to manage big data (as well as non-big data) workloads. While Glue is geared towards ETL, EMR on EKS can also be used for other types of tasks such as machine learning. Moreover, it lets you build a plain Spark application rather than a Glue-specific one. For example, while you have to use custom connectors for Hudi or Iceberg with Glue, you can use their native libraries with EMR on EKS. In this post, we’ll discuss EMR on EKS with a simple example and a more elaborate one.

Cluster setup and configuration

We’ll use command line utilities heavily. The following tools are required.

  • AWS CLI V2 - the official command line interface for interacting with AWS services.
  • eksctl - a CLI tool for creating and managing clusters on EKS.
  • kubectl - a command line utility for communicating with the cluster API server.
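
Before going further, it can help to confirm that all three tools are on the PATH. The following is a minimal sketch; the helper name missing_tools is made up for illustration and is not part of any of the tools above.

```shell
# Print the name of every given command that cannot be found on the PATH.
# missing_tools is a hypothetical helper used only for this check.
missing_tools() {
  local tool
  for tool in "$@"; do
    command -v "${tool}" > /dev/null 2>&1 || echo "${tool}"
  done
  return 0
}
```

Running `missing_tools aws eksctl kubectl` prints nothing when all three are installed, and otherwise lists the ones that still need to be installed.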

Upload preliminary resources to S3

We need supporting files, and they are created/downloaded into the config and manifests folders using a setup script - the script can be found in the project GitHub repository. The generated files will be illustrated below.

export OWNER=jaehyeon
export AWS_REGION=ap-southeast-2
export CLUSTER_NAME=emr-eks-example
export EMR_ROLE_NAME=${CLUSTER_NAME}-job-execution

## run setup script
# - create config files, sample scripts and download necessary files

tree -p config -p manifests
# config
# ├── [-rw-r--r--]  cdc_events.avsc
# ├── [-rw-r--r--]  cdc_events_s3.properties
# ├── [-rw-r--r--]  driver_pod_template.yml
# ├── [-rw-r--r--]  executor_pod_template.yml
# ├── [-rw-r--r--]  food_establishment_data.csv
# ├── [-rw-r--r--]  health_violations.py
# └── [-rw-r--r--]  hudi-utilities-bundle_2.12-0.10.0.jar
# manifests
# ├── [-rw-r--r--]  cluster.yaml
# ├── [-rw-r--r--]  nodegroup-spot.yaml
# └── [-rw-r--r--]  nodegroup.yaml

We’ll configure logging to S3 and CloudWatch, so an S3 bucket and a CloudWatch log group are created. A Glue database is also created because I encountered an error when creating a Glue table in a database that doesn’t exist. Finally, the files in the config folder are uploaded to S3.

#### create S3 bucket/log group/glue database and upload files to S3
aws s3 mb s3://${S3_BUCKET_NAME}
aws logs create-log-group --log-group-name=${LOG_GROUP_NAME}
aws glue create-database --database-input '{"Name": "datalake"}'

## upload files to S3
for f in $(ls ./config/)
  do
    aws s3 cp ./config/${f} s3://${S3_BUCKET_NAME}/config/
  done
# upload: config/cdc_events.avsc to s3://emr-eks-example-ap-southeast-2/config/cdc_events.avsc
# upload: config/cdc_events_s3.properties to s3://emr-eks-example-ap-southeast-2/config/cdc_events_s3.properties
# upload: config/driver_pod_template.yml to s3://emr-eks-example-ap-southeast-2/config/driver_pod_template.yml
# upload: config/executor_pod_template.yml to s3://emr-eks-example-ap-southeast-2/config/executor_pod_template.yml
# upload: config/food_establishment_data.csv to s3://emr-eks-example-ap-southeast-2/config/food_establishment_data.csv
# upload: config/health_violations.py to s3://emr-eks-example-ap-southeast-2/config/health_violations.py
# upload: config/hudi-utilities-bundle_2.12-0.10.0.jar to s3://emr-eks-example-ap-southeast-2/config/hudi-utilities-bundle_2.12-0.10.0.jar
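
If the setup is run more than once, the Glue step fails because the database already exists. A small sketch of a re-runnable variant is shown below. The helper name ensure_glue_database is hypothetical, and the check treats any failure of get-database (including missing credentials) as "not found", which is a simplification.

```shell
# Create a Glue database only if it cannot be found.
# ensure_glue_database is a hypothetical helper; any get-database failure is
# treated as "database does not exist", which is a simplifying assumption.
ensure_glue_database() {
  local name="$1"
  if aws glue get-database --name "${name}" > /dev/null 2>&1; then
    echo "exists"
  else
    aws glue create-database --database-input "{\"Name\": \"${name}\"}" \
      && echo "created"
  fi
}
```

With this in place, `ensure_glue_database datalake` could stand in for the plain create-database call.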

Create EKS cluster and node group

We can use either command line options or a config file when creating a cluster or node group using eksctl. We’ll use config files, which are shown below.

# ./manifests/cluster.yaml
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig

metadata:
  name: emr-eks-example
  region: ap-southeast-2
  tags:
    Owner: jaehyeon

# ./manifests/nodegroup.yaml
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig

metadata:
  name: emr-eks-example
  region: ap-southeast-2
  tags:
    Owner: jaehyeon

managedNodeGroups:
- name: nodegroup
  desiredCapacity: 2
  instanceType: m5.xlarge
eksctl creates a cluster or node group via CloudFormation. Each command creates a dedicated CloudFormation stack and takes about 15 minutes. It also generates the default kubeconfig file in the $HOME/.kube folder. Once the node group is created, we can check it using the kubectl command.

#### create cluster, node group and configure
eksctl create cluster -f ./manifests/cluster.yaml
eksctl create nodegroup -f ./manifests/nodegroup.yaml

kubectl get nodes
# NAME                                               STATUS   ROLES    AGE     VERSION
# ip-192-168-33-60.ap-southeast-2.compute.internal   Ready    <none>   5m52s   v1.21.5-eks-bc4871b
# ip-192-168-95-68.ap-southeast-2.compute.internal   Ready    <none>   5m49s   v1.21.5-eks-bc4871b

Set up Amazon EMR on EKS

As described in the Amazon EMR on EKS development guide, Amazon EKS uses Kubernetes namespaces to divide cluster resources between multiple users and applications. A virtual cluster is a Kubernetes namespace that Amazon EMR is registered with. Amazon EMR uses virtual clusters to run jobs and host endpoints. The following steps set up EMR on EKS.

Enable cluster access for Amazon EMR on EKS

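After creating a Kubernetes namespace for EMR (spark), it is necessary to allow Amazon EMR on EKS to access the namespace. This can be automated with eksctl, which creates a Kubernetes role and role binding for the emr-containers user in the namespace and maps that user to the EMR on EKS service-linked role in the aws-auth ConfigMap.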

kubectl create namespace spark
eksctl create iamidentitymapping --cluster ${CLUSTER_NAME} \
  --namespace spark --service-name "emr-containers"

While the details of the role and role binding can be found in the development guide, we can see that the aws-auth ConfigMap is updated with the new Kubernetes user.

kubectl describe cm aws-auth -n kube-system
# Name:         aws-auth
# Namespace:    kube-system
# Labels:       <none>
# Annotations:  <none>
#
# Data
# ====
# mapRoles:
# ----
# - groups:
#   - system:bootstrappers
#   - system:nodes
#   rolearn: arn:aws:iam::<AWS-ACCOUNT-ID>:role/eksctl-emr-eks-example-nodegroup-NodeInstanceRole-15J26FPOYH0AL
#   username: system:node:{{EC2PrivateDNSName}}
# - rolearn: arn:aws:iam::<AWS-ACCOUNT-ID>:role/AWSServiceRoleForAmazonEMRContainers
#   username: emr-containers
#
# mapUsers:
# ----
# []
#
# Events:  <none>

Create an IAM OIDC identity provider for the EKS cluster

We can associate an IAM role with a Kubernetes service account. This service account can then provide AWS permissions to the containers in any pod that uses that service account. Simply put, the service account for EMR will be allowed to assume the EMR job execution role by OIDC federation - see EKS user guide for details. The job execution role will be created below. In order for the OIDC federation to work, we need to set up an IAM OIDC provider for the EKS cluster.

eksctl utils associate-iam-oidc-provider \
  --cluster ${CLUSTER_NAME} --approve

aws iam list-open-id-connect-providers --query "OpenIDConnectProviderList[1]"
# {
#     "Arn": "arn:aws:iam::<AWS-ACCOUNT-ID>:oidc-provider/oidc.eks.ap-southeast-2.amazonaws.com/id/6F3C18F00D8610088272FEF11013B8C5"
# }
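
The [1] index in the query above depends on how many OIDC providers already exist in the account, so it may not carry over to another account. As a sketch of a more portable check, the expected provider ARN can be derived from the cluster's OIDC issuer URL, since the ARN is the issuer URL without its https:// scheme. The helper name oidc_provider_arn is hypothetical.

```shell
# Build the IAM OIDC provider ARN that corresponds to an EKS OIDC issuer URL.
# oidc_provider_arn is a hypothetical helper; it only does string manipulation.
oidc_provider_arn() {
  local account_id="$1" issuer_url="$2"
  echo "arn:aws:iam::${account_id}:oidc-provider/${issuer_url#https://}"
}

# Usage (requires AWS credentials):
#   ISSUER=$(aws eks describe-cluster --name "${CLUSTER_NAME}" \
#     --query "cluster.identity.oidc.issuer" --output text)
#   ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
#   aws iam get-open-id-connect-provider \
#     --open-id-connect-provider-arn "$(oidc_provider_arn "${ACCOUNT_ID}" "${ISSUER}")"
```

If the provider was associated successfully, the last command returns its details rather than an error.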

Create a job execution role

The following job execution role is created for the examples of this post. The permissions are set up to perform tasks on S3 and Glue. We’ll also enable logging to S3 and CloudWatch, so the permissions required for that are added as well.

aws iam create-role \
  --role-name ${EMR_ROLE_NAME} \
  --assume-role-policy-document '{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "elasticmapreduce.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}'

aws iam put-role-policy \
  --role-name ${EMR_ROLE_NAME} \
  --policy-name ${EMR_ROLE_NAME}-policy \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:DeleteObject"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:*"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents",
                "logs:CreateLogStream",
                "logs:DescribeLogGroups",
                "logs:DescribeLogStreams"
            ],
            "Resource": [
                "arn:aws:logs:*:*:*"
            ]
        }
    ]
}'

Update the trust policy of the job execution role

As mentioned earlier, the EMR service account is allowed to assume the job execution role by OIDC federation. In order to enable it, we need to update the trust relationship of the role. We can update it as shown below.

aws emr-containers update-role-trust-policy \
  --cluster-name ${CLUSTER_NAME} \
  --namespace spark \
  --role-name ${EMR_ROLE_NAME}

aws iam get-role --role-name ${EMR_ROLE_NAME} --query "Role.AssumeRolePolicyDocument.Statement[1]"
# {
#     "Effect": "Allow",
#     "Principal": {
#         "Federated": "arn:aws:iam::<AWS-ACCOUNT-ID>:oidc-provider/oidc.eks.ap-southeast-2.amazonaws.com/id/6F3C18F00D8610088272FEF11013B8C5"
#     },
#     "Action": "sts:AssumeRoleWithWebIdentity",
#     "Condition": {
#         "StringLike": {
#             "oidc.eks.ap-southeast-2.amazonaws.com/id/6F3C18F00D8610088272FEF11013B8C5:sub": "system:serviceaccount:spark:emr-containers-sa-*-*-<AWS-ACCOUNT-ID>-93ztm12b8wi73z7zlhtudeipd0vpa8b60gchkls78cj1q"
#         }
#     }
# }

Register Amazon EKS Cluster with Amazon EMR

We can register the Amazon EKS cluster with Amazon EMR as shown below. We need to provide the EKS cluster name and namespace.

## register EKS cluster with EMR
aws emr-containers create-virtual-cluster \
  --name ${CLUSTER_NAME} \
  --container-provider '{
    "id": "'${CLUSTER_NAME}'",
    "type": "EKS",
    "info": {
        "eksInfo": {
            "namespace": "spark"
        }
    }
}'

aws emr-containers list-virtual-clusters --query "sort_by(virtualClusters, &createdAt)[-1]"
# {
#     "id": "9wvd1yhms5tk1k8chrn525z34",
#     "name": "emr-eks-example",
#     "arn": "arn:aws:emr-containers:ap-southeast-2:<AWS-ACCOUNT-ID>:/virtualclusters/9wvd1yhms5tk1k8chrn525z34",
#     "state": "RUNNING",
#     "containerProvider": {
#         "type": "EKS",
#         "id": "emr-eks-example",
#         "info": {
#             "eksInfo": {
#                 "namespace": "spark"
#             }
#         }
#     },
#     "createdAt": "2022-01-07T01:26:37+00:00",
#     "tags": {}
# }

We can also check the virtual cluster on the EMR console.


Food Establishment Inspection

This example is from the getting started tutorial of the Amazon EMR management guide. The PySpark script executes a simple SQL statement that finds the top 10 restaurants with the most Red violations and saves the output to S3. The script and its data source are saved to S3.

In the job request, we specify the job name, virtual cluster ID and job execution role. The spark-submit details are specified in the job driver option: the entry point is set to the S3 location of the PySpark script, followed by the entry point arguments and the spark-submit parameters. Finally, the S3 and CloudWatch monitoring configuration is specified.

export VIRTUAL_CLUSTER_ID=$(aws emr-containers list-virtual-clusters --query "sort_by(virtualClusters, &createdAt)[-1].id" --output text)
export EMR_ROLE_ARN=$(aws iam get-role --role-name ${EMR_ROLE_NAME} --query Role.Arn --output text)

## create job request
cat << EOF > ./request-health-violations.json
{
    "name": "health-violations",
    "virtualClusterId": "${VIRTUAL_CLUSTER_ID}",
    "executionRoleArn": "${EMR_ROLE_ARN}",
    "releaseLabel": "emr-6.2.0-latest",
    "jobDriver": {
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://${S3_BUCKET_NAME}/config/health_violations.py",
            "entryPointArguments": [
                "--data_source", "s3://${S3_BUCKET_NAME}/config/food_establishment_data.csv",
                "--output_uri", "s3://${S3_BUCKET_NAME}/output"
            ],
            "sparkSubmitParameters": "--conf spark.executor.instances=2 \
                --conf spark.executor.memory=2G \
                --conf spark.executor.cores=1 \
                --conf spark.driver.cores=1 \
                --conf spark.driver.memory=2G"
        }
    },
    "configurationOverrides": {
        "monitoringConfiguration": {
            "cloudWatchMonitoringConfiguration": {
                "logGroupName": "${LOG_GROUP_NAME}",
                "logStreamNamePrefix": "health"
            },
            "s3MonitoringConfiguration": {
                "logUri": "s3://${S3_BUCKET_NAME}/logs/"
            }
        }
    }
}
EOF

aws emr-containers start-job-run \
    --cli-input-json file://./request-health-violations.json
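
The request file relies on two properties of a here-document with an unquoted delimiter: shell variables are expanded, and a backslash at the end of a line joins it with the next line, so the multi-line sparkSubmitParameters value ends up as a single-line JSON string. The sketch below isolates that behaviour; render_submit_params is a hypothetical helper used only for illustration.

```shell
# Render a tiny JSON document the same way the job request is rendered.
# render_submit_params is a hypothetical helper, not part of the AWS CLI.
render_submit_params() {
  local instances="$1" memory="$2"
  # Unquoted EOF: ${...} is expanded and backslash-newline is removed.
  cat << EOF
{
  "sparkSubmitParameters": "--conf spark.executor.instances=${instances} \
    --conf spark.executor.memory=${memory}"
}
EOF
}
```

Calling `render_submit_params 2 2G` prints three lines, with both --conf options on the middle line. Quoting the delimiter (<< 'EOF') would disable both behaviours and leave the backslash and the ${...} references in the file.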

Once a job run is started, it can be checked under the virtual cluster section of the EMR console.
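
The job state can also be followed from the command line. The following is a minimal polling sketch built on the describe-job-run command; the helper name wait_for_job_run and the 30-second default interval are assumptions made for illustration.

```shell
# Poll a job run until it reaches a terminal state and print that state.
# wait_for_job_run is a hypothetical helper; the third argument is the
# polling interval in seconds (30 by default).
wait_for_job_run() {
  local job_run_id="$1" virtual_cluster_id="$2" interval="${3:-30}" state
  while true; do
    state=$(aws emr-containers describe-job-run \
      --id "${job_run_id}" \
      --virtual-cluster-id "${virtual_cluster_id}" \
      --query "jobRun.state" --output text) || return 1
    case "${state}" in
      COMPLETED) echo "${state}"; return 0 ;;
      FAILED|CANCELLED) echo "${state}"; return 1 ;;
    esac
    sleep "${interval}"
  done
}
```

It would be called as `wait_for_job_run <job-run-id> ${VIRTUAL_CLUSTER_ID}`, where the job run ID is the id field returned by start-job-run.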

When we click the View logs link, it launches the Spark History Server in a new tab.

As configured, the container logs of the job can be found in CloudWatch.

Also, the logs for the containers (spark driver and executor) and control-logs (job runner) can be found in S3.

Once the job completes, we can check the output from S3 as shown below.

export OUTPUT_FILE=$(aws s3 ls s3://${S3_BUCKET_NAME}/output/ | grep '\.csv' | awk '{print $4}')
aws s3 cp s3://${S3_BUCKET_NAME}/output/${OUTPUT_FILE} - | head -n 15
# name,total_red_violations
# SUBWAY,322
# TACO TIME,240
# MCDONALD'S,177
# SAFEWAY INC #1508,143

Hudi DeltaStreamer

In an earlier post, we discussed generating a Hudi table using the DeltaStreamer utility as part of a CDC-based data ingestion solution. In that exercise, we executed the Spark job in an EMR cluster backed by EC2 instances. We can run the Spark job in our EKS cluster as well.

We can configure the executors to run on spot instances in order to save cost. A spot node group can be created with the following configuration file.

# ./manifests/nodegroup-spot.yaml
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig

metadata:
  name: emr-eks-example
  region: ap-southeast-2
  tags:
    Owner: jaehyeon

managedNodeGroups:
- name: nodegroup-spot
  desiredCapacity: 3
  instanceTypes:
  - m5.xlarge
  - m5a.xlarge
  - m4.xlarge
  spot: true

Once the spot node group is created, we can see that 3 instances with the SPOT capacity type are added to the EKS cluster.

eksctl create nodegroup -f ./manifests/nodegroup-spot.yaml

kubectl get nodes \
  --label-columns=eks.amazonaws.com/nodegroup,eks.amazonaws.com/capacityType \
  --sort-by=.metadata.creationTimestamp
# NAME                                                STATUS   ROLES    AGE    VERSION               NODEGROUP        CAPACITYTYPE
# ip-192-168-33-60.ap-southeast-2.compute.internal    Ready    <none>   52m    v1.21.5-eks-bc4871b   nodegroup        ON_DEMAND
# ip-192-168-95-68.ap-southeast-2.compute.internal    Ready    <none>   51m    v1.21.5-eks-bc4871b   nodegroup        ON_DEMAND
# ip-192-168-79-20.ap-southeast-2.compute.internal    Ready    <none>   114s   v1.21.5-eks-bc4871b   nodegroup-spot   SPOT
# ip-192-168-1-57.ap-southeast-2.compute.internal     Ready    <none>   112s   v1.21.5-eks-bc4871b   nodegroup-spot   SPOT
# ip-192-168-34-249.ap-southeast-2.compute.internal   Ready    <none>   97s    v1.21.5-eks-bc4871b   nodegroup-spot   SPOT

The driver and executor pods should be created on different nodes, which can be controlled with pod templates. Below, the driver and executor have different node selectors, and they’ll be assigned based on the capacity type label specified in the node selector.

# ./config/driver_pod_template.yml
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    eks.amazonaws.com/capacityType: ON_DEMAND

# ./config/executor_pod_template.yml
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    eks.amazonaws.com/capacityType: SPOT

The job request for the DeltaStreamer job can be found below. Note that, in the entry point, we specify the latest Hudi utilities bundle (0.10.0) from S3 instead of the pre-installed Hudi 0.8.0. This is because Hudi 0.8.0 supports JDBC-based Hive sync only, while Hudi 0.9.0+ supports multiple Hive sync modes including Hive metastore. EMR on EKS doesn’t run HiveServer2, so JDBC-based Hive sync doesn’t work. Instead, we can use Hive sync based on the Hive metastore because the Glue data catalog can be used as the Hive metastore. Therefore, we need a newer version of the Hudi library in order to register the resulting Hudi table in the Glue data catalog. Also, in the application configuration, we configure Spark to use the Glue data catalog as the Hive metastore and specify the driver/executor pod template files.

export VIRTUAL_CLUSTER_ID=$(aws emr-containers list-virtual-clusters --query "sort_by(virtualClusters, &createdAt)[-1].id" --output text)
export EMR_ROLE_ARN=$(aws iam get-role --role-name ${EMR_ROLE_NAME} --query Role.Arn --output text)

## create job request
cat << EOF > ./request-cdc-events.json
{
    "name": "cdc-events",
    "virtualClusterId": "${VIRTUAL_CLUSTER_ID}",
    "executionRoleArn": "${EMR_ROLE_ARN}",
    "releaseLabel": "emr-6.4.0-latest",
    "jobDriver": {
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://${S3_BUCKET_NAME}/config/hudi-utilities-bundle_2.12-0.10.0.jar",
            "entryPointArguments": [
              "--table-type", "COPY_ON_WRITE",
              "--source-ordering-field", "__source_ts_ms",
              "--props", "s3://${S3_BUCKET_NAME}/config/cdc_events_s3.properties",
              "--source-class", "org.apache.hudi.utilities.sources.JsonDFSSource",
              "--target-base-path", "s3://${S3_BUCKET_NAME}/hudi/cdc-events/",
              "--target-table", "datalake.cdc_events",
              "--schemaprovider-class", "org.apache.hudi.utilities.schema.FilebasedSchemaProvider",
              "--enable-hive-sync",
              "--min-sync-interval-seconds", "60",
              "--continuous",
              "--op", "UPSERT"
            ],
            "sparkSubmitParameters": "--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
            --jars local:///usr/lib/spark/external/lib/spark-avro_2.12-3.1.2-amzn-0.jar,s3://${S3_BUCKET_NAME}/config/hudi-utilities-bundle_2.12-0.10.0.jar \
            --conf spark.driver.cores=1 \
            --conf spark.driver.memory=2G \
            --conf spark.executor.instances=2 \
            --conf spark.executor.memory=2G \
            --conf spark.executor.cores=1 \
            --conf spark.sql.catalogImplementation=hive \
            --conf spark.serializer=org.apache.spark.serializer.KryoSerializer"
        }
    },
    "configurationOverrides": {
        "applicationConfiguration": [
            {
                "classification": "spark-defaults",
                "properties": {
                  "spark.hadoop.hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
                  "spark.kubernetes.driver.podTemplateFile":"s3://${S3_BUCKET_NAME}/config/driver_pod_template.yml",
                  "spark.kubernetes.executor.podTemplateFile":"s3://${S3_BUCKET_NAME}/config/executor_pod_template.yml"
                }
            }
        ],
        "monitoringConfiguration": {
            "cloudWatchMonitoringConfiguration": {
                "logGroupName": "${LOG_GROUP_NAME}",
                "logStreamNamePrefix": "cdc"
            },
            "s3MonitoringConfiguration": {
                "logUri": "s3://${S3_BUCKET_NAME}/logs/"
            }
        }
    }
}
EOF

aws emr-containers start-job-run \
    --cli-input-json file://./request-cdc-events.json
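
The contents of cdc_events_s3.properties are not shown in this post. As a rough sketch of the Hive sync part only, and assuming the Hudi configuration keys hoodie.datasource.hive_sync.mode, hoodie.datasource.hive_sync.database and hoodie.datasource.hive_sync.table, metastore-based sync could be requested with properties like the ones printed below. The helper name and the way the values are passed in are illustrative and not taken from the actual file.

```shell
# Print Hudi properties that select Hive-metastore-based sync (mode "hms").
# hive_sync_properties is a hypothetical helper; the real properties file
# used in this post may set these keys differently or set additional ones.
hive_sync_properties() {
  local database="$1" table="$2"
  cat << EOF
hoodie.datasource.hive_sync.mode=hms
hoodie.datasource.hive_sync.database=${database}
hoodie.datasource.hive_sync.table=${table}
EOF
}
```

For the table in this example, the call would be `hive_sync_properties datalake cdc_events`.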

Once the job run is started, we can check it as shown below.

aws emr-containers list-job-runs --virtual-cluster-id ${VIRTUAL_CLUSTER_ID} --query "jobRuns[?name=='cdc-events']"
# [
#     {
#         "id": "00000002vhi9hivmjk5",
#         "name": "cdc-events",
#         "virtualClusterId": "9wvd1yhms5tk1k8chrn525z34",
#         "arn": "arn:aws:emr-containers:ap-southeast-2:<AWS-ACCOUNT-ID>:/virtualclusters/9wvd1yhms5tk1k8chrn525z34/jobruns/00000002vhi9hivmjk5",
#         "state": "RUNNING",
#         "clientToken": "63a707e4-e5bc-43e4-b11a-5dcfb4377fd3",
#         "executionRoleArn": "arn:aws:iam::<AWS-ACCOUNT-ID>:role/emr-eks-example-job-execution",
#         "releaseLabel": "emr-6.4.0-latest",
#         "createdAt": "2022-01-07T02:09:34+00:00",
#         "createdBy": "arn:aws:sts::<AWS-ACCOUNT-ID>:assumed-role/AWSReservedSSO_AWSFullAccountAdmin_fb6fa00561d5e1c2/jaehyeon.kim@cevo.com.au",
#         "tags": {}
#     }
# ]

With kubectl, we can see that there are 1 driver, 2 executor and 1 job runner pods.

kubectl get pod -n spark
# NAME                                                            READY   STATUS    RESTARTS   AGE
# pod/00000002vhi9hivmjk5-wf8vp                                   3/3     Running   0          14m
# pod/delta-streamer-datalake-cdcevents-5397917e324dea27-exec-1   2/2     Running   0          12m
# pod/delta-streamer-datalake-cdcevents-5397917e324dea27-exec-2   2/2     Running   0          12m
# pod/spark-00000002vhi9hivmjk5-driver                            2/2     Running   0          13m

Also, we can see the driver pod runs in the on-demand node group while the executor and job runner pods run in the spot node group.

## driver runs in the on demand node
for n in $(kubectl get nodes -l eks.amazonaws.com/capacityType=ON_DEMAND --no-headers | cut -d " " -f1)
  do echo "Pods on instance ${n}:";kubectl get pods -n spark  --no-headers --field-selector spec.nodeName=${n}
     echo
  done
# Pods on instance ip-192-168-33-60.ap-southeast-2.compute.internal:
# No resources found in spark namespace.
#
# Pods on instance ip-192-168-95-68.ap-southeast-2.compute.internal:
# spark-00000002vhi9hivmjk5-driver   2/2   Running   0     17m

## executor and job runner run in the spot node
for n in $(kubectl get nodes -l eks.amazonaws.com/capacityType=SPOT --no-headers | cut -d " " -f1)
  do echo "Pods on instance ${n}:";kubectl get pods -n spark  --no-headers --field-selector spec.nodeName=${n}
     echo
  done
# Pods on instance ip-192-168-1-57.ap-southeast-2.compute.internal:
# delta-streamer-datalake-cdcevents-5397917e324dea27-exec-2   2/2   Running   0     16m
#
# Pods on instance ip-192-168-34-249.ap-southeast-2.compute.internal:
# 00000002vhi9hivmjk5-wf8vp   3/3   Running   0     18m
#
# Pods on instance ip-192-168-79-20.ap-southeast-2.compute.internal:
# delta-streamer-datalake-cdcevents-5397917e324dea27-exec-1   2/2   Running   0     16m
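
The same placement can be read in a single call by asking kubectl to print each pod next to the node it is scheduled on. This is a small sketch using the custom-columns output format; the helper name pods_with_nodes is hypothetical.

```shell
# List each pod in a namespace together with the node it is scheduled on.
# pods_with_nodes is a hypothetical convenience wrapper around kubectl.
pods_with_nodes() {
  local namespace="$1"
  kubectl get pods -n "${namespace}" --no-headers \
    -o custom-columns=POD:.metadata.name,NODE:.spec.nodeName
}
```

The node names printed by `pods_with_nodes spark` can then be matched against the capacity type column of the earlier kubectl get nodes output.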

The Hudi utility will register a table in the Glue data catalog and it can be checked as shown below.

aws glue get-table --database-name datalake --name cdc_events \
    --query "Table.[DatabaseName, Name, StorageDescriptor.Location, CreateTime, CreatedBy]"
# [
#     "datalake",
#     "cdc_events",
#     "s3://emr-eks-example-ap-southeast-2/hudi/cdc-events",
#     "2022-01-07T13:18:49+11:00",
#     "arn:aws:sts::<AWS-ACCOUNT-ID>:assumed-role/emr-eks-example-job-execution/aws-sdk-java-1641521928075"
# ]

Finally, the details of the table can be queried in Athena.

Clean up

The resources created for this post can be deleted using the AWS CLI and eksctl as shown below.

## delete virtual cluster
export JOB_RUN_ID=$(aws emr-containers list-job-runs --virtual-cluster-id ${VIRTUAL_CLUSTER_ID} --query "jobRuns[?name=='cdc-events'].id" --output text)
aws emr-containers cancel-job-run --id ${JOB_RUN_ID} \
  --virtual-cluster-id ${VIRTUAL_CLUSTER_ID}
aws emr-containers delete-virtual-cluster --id ${VIRTUAL_CLUSTER_ID}
## delete s3
aws s3 rm s3://${S3_BUCKET_NAME} --recursive
aws s3 rb s3://${S3_BUCKET_NAME} --force
## delete log group
aws logs delete-log-group --log-group-name ${LOG_GROUP_NAME}
## delete glue table/database
aws glue delete-table --database-name datalake --name cdc_events
aws glue delete-database --name datalake
## delete iam role/policy
aws iam delete-role-policy --role-name ${EMR_ROLE_NAME} --policy-name ${EMR_ROLE_NAME}-policy
aws iam delete-role --role-name ${EMR_ROLE_NAME}
## delete eks cluster
eksctl delete cluster --name ${CLUSTER_NAME}


In this post, we discussed how to run Spark jobs on EKS. First, we created an EKS cluster and a node group using eksctl. Then we set up EMR on EKS. A simple PySpark job illustrated the basics of EMR on EKS, and a more realistic example ran the Hudi DeltaStreamer utility with the driver and executors assigned to different node groups.