EMR on EKS provides a deployment option for Amazon EMR that allows you to automate the provisioning and management of open-source big data frameworks on Amazon EKS. While a wide range of open-source big data components are available in EMR on EC2, only Apache Spark is available in EMR on EKS. It is more flexible, however, in that applications of different EMR versions can be run in multiple availability zones on either EC2 or Fargate, and other types of containerized applications can be deployed on the same EKS cluster. Therefore, if you have, or plan to have, for example, Apache Airflow, Apache Superset or Kubeflow as your analytics toolkits, it can be an effective way to manage big data (as well as non-big data) workloads. While Glue is geared towards ETL, EMR on EKS can also be used for other types of tasks such as machine learning. Moreover, it allows you to build a plain Spark application rather than a Glue-flavoured one. For example, while you have to use custom connectors for Hudi or Iceberg with Glue, you can use their native libraries with EMR on EKS. In this post, we'll discuss EMR on EKS with simple and more elaborate examples.
Cluster setup and configuration
We'll use command line utilities heavily. The following tools are required, and a quick way to verify they're installed is shown after the list.
- AWS CLI V2 - it is the official command line interface that enables users to interact with AWS services.
- eksctl - it is a CLI tool for creating and managing clusters on EKS.
- kubectl - it is a command line utility for communicating with the cluster API server.
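A minimal check that the tools are available is to print their versions - the exact version output will differ by environment.

aws --version
eksctl version
kubectl version --client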
Upload preliminary resources to S3
We need supporting files, and they are created/downloaded into the config and manifests folders using a setup script - the script can be found in the project GitHub repository. The generated files are listed below.
export OWNER=jaehyeon
export AWS_REGION=ap-southeast-2
export CLUSTER_NAME=emr-eks-example
export EMR_ROLE_NAME=${CLUSTER_NAME}-job-execution
export S3_BUCKET_NAME=${CLUSTER_NAME}-${AWS_REGION}
export LOG_GROUP_NAME=/${CLUSTER_NAME}

## run setup script
# - create config files, sample scripts and download necessary files
./setup.sh

tree -p config -p manifests
# config
# ├── [-rw-r--r--] cdc_events.avsc
# ├── [-rw-r--r--] cdc_events_s3.properties
# ├── [-rw-r--r--] driver_pod_template.yml
# ├── [-rw-r--r--] executor_pod_template.yml
# ├── [-rw-r--r--] food_establishment_data.csv
# ├── [-rw-r--r--] health_violations.py
# └── [-rw-r--r--] hudi-utilities-bundle_2.12-0.10.0.jar
# manifests
# ├── [-rw-r--r--] cluster.yaml
# ├── [-rw-r--r--] nodegroup-spot.yaml
# └── [-rw-r--r--] nodegroup.yaml
We'll configure logging on S3 and CloudWatch, so an S3 bucket and a CloudWatch log group are created. A Glue database is also created, as I encountered an error when creating a Glue table while the database didn't exist. Finally, the files in the config folder are uploaded to S3.
#### create S3 bucket/log group/glue database and upload files to S3
aws s3 mb s3://${S3_BUCKET_NAME}
aws logs create-log-group --log-group-name=${LOG_GROUP_NAME}
aws glue create-database --database-input '{"Name": "datalake"}'

## upload files to S3
for f in $(ls ./config/)
  do
    aws s3 cp ./config/${f} s3://${S3_BUCKET_NAME}/config/
  done
# upload: config/cdc_events.avsc to s3://emr-eks-example-ap-southeast-2/config/cdc_events.avsc
# upload: config/cdc_events_s3.properties to s3://emr-eks-example-ap-southeast-2/config/cdc_events_s3.properties
# upload: config/driver_pod_template.yml to s3://emr-eks-example-ap-southeast-2/config/driver_pod_template.yml
# upload: config/executor_pod_template.yml to s3://emr-eks-example-ap-southeast-2/config/executor_pod_template.yml
# upload: config/food_establishment_data.csv to s3://emr-eks-example-ap-southeast-2/config/food_establishment_data.csv
# upload: config/health_violations.py to s3://emr-eks-example-ap-southeast-2/config/health_violations.py
# upload: config/hudi-utilities-bundle_2.12-0.10.0.jar to s3://emr-eks-example-ap-southeast-2/config/hudi-utilities-bundle_2.12-0.10.0.jar
Create EKS cluster and node group
We can use either command line options or a config file when creating a cluster or node group with eksctl. We'll use config files, which are shown below.
# ./manifests/cluster.yaml
---
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig

metadata:
  name: emr-eks-example
  region: ap-southeast-2
  tags:
    Owner: jaehyeon
# ./manifests/nodegroup.yaml
---
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig

metadata:
  name: emr-eks-example
  region: ap-southeast-2
  tags:
    Owner: jaehyeon

managedNodeGroups:
- name: nodegroup
  desiredCapacity: 2
  instanceType: m5.xlarge
eksctl creates a cluster or node group via CloudFormation. Each command will create a dedicated CloudFormation stack, and it'll take about 15 minutes. eksctl also updates the default kubeconfig file in the $HOME/.kube folder. Once the node group is created, we can check it using the kubectl command.
#### create cluster, node group and configure
eksctl create cluster -f ./manifests/cluster.yaml
eksctl create nodegroup -f ./manifests/nodegroup.yaml

kubectl get nodes
# NAME                                                STATUS   ROLES    AGE     VERSION
# ip-192-168-33-60.ap-southeast-2.compute.internal    Ready    <none>   5m52s   v1.21.5-eks-bc4871b
# ip-192-168-95-68.ap-southeast-2.compute.internal    Ready    <none>   5m49s   v1.21.5-eks-bc4871b
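As a quick sanity check that the cluster exists and kubectl is pointing at it, we can query eksctl and the current kubeconfig context - a minimal sketch.

eksctl get cluster --name ${CLUSTER_NAME} --region ${AWS_REGION}
kubectl config current-context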
Set up Amazon EMR on EKS
As described in the Amazon EMR on EKS development guide, Amazon EKS uses Kubernetes namespaces to divide cluster resources between multiple users and applications. A virtual cluster is a Kubernetes namespace that Amazon EMR is registered with, and Amazon EMR uses virtual clusters to run jobs and host endpoints. The following steps are taken to set up EMR on EKS.
Enable cluster access for Amazon EMR on EKS
After creating a Kubernetes namespace for EMR (spark), we need to allow Amazon EMR on EKS to access the namespace. This can be automated with eksctl; specifically, the following actions are performed.
- setting up RBAC authorization by creating a Kubernetes role and binding the role to a Kubernetes user
- mapping the Kubernetes user to the EMR on EKS service-linked role
kubectl create namespace spark
eksctl create iamidentitymapping --cluster ${CLUSTER_NAME} \
  --namespace spark --service-name "emr-containers"
While the details of the role and role binding can be found in the development guide, we can see that the aws-auth ConfigMap is updated with the new Kubernetes user.
kubectl describe cm aws-auth -n kube-system
# Name:         aws-auth
# Namespace:    kube-system
# Labels:       <none>
# Annotations:  <none>

# Data
# ====
# mapRoles:
# ----
# - groups:
#   - system:bootstrappers
#   - system:nodes
#   rolearn: arn:aws:iam::<AWS-ACCOUNT-ID>:role/eksctl-emr-eks-example-nodegroup-NodeInstanceRole-15J26FPOYH0AL
#   username: system:node:{{EC2PrivateDNSName}}
# - rolearn: arn:aws:iam::<AWS-ACCOUNT-ID>:role/AWSServiceRoleForAmazonEMRContainers
#   username: emr-containers

# mapUsers:
# ----
# []

# Events:  <none>
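We can also list the Kubernetes role and role binding that eksctl created in the spark namespace - a quick check (the resource names may differ between eksctl versions).

kubectl get role,rolebinding -n spark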
Create an IAM OIDC identity provider for the EKS cluster
We can associate an IAM role with a Kubernetes service account. This service account can then provide AWS permissions to the containers in any pod that uses that service account. Simply put, the service account for EMR will be allowed to assume the EMR job execution role by OIDC federation - see EKS user guide for details. The job execution role will be created below. In order for the OIDC federation to work, we need to set up an IAM OIDC provider for the EKS cluster.
eksctl utils associate-iam-oidc-provider \
  --cluster ${CLUSTER_NAME} --approve

aws iam list-open-id-connect-providers --query "OpenIDConnectProviderList[1]"
# {
#     "Arn": "arn:aws:iam::<AWS-ACCOUNT-ID>:oidc-provider/oidc.eks.ap-southeast-2.amazonaws.com/id/6F3C18F00D8610088272FEF11013B8C5"
# }
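To confirm the provider matches our cluster, we can compare its ARN with the cluster's OIDC issuer URL - a minimal check.

aws eks describe-cluster --name ${CLUSTER_NAME} \
  --query "cluster.identity.oidc.issuer" --output text
# https://oidc.eks.ap-southeast-2.amazonaws.com/id/6F3C18F00D8610088272FEF11013B8C5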
Create a job execution role
The following job execution role is created for the examples of this post. The permissions are set up to perform tasks on S3 and Glue. We’ll also enable logging on S3 and CloudWatch so that the necessary permissions are added as well.
aws iam create-role \
  --role-name ${EMR_ROLE_NAME} \
  --assume-role-policy-document '{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "elasticmapreduce.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}'

aws iam put-role-policy \
  --role-name ${EMR_ROLE_NAME} \
  --policy-name ${EMR_ROLE_NAME}-policy \
  --policy-document '{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:DeleteObject"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "glue:*"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents",
        "logs:CreateLogStream",
        "logs:DescribeLogGroups",
        "logs:DescribeLogStreams"
      ],
      "Resource": [
        "arn:aws:logs:*:*:*"
      ]
    }
  ]
}'
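To verify the role and its inline policy were created as expected, we can list the role's inline policies - a quick check.

aws iam list-role-policies --role-name ${EMR_ROLE_NAME}
# {
#     "PolicyNames": [
#         "emr-eks-example-job-execution-policy"
#     ]
# }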
Update the trust policy of the job execution role
As mentioned earlier, the EMR service account is allowed to assume the job execution role by OIDC federation. In order to enable it, we need to update the trust relationship of the role. We can update it as shown below.
aws emr-containers update-role-trust-policy \
  --cluster-name ${CLUSTER_NAME} \
  --namespace spark \
  --role-name ${EMR_ROLE_NAME}

aws iam get-role --role-name ${EMR_ROLE_NAME} --query "Role.AssumeRolePolicyDocument.Statement[1]"
# {
#     "Effect": "Allow",
#     "Principal": {
#         "Federated": "arn:aws:iam::<AWS-ACCOUNT-ID>:oidc-provider/oidc.eks.ap-southeast-2.amazonaws.com/id/6F3C18F00D8610088272FEF11013B8C5"
#     },
#     "Action": "sts:AssumeRoleWithWebIdentity",
#     "Condition": {
#         "StringLike": {
#             "oidc.eks.ap-southeast-2.amazonaws.com/id/6F3C18F00D8610088272FEF11013B8C5:sub": "system:serviceaccount:spark:emr-containers-sa-*-*-<AWS-ACCOUNT-ID>-93ztm12b8wi73z7zlhtudeipd0vpa8b60gchkls78cj1q"
#         }
#     }
# }
Register Amazon EKS Cluster with Amazon EMR
We can register the Amazon EKS cluster with Amazon EMR as shown below. We need to provide the EKS cluster name and namespace.
## register EKS cluster with EMR
aws emr-containers create-virtual-cluster \
  --name ${CLUSTER_NAME} \
  --container-provider '{
  "id": "'${CLUSTER_NAME}'",
  "type": "EKS",
  "info": {
    "eksInfo": {
      "namespace": "spark"
    }
  }
}'

aws emr-containers list-virtual-clusters --query "sort_by(virtualClusters, &createdAt)[-1]"
# {
#     "id": "9wvd1yhms5tk1k8chrn525z34",
#     "name": "emr-eks-example",
#     "arn": "arn:aws:emr-containers:ap-southeast-2:<AWS-ACCOUNT-ID>:/virtualclusters/9wvd1yhms5tk1k8chrn525z34",
#     "state": "RUNNING",
#     "containerProvider": {
#         "type": "EKS",
#         "id": "emr-eks-example",
#         "info": {
#             "eksInfo": {
#                 "namespace": "spark"
#             }
#         }
#     },
#     "createdAt": "2022-01-07T01:26:37+00:00",
#     "tags": {}
# }
We can also check the virtual cluster on the EMR console.
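The same details are available from the CLI - a sketch, assuming the most recently created virtual cluster is the one we just registered.

aws emr-containers describe-virtual-cluster \
  --id $(aws emr-containers list-virtual-clusters --query "sort_by(virtualClusters, &createdAt)[-1].id" --output text)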
Examples
Food Establishment Inspection
This example is from the getting started tutorial of the Amazon EMR management guide. The PySpark script executes a simple SQL statement that counts the top 10 restaurants with the most red violations and saves the output to S3. The script and its data source have already been uploaded to S3.
In the job request, we specify the job name, virtual cluster ID and job execution role. The spark-submit details are specified in the job driver option, where the entrypoint is set to the S3 location of the PySpark script, together with the entrypoint arguments and spark-submit parameters. Finally, the S3 and CloudWatch monitoring configuration is specified.
export VIRTUAL_CLUSTER_ID=$(aws emr-containers list-virtual-clusters --query "sort_by(virtualClusters, &createdAt)[-1].id" --output text)
export EMR_ROLE_ARN=$(aws iam get-role --role-name ${EMR_ROLE_NAME} --query Role.Arn --output text)

## create job request
cat << EOF > ./request-health-violations.json
{
    "name": "health-violations",
    "virtualClusterId": "${VIRTUAL_CLUSTER_ID}",
    "executionRoleArn": "${EMR_ROLE_ARN}",
    "releaseLabel": "emr-6.2.0-latest",
    "jobDriver": {
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://${S3_BUCKET_NAME}/config/health_violations.py",
            "entryPointArguments": [
                "--data_source", "s3://${S3_BUCKET_NAME}/config/food_establishment_data.csv",
                "--output_uri", "s3://${S3_BUCKET_NAME}/output"
            ],
            "sparkSubmitParameters": "--conf spark.executor.instances=2 \
                --conf spark.executor.memory=2G \
                --conf spark.executor.cores=1 \
                --conf spark.driver.cores=1 \
                --conf spark.driver.memory=2G"
        }
    },
    "configurationOverrides": {
        "monitoringConfiguration": {
            "cloudWatchMonitoringConfiguration": {
                "logGroupName": "${LOG_GROUP_NAME}",
                "logStreamNamePrefix": "health"
            },
            "s3MonitoringConfiguration": {
                "logUri": "s3://${S3_BUCKET_NAME}/logs/"
            }
        }
    }
}
EOF

aws emr-containers start-job-run \
  --cli-input-json file://./request-health-violations.json
Once a job run is started, it can be checked under the virtual cluster section of the EMR console.
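The job run state can also be polled from the CLI - a minimal sketch that filters the job runs by name.

aws emr-containers list-job-runs --virtual-cluster-id ${VIRTUAL_CLUSTER_ID} \
  --query "jobRuns[?name=='health-violations'].[state, createdAt]"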
When we click the View logs link, it launches the Spark History Server in a new tab.
As configured, the container logs of the job can be found in CloudWatch.
Also, the logs for the containers (spark driver and executor) and control-logs (job runner) can be found in S3.
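A quick way to locate those logs from the command line - a sketch, assuming the log stream prefix (health) and log URI configured in the job request above.

aws logs describe-log-streams --log-group-name ${LOG_GROUP_NAME} \
  --log-stream-name-prefix health --query "logStreams[].logStreamName"
aws s3 ls s3://${S3_BUCKET_NAME}/logs/ --recursive | head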
Once the job completes, we can check the output from S3 as shown below.
export OUTPUT_FILE=$(aws s3 ls s3://${S3_BUCKET_NAME}/output/ | grep .csv | awk '{print $4}')
aws s3 cp s3://${S3_BUCKET_NAME}/output/${OUTPUT_FILE} - | head -n 15
# name,total_red_violations
# SUBWAY,322
# T-MOBILE PARK,315
# WHOLE FOODS MARKET,299
# PCC COMMUNITY MARKETS,251
# TACO TIME,240
# MCDONALD'S,177
# THAI GINGER,153
# SAFEWAY INC #1508,143
# TAQUERIA EL RINCONSITO,134
# HIMITSU TERIYAKI,128
Hudi DeltaStreamer
In an earlier post, we discussed generating a Hudi table with the DeltaStreamer utility as part of a CDC-based data ingestion solution. In that exercise, we executed the Spark job on an EMR cluster backed by EC2 instances. Here we'll run the Spark job on our EKS cluster instead.
We can configure the executors to run on Spot instances to save cost. A Spot node group can be created with the following configuration file.
# ./manifests/nodegroup-spot.yaml
---
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig

metadata:
  name: emr-eks-example
  region: ap-southeast-2
  tags:
    Owner: jaehyeon

managedNodeGroups:
- name: nodegroup-spot
  desiredCapacity: 3
  instanceTypes:
  - m5.xlarge
  - m5a.xlarge
  - m4.xlarge
  spot: true
Once the Spot node group is created, we can see that 3 instances are added to the EKS cluster with the SPOT capacity type.
eksctl create nodegroup -f ./manifests/nodegroup-spot.yaml

kubectl get nodes \
  --label-columns=eks.amazonaws.com/nodegroup,eks.amazonaws.com/capacityType \
  --sort-by=.metadata.creationTimestamp
# NAME                                                 STATUS   ROLES    AGE    VERSION               NODEGROUP        CAPACITYTYPE
# ip-192-168-33-60.ap-southeast-2.compute.internal     Ready    <none>   52m    v1.21.5-eks-bc4871b   nodegroup        ON_DEMAND
# ip-192-168-95-68.ap-southeast-2.compute.internal     Ready    <none>   51m    v1.21.5-eks-bc4871b   nodegroup        ON_DEMAND
# ip-192-168-79-20.ap-southeast-2.compute.internal     Ready    <none>   114s   v1.21.5-eks-bc4871b   nodegroup-spot   SPOT
# ip-192-168-1-57.ap-southeast-2.compute.internal      Ready    <none>   112s   v1.21.5-eks-bc4871b   nodegroup-spot   SPOT
# ip-192-168-34-249.ap-southeast-2.compute.internal    Ready    <none>   97s    v1.21.5-eks-bc4871b   nodegroup-spot   SPOT
The driver and executor pods should be created on different nodes, and this can be controlled with pod templates. Below, the driver and executor each have a different node selector, so they'll be assigned to nodes based on the capacity type label specified in the selector.
# ./config/driver_pod_template.yml
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    eks.amazonaws.com/capacityType: ON_DEMAND

# ./config/executor_pod_template.yml
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    eks.amazonaws.com/capacityType: SPOT
The job request for the DeltaStreamer job can be found below. Note that, in the entrypoint, we specify the latest Hudi utilities bundle (0.10.0) from S3 instead of the pre-installed Hudi 0.8.0. This is because Hudi 0.8.0 supports JDBC-based Hive sync only, while Hudi 0.9.0+ supports multiple Hive sync modes including the Hive metastore. EMR on EKS doesn't run HiveServer2, so JDBC-based Hive sync doesn't work. Instead, we can sync via the Hive metastore, because the Glue data catalog can be used as the Hive metastore. Therefore, we need a newer version of the Hudi library in order to register the resulting Hudi table in the Glue data catalog. Also, in the application configuration, we configure the Glue data catalog as the Hive metastore and specify the driver/executor pod template files.
export VIRTUAL_CLUSTER_ID=$(aws emr-containers list-virtual-clusters --query "sort_by(virtualClusters, &createdAt)[-1].id" --output text)
export EMR_ROLE_ARN=$(aws iam get-role --role-name ${EMR_ROLE_NAME} --query Role.Arn --output text)

## create job request
cat << EOF > ./request-cdc-events.json
{
    "name": "cdc-events",
    "virtualClusterId": "${VIRTUAL_CLUSTER_ID}",
    "executionRoleArn": "${EMR_ROLE_ARN}",
    "releaseLabel": "emr-6.4.0-latest",
    "jobDriver": {
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://${S3_BUCKET_NAME}/config/hudi-utilities-bundle_2.12-0.10.0.jar",
            "entryPointArguments": [
                "--table-type", "COPY_ON_WRITE",
                "--source-ordering-field", "__source_ts_ms",
                "--props", "s3://${S3_BUCKET_NAME}/config/cdc_events_s3.properties",
                "--source-class", "org.apache.hudi.utilities.sources.JsonDFSSource",
                "--target-base-path", "s3://${S3_BUCKET_NAME}/hudi/cdc-events/",
                "--target-table", "datalake.cdc_events",
                "--schemaprovider-class", "org.apache.hudi.utilities.schema.FilebasedSchemaProvider",
                "--enable-hive-sync",
                "--min-sync-interval-seconds", "60",
                "--continuous",
                "--op", "UPSERT"
            ],
            "sparkSubmitParameters": "--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
                --jars local:///usr/lib/spark/external/lib/spark-avro_2.12-3.1.2-amzn-0.jar,s3://${S3_BUCKET_NAME}/config/hudi-utilities-bundle_2.12-0.10.0.jar \
                --conf spark.driver.cores=1 \
                --conf spark.driver.memory=2G \
                --conf spark.executor.instances=2 \
                --conf spark.executor.memory=2G \
                --conf spark.executor.cores=1 \
                --conf spark.sql.catalogImplementation=hive \
                --conf spark.serializer=org.apache.spark.serializer.KryoSerializer"
        }
    },
    "configurationOverrides": {
        "applicationConfiguration": [
            {
                "classification": "spark-defaults",
                "properties": {
                    "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
                    "spark.kubernetes.driver.podTemplateFile": "s3://${S3_BUCKET_NAME}/config/driver_pod_template.yml",
                    "spark.kubernetes.executor.podTemplateFile": "s3://${S3_BUCKET_NAME}/config/executor_pod_template.yml"
                }
            }
        ],
        "monitoringConfiguration": {
            "cloudWatchMonitoringConfiguration": {
                "logGroupName": "${LOG_GROUP_NAME}",
                "logStreamNamePrefix": "cdc"
            },
            "s3MonitoringConfiguration": {
                "logUri": "s3://${S3_BUCKET_NAME}/logs/"
            }
        }
    }
}
EOF

aws emr-containers start-job-run \
  --cli-input-json file://./request-cdc-events.json
Once the job run is started, we can check it as shown below.
aws emr-containers list-job-runs --virtual-cluster-id ${VIRTUAL_CLUSTER_ID} --query "jobRuns[?name=='cdc-events']"
# [
#     {
#         "id": "00000002vhi9hivmjk5",
#         "name": "cdc-events",
#         "virtualClusterId": "9wvd1yhms5tk1k8chrn525z34",
#         "arn": "arn:aws:emr-containers:ap-southeast-2:<AWS-ACCOUNT-ID>:/virtualclusters/9wvd1yhms5tk1k8chrn525z34/jobruns/00000002vhi9hivmjk5",
#         "state": "RUNNING",
#         "clientToken": "63a707e4-e5bc-43e4-b11a-5dcfb4377fd3",
#         "executionRoleArn": "arn:aws:iam::<AWS-ACCOUNT-ID>:role/emr-eks-example-job-execution",
#         "releaseLabel": "emr-6.4.0-latest",
#         "createdAt": "2022-01-07T02:09:34+00:00",
#         "createdBy": "arn:aws:sts::<AWS-ACCOUNT-ID>:assumed-role/AWSReservedSSO_AWSFullAccountAdmin_fb6fa00561d5e1c2/jaehyeon.kim@cevo.com.au",
#         "tags": {}
#     }
# ]
With kubectl, we can check that there are 1 driver, 2 executor and 1 job runner pods.
kubectl get pod -n spark
# NAME                                                         READY   STATUS    RESTARTS   AGE
# pod/00000002vhi9hivmjk5-wf8vp                                 3/3     Running   0          14m
# pod/delta-streamer-datalake-cdcevents-5397917e324dea27-exec-1 2/2     Running   0          12m
# pod/delta-streamer-datalake-cdcevents-5397917e324dea27-exec-2 2/2     Running   0          12m
# pod/spark-00000002vhi9hivmjk5-driver                          2/2     Running   0          13m
Also, we can see the driver pod runs in the on-demand node group while the executor and job runner pods run in the spot node group.
## driver runs in the on demand node
for n in $(kubectl get nodes -l eks.amazonaws.com/capacityType=ON_DEMAND --no-headers | cut -d " " -f1)
  do echo "Pods on instance ${n}:";kubectl get pods -n spark --no-headers --field-selector spec.nodeName=${n}
     echo
  done
# Pods on instance ip-192-168-33-60.ap-southeast-2.compute.internal:
# No resources found in spark namespace.

# Pods on instance ip-192-168-95-68.ap-southeast-2.compute.internal:
# spark-00000002vhi9hivmjk5-driver   2/2   Running   0   17m

## executor and job runner run in the spot node
for n in $(kubectl get nodes -l eks.amazonaws.com/capacityType=SPOT --no-headers | cut -d " " -f1)
  do echo "Pods on instance ${n}:";kubectl get pods -n spark --no-headers --field-selector spec.nodeName=${n}
     echo
  done
# Pods on instance ip-192-168-1-57.ap-southeast-2.compute.internal:
# delta-streamer-datalake-cdcevents-5397917e324dea27-exec-2   2/2   Running   0   16m

# Pods on instance ip-192-168-34-249.ap-southeast-2.compute.internal:
# 00000002vhi9hivmjk5-wf8vp   3/3   Running   0   18m

# Pods on instance ip-192-168-79-20.ap-southeast-2.compute.internal:
# delta-streamer-datalake-cdcevents-5397917e324dea27-exec-1   2/2   Running   0   16m
The Hudi utility registers a table in the Glue data catalog, which can be checked as shown below.
aws glue get-table --database-name datalake --name cdc_events \
  --query "Table.[DatabaseName, Name, StorageDescriptor.Location, CreateTime, CreatedBy]"
# [
#     "datalake",
#     "cdc_events",
#     "s3://emr-eks-example-ap-southeast-2/hudi/cdc-events",
#     "2022-01-07T13:18:49+11:00",
#     "arn:aws:sts::590312749310:assumed-role/emr-eks-example-job-execution/aws-sdk-java-1641521928075"
# ]
Finally, the details of the table can be queried in Athena.
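Below is a sketch of running such a query from the CLI instead of the Athena console - the output location (s3://${S3_BUCKET_NAME}/athena-results/) is an assumption for illustration, and the query has to succeed before results can be fetched.

QUERY_ID=$(aws athena start-query-execution \
  --query-string "SELECT count(*) FROM cdc_events" \
  --query-execution-context Database=datalake \
  --result-configuration OutputLocation=s3://${S3_BUCKET_NAME}/athena-results/ \
  --query QueryExecutionId --output text)
# wait until the query execution succeeds, then fetch the results
aws athena get-query-results --query-execution-id ${QUERY_ID}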
Clean up
The resources created for this post can be deleted using the AWS CLI and eksctl as shown below.
## delete virtual cluster
export JOB_RUN_ID=$(aws emr-containers list-job-runs --virtual-cluster-id ${VIRTUAL_CLUSTER_ID} --query "jobRuns[?name=='cdc-events'].id" --output text)
aws emr-containers cancel-job-run --id ${JOB_RUN_ID} \
  --virtual-cluster-id ${VIRTUAL_CLUSTER_ID}
aws emr-containers delete-virtual-cluster --id ${VIRTUAL_CLUSTER_ID}
## delete s3
aws s3 rm s3://${S3_BUCKET_NAME} --recursive
aws s3 rb s3://${S3_BUCKET_NAME} --force
## delete log group
aws logs delete-log-group --log-group-name ${LOG_GROUP_NAME}
## delete glue table/database
aws glue delete-table --database-name datalake --name cdc_events
aws glue delete-database --name datalake
## delete iam role/policy
aws iam delete-role-policy --role-name ${EMR_ROLE_NAME} --policy-name ${EMR_ROLE_NAME}-policy
aws iam delete-role --role-name ${EMR_ROLE_NAME}
## delete eks cluster
eksctl delete cluster --name ${CLUSTER_NAME}
Summary
In this post, we discussed how to run Spark jobs on EKS. First we created an EKS cluster and a node group using eksctl, and then we set up EMR on EKS. A simple PySpark job illustrated the basics of EMR on EKS, and a more realistic example of running the Hudi DeltaStreamer utility was demonstrated, where the driver and executors were assigned to different node groups.