Amazon EMR on EKS is a deployment option for Amazon EMR that automates the provisioning and management of open-source big data frameworks on EKS. While eksctl is popular for working with Amazon EKS clusters, it has limitations when it comes to building infrastructure that integrates multiple AWS services, and it is not straightforward to update EKS cluster resources incrementally with it. Terraform, on the other hand, can be an effective tool for managing infrastructure that includes not only EKS and EMR virtual clusters but also other AWS resources. Moreover, Terraform has a wide range of modules, and building and managing infrastructure with them can be even simpler than with the CLI tool. In this post, we discuss how to provision and manage Spark jobs on EMR on EKS with Terraform. Amazon EKS Blueprints for Terraform will be used to provision the EKS cluster, the EMR virtual cluster and related resources. Spark job autoscaling will be managed by Karpenter, and two Spark jobs, with and without Dynamic Resource Allocation (DRA), will be compared.

[Update 2023-12-15] Amazon EKS Blueprints for Terraform has been upgraded to version 5, while this post is based on version 4.7.0. Some links no longer exist on the new GitHub page.

Infrastructure

When a user submits a Spark job, multiple Pods (controller, driver and executors) are deployed to the EKS cluster that is registered with EMR. In general, Karpenter provides just-in-time capacity for unschedulable Pods by creating (and later terminating) additional nodes. We can configure the pod templates of a Spark job so that all of its Pods are managed by Karpenter, which allows the job to run only on transient nodes. Karpenter simplifies autoscaling by provisioning just-in-time capacity, and it also reduces scheduling latency. The source can be found in the post's GitHub repository.

VPC

Both private and public subnets are created in three availability zones using the AWS VPC module. The first two subnet tags relate to the subnet requirements and considerations of Amazon EKS. The last private subnet tag (karpenter.sh/discovery) is added so that Karpenter can discover the relevant subnets when provisioning a node for Spark jobs.

# infra/main.tf
module "vpc" {
  source  = "terraform-aws-modules/vpc/aws"
  version = "~> 3.14"

  name = "${local.name}-vpc"
  cidr = local.vpc.cidr

  azs             = local.vpc.azs
  public_subnets  = [for k, v in local.vpc.azs : cidrsubnet(local.vpc.cidr, 3, k)]
  private_subnets = [for k, v in local.vpc.azs : cidrsubnet(local.vpc.cidr, 3, k + 3)]

  enable_nat_gateway   = true
  single_nat_gateway   = true
  enable_dns_hostnames = true
  create_igw           = true

  public_subnet_tags = {
    "kubernetes.io/cluster/${local.name}" = "shared"
    "kubernetes.io/role/elb"              = 1
  }

  private_subnet_tags = {
    "kubernetes.io/cluster/${local.name}" = "shared"
    "kubernetes.io/role/internal-elb"     = 1
    "karpenter.sh/discovery"              = local.name
  }

  tags = local.tags
}
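
The module configuration references several local values (local.name, local.vpc and local.tags, plus local.eks used later) that are defined elsewhere in the repository. A minimal sketch of what they might look like is shown below; the concrete name, CIDR range, availability zone count and tags are illustrative assumptions rather than the repository's exact values.

# infra/main.tf - hypothetical locals consumed by the modules in this post (sketch)
data "aws_availability_zones" "available" {}

locals {
  name = "emr-eks-example" # assumed cluster/prefix name

  vpc = {
    cidr = "10.0.0.0/16" # assumed VPC CIDR
    azs  = slice(data.aws_availability_zones.available.names, 0, 3)
  }

  eks = {
    cluster_version = "1.22" # assumed EKS version
  }

  tags = {
    Project = local.name
  }
}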

EKS Cluster

Amazon EKS Blueprints for Terraform extends the AWS EKS module, and it simplifies creating EKS clusters and Kubernetes add-ons. When it comes to EMR on EKS, it deploys the necessary resources to run EMR Spark jobs. Specifically, it automates steps 4 to 7 of the setup documentation, and it is possible to configure multiple teams (namespaces) as well. In the module configuration, only one managed node group (managed-ondemand) is created, and it'll be used to deploy all the critical add-ons. Note that Spark jobs will run on transient nodes, which are managed by Karpenter, so we don't need to create node groups for them.

# infra/main.tf
module "eks_blueprints" {
  source = "github.com/aws-ia/terraform-aws-eks-blueprints?ref=v4.7.0"

  cluster_name    = local.name
  cluster_version = local.eks.cluster_version

  # EKS network config
  vpc_id             = module.vpc.vpc_id
  private_subnet_ids = module.vpc.private_subnets

  cluster_endpoint_private_access = true
  cluster_endpoint_public_access  = true

  node_security_group_additional_rules = {
    ingress_self_all = {
      description = "Node to node all ports/protocols, recommended and required for Add-ons"
      protocol    = "-1"
      from_port   = 0
      to_port     = 0
      type        = "ingress"
      self        = true
    }
    egress_all = {
      description      = "Node all egress, recommended outbound traffic for Node groups"
      protocol         = "-1"
      from_port        = 0
      to_port          = 0
      type             = "egress"
      cidr_blocks      = ["0.0.0.0/0"]
      ipv6_cidr_blocks = ["::/0"]
    }
    ingress_cluster_to_node_all_traffic = {
      description                   = "Cluster API to Nodegroup all traffic, can be restricted further eg, spark-operator 8080..."
      protocol                      = "-1"
      from_port                     = 0
      to_port                       = 0
      type                          = "ingress"
      source_cluster_security_group = true
    }
  }

  # EKS managed node groups
  managed_node_groups = {
    ondemand = {
      node_group_name        = "managed-ondemand"
      instance_types         = ["m5.xlarge"]
      subnet_ids             = module.vpc.private_subnets
      max_size               = 5
      min_size               = 1
      desired_size           = 1
      create_launch_template = true
      launch_template_os     = "amazonlinux2eks"
      update_config = [{
        max_unavailable_percentage = 30
      }]
    }
  }

  # EMR on EKS
  enable_emr_on_eks = true
  emr_on_eks_teams = {
    analytics = {
      namespace               = "analytics"
      job_execution_role      = "analytics-job-execution-role"
      additional_iam_policies = [aws_iam_policy.emr_on_eks.arn]
    }
  }

  tags = local.tags
}
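
The analytics team above attaches an additional IAM policy, aws_iam_policy.emr_on_eks, to the job execution role. Its exact contents are specific to the repository, but it would typically grant access to the S3 bucket that keeps the job scripts and to CloudWatch Logs, roughly as in the sketch below (the aws_s3_bucket.default_bucket resource name and the permissions are assumptions, not the repository's exact policy).

# infra/main.tf - an illustrative job execution policy (sketch, assumed permissions)
resource "aws_iam_policy" "emr_on_eks" {
  name = "${local.name}-emr-job-execution"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "DefaultBucketAccess"
        Effect = "Allow"
        Action = ["s3:GetObject", "s3:PutObject", "s3:ListBucket"]
        Resource = [
          aws_s3_bucket.default_bucket.arn, # assumed bucket resource for scripts and logs
          "${aws_s3_bucket.default_bucket.arn}/*"
        ]
      },
      {
        Sid    = "CloudWatchLogsAccess"
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
          "logs:DescribeLogGroups",
          "logs:DescribeLogStreams"
        ]
        Resource = ["arn:aws:logs:*:*:*"]
      }
    ]
  })
}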

EMR Virtual Cluster

Terraform has the EMR virtual cluster resource, and the EKS cluster can be registered with the associated namespace (analytics). This completes the last step of the setup documentation.

# infra/main.tf
resource "aws_emrcontainers_virtual_cluster" "analytics" {
  name = "${module.eks_blueprints.eks_cluster_id}-analytics"

  container_provider {
    id   = module.eks_blueprints.eks_cluster_id
    type = "EKS"

    info {
      eks_info {
        namespace = "analytics"
      }
    }
  }
}
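
The virtual cluster ID, the job execution role ARN and the default bucket name are exposed as Terraform outputs, which the job submission commands later in the post read back via terraform output. A sketch of the corresponding output blocks is shown below; the exact expressions (in particular the emr_on_eks_role_arn pass-through and the assumed aws_s3_bucket.default_bucket resource) may differ from the repository.

# infra/outputs.tf - outputs consumed by upload.sh and the job submission commands (sketch)
output "emrcontainers_virtual_cluster_id" {
  description = "ID of the EMR virtual cluster"
  value       = aws_emrcontainers_virtual_cluster.analytics.id
}

output "emr_on_eks_role_arn" {
  description = "Job execution role ARN(s) created for the EMR on EKS teams"
  value       = module.eks_blueprints.emr_on_eks_role_arn
}

output "default_bucket_name" {
  description = "Name of the S3 bucket that keeps the scripts and pod templates"
  value       = aws_s3_bucket.default_bucket.id # assumed bucket resource
}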

Kubernetes Add-ons

The Blueprints include the kubernetes-addons module, which simplifies deployment of Amazon EKS add-ons as well as Kubernetes add-ons. For scaling Spark jobs on transient nodes, the Karpenter and AWS Node Termination Handler add-ons are the main ones used.

# infra/main.tf
module "eks_blueprints_kubernetes_addons" {
  source = "github.com/aws-ia/terraform-aws-eks-blueprints//modules/kubernetes-addons?ref=v4.7.0"

  eks_cluster_id       = module.eks_blueprints.eks_cluster_id
  eks_cluster_endpoint = module.eks_blueprints.eks_cluster_endpoint
  eks_oidc_provider    = module.eks_blueprints.oidc_provider
  eks_cluster_version  = module.eks_blueprints.eks_cluster_version

  # EKS add-ons
  enable_amazon_eks_vpc_cni    = true
  enable_amazon_eks_coredns    = true
  enable_amazon_eks_kube_proxy = true

  # K8s add-ons
  enable_coredns_autoscaler           = true
  enable_metrics_server               = true
  enable_cluster_autoscaler           = true
  enable_karpenter                    = true
  enable_aws_node_termination_handler = true

  tags = local.tags
}

Karpenter

According to the AWS News Blog,

Karpenter is an open-source, flexible, high-performance Kubernetes cluster autoscaler built with AWS. It helps improve your application availability and cluster efficiency by rapidly launching right-sized compute resources in response to changing application load. Karpenter also provides just-in-time compute resources to meet your application’s needs and will soon automatically optimize a cluster’s compute resource footprint to reduce costs and improve performance.

Simply put, Karpenter adds nodes to handle unschedulable pods, schedules pods on those nodes, and removes the nodes when they are no longer needed. To configure Karpenter, we need to create provisioners that define how Karpenter manages unschedulable pods and expired nodes. For Spark jobs, we can deploy separate provisioners for the driver and executor programs.

Spark Driver Provisioner

The labels contain arbitrary key-value pairs. As shown later, we can add them to the nodeSelector field of the Spark pod templates so that Karpenter provisions a node (if one does not already exist) as defined by this Provisioner object. The requirements define which nodes to provision; here three well-known labels are specified - availability zone, instance family and capacity type. The provider section is specific to cloud providers and, for AWS, we need to indicate an InstanceProfile, LaunchTemplate, SubnetSelector or SecurityGroupSelector. Here we'll use a launch template that keeps the instance profile and security group IDs. SubnetSelector is added separately as it is not covered by the launch template. Recall that we added a tag to the private subnets ("karpenter.sh/discovery" = local.name), and we can use it here so that Karpenter discovers the relevant subnets when provisioning a node.

# infra/provisioners/spark-driver.yaml
apiVersion: karpenter.sh/v1alpha5
kind: Provisioner
metadata:
  name: spark-driver
spec:
  labels:
    type: karpenter
    provisioner: spark-driver
  ttlSecondsAfterEmpty: 30
  requirements:
    - key: "topology.kubernetes.io/zone"
      operator: In
      values: [${az}]
    - key: karpenter.k8s.aws/instance-family
      operator: In
      values: [m4, m5]
    - key: "karpenter.sh/capacity-type"
      operator: In
      values: ["on-demand"]
  limits:
    resources:
      cpu: "1000"
      memory: 1000Gi
  provider:
    launchTemplate: "karpenter-${cluster_name}"
    subnetSelector:
      karpenter.sh/discovery: ${cluster_name}

Spark Executor Provisioner

The executor provisioner configuration is similar, except that it allows more instance family values and the capacity type is changed to spot.

# infra/provisioners/spark-executor.yaml
apiVersion: karpenter.sh/v1alpha5
kind: Provisioner
metadata:
  name: spark-executor
spec:
  labels:
    type: karpenter
    provisioner: spark-executor
  ttlSecondsAfterEmpty: 30
  requirements:
    - key: "topology.kubernetes.io/zone"
      operator: In
      values: [${az}]
    - key: karpenter.k8s.aws/instance-family
      operator: In
      values: [m4, m5, r4, r5]
    - key: "karpenter.sh/capacity-type"
      operator: In
      values: ["spot"]
  limits:
    resources:
      cpu: "1000"
      memory: 1000Gi
  provider:
    launchTemplate: "karpenter-${cluster_name}"
    subnetSelector:
      karpenter.sh/discovery: ${cluster_name}

Terraform Resources

As mentioned earlier, a launch template is created for the provisioners, and it includes the instance profile, security group ID and additional configuration. The provisioner resources are created from the YAML manifests. Note that we only select a single availability zone in order to save cost and improve the performance of Spark jobs.

# infra/main.tf
module "karpenter_launch_templates" {
  source = "github.com/aws-ia/terraform-aws-eks-blueprints//modules/launch-templates?ref=v4.7.0"

  eks_cluster_id = module.eks_blueprints.eks_cluster_id

  launch_template_config = {
    linux = {
      ami                    = data.aws_ami.eks.id
      launch_template_prefix = "karpenter"
      iam_instance_profile   = module.eks_blueprints.managed_node_group_iam_instance_profile_id[0]
      vpc_security_group_ids = [module.eks_blueprints.worker_node_security_group_id]
      block_device_mappings = [
        {
          device_name = "/dev/xvda"
          volume_type = "gp3"
          volume_size = 100
        }
      ]
    }
  }

  tags = merge(local.tags, { Name = "karpenter" })
}

# deploy spark provisioners for Karpenter autoscaler
data "kubectl_path_documents" "karpenter_provisioners" {
  pattern = "${path.module}/provisioners/spark*.yaml"
  vars = {
    az           = join(",", slice(local.vpc.azs, 0, 1))
    cluster_name = local.name
  }
}

resource "kubectl_manifest" "karpenter_provisioner" {
  for_each  = toset(data.kubectl_path_documents.karpenter_provisioners.documents)
  yaml_body = each.value

  depends_on = [module.eks_blueprints_kubernetes_addons]
}

Now we can deploy the infrastructure. Be patient, as it takes a while to complete.
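
The usual Terraform workflow applies; for example, from the project root (using the same -chdir pattern as the commands later in this post):

# download providers/modules, review the plan and deploy
terraform -chdir=./infra init
terraform -chdir=./infra plan
terraform -chdir=./infra apply --auto-approve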

Spark Job

A test Spark app and pod templates are uploaded to an S3 bucket. The Spark app is for testing autoscaling; it creates multiple parallel tasks, each of which sleeps for a period - it is obtained from the EKS Workshop. The pod templates basically select the relevant provisioners for the driver and executor programs. Two Spark jobs will run, one with and one without Dynamic Resource Allocation (DRA). DRA is a Spark feature where an initial number of executors is spawned and then increased up to the configured maximum while there are pending tasks, and idle executors are terminated when there are no pending tasks. This feature is particularly useful if we are not sure how many executors are necessary.

## upload.sh
#!/usr/bin/env bash

# write test script
mkdir -p scripts/src
cat << EOF > scripts/src/threadsleep.py
import sys
from time import sleep
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("threadsleep").getOrCreate()
def sleep_for_x_seconds(x):sleep(x*20)
sc=spark.sparkContext
sc.parallelize(range(1,6), 5).foreach(sleep_for_x_seconds)
spark.stop()
EOF

# write pod templates
mkdir -p scripts/config
cat << EOF > scripts/config/driver-template.yaml
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    type: 'karpenter'
    provisioner: 'spark-driver'
  tolerations:
    - key: 'spark-driver'
      operator: 'Exists'
      effect: 'NoSchedule'
  containers:
  - name: spark-kubernetes-driver
EOF

cat << EOF > scripts/config/executor-template.yaml
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    type: 'karpenter'
    provisioner: 'spark-executor'
  tolerations:
    - key: 'spark-executor'
      operator: 'Exists'
      effect: 'NoSchedule'
  containers:
  - name: spark-kubernetes-executor
EOF

# sync to S3
DEFAULT_BUCKET_NAME=$(terraform -chdir=./infra output --raw default_bucket_name)
aws s3 sync . s3://$DEFAULT_BUCKET_NAME --exclude "*" --include "scripts/*"
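
The script can then be run from the project root, and the uploaded objects checked afterwards; a usage sketch:

chmod +x ./upload.sh
./upload.sh

# confirm the app script and pod templates are in place
aws s3 ls s3://$(terraform -chdir=./infra output --raw default_bucket_name)/scripts/ --recursive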

Without Dynamic Resource Allocation (DRA)

Fifteen executors are configured for the Spark job without DRA. The application configuration is overridden to disable DRA and to map the pod templates for the driver and executor programs.

export VIRTUAL_CLUSTER_ID=$(terraform -chdir=./infra output --raw emrcontainers_virtual_cluster_id)
export EMR_ROLE_ARN=$(terraform -chdir=./infra output --json emr_on_eks_role_arn | jq '.[0]' -r)
export DEFAULT_BUCKET_NAME=$(terraform -chdir=./infra output --raw default_bucket_name)
export AWS_REGION=$(aws ec2 describe-availability-zones --query 'AvailabilityZones[0].[RegionName]' --output text)

## without DRA
aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name threadsleep-karpenter-wo-dra \
--execution-role-arn $EMR_ROLE_ARN \
--release-label emr-6.7.0-latest \
--region $AWS_REGION \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://'${DEFAULT_BUCKET_NAME}'/scripts/src/threadsleep.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=15 --conf spark.executor.memory=1G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
        }
    }' \
--configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.dynamicAllocation.enabled":"false",
          "spark.kubernetes.executor.deleteOnTermination": "true",
          "spark.kubernetes.driver.podTemplateFile":"s3://'${DEFAULT_BUCKET_NAME}'/scripts/config/driver-template.yaml",
          "spark.kubernetes.executor.podTemplateFile":"s3://'${DEFAULT_BUCKET_NAME}'/scripts/config/executor-template.yaml"
         }
      }
    ]
}'

As indicated earlier, Karpenter can provide just-in-time compute resources to meet the Spark job's requirements, and we see that three new nodes are added accordingly. Note that, unlike the cluster autoscaler, Karpenter provisions nodes without creating a node group.

Once the job completes, the new nodes are terminated as expected.
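
Both the scale-out and the scale-in can be watched from the command line while the job runs; the labels set in the Karpenter provisioners make the transient nodes easy to identify. A quick sketch, assuming kubectl is already configured against the cluster:

# transient nodes carry the labels defined in the spark-driver/spark-executor provisioners
kubectl get nodes -l type=karpenter -L provisioner,karpenter.sh/capacity-type --watch

# controller, driver and executor pods run in the analytics namespace
kubectl get pods -n analytics --watch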

The event timeline of the Spark job is shown below. All 15 executors are added upfront, regardless of whether there are pending tasks. The DRA feature of Spark can be beneficial in this situation, and it'll be discussed in the next section.

With Dynamic Resource Allocation (DRA)

Here the initial number of executors is set to 1. With DRA enabled, the driver is expected to scale the executors up to the configured maximum while there are pending tasks.

export VIRTUAL_CLUSTER_ID=$(terraform -chdir=./infra output --raw emrcontainers_virtual_cluster_id)
export EMR_ROLE_ARN=$(terraform -chdir=./infra output --json emr_on_eks_role_arn | jq '.[0]' -r)
export DEFAULT_BUCKET_NAME=$(terraform -chdir=./infra output --raw default_bucket_name)
export AWS_REGION=$(aws ec2 describe-availability-zones --query 'AvailabilityZones[0].[RegionName]' --output text)

## with DRA
aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name threadsleep-karpenter-w-dra \
--execution-role-arn $EMR_ROLE_ARN \
--release-label emr-6.7.0-latest \
--region $AWS_REGION \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://'${DEFAULT_BUCKET_NAME}'/scripts/src/threadsleep.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=1 --conf spark.executor.memory=1G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
        }
    }' \
--configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.dynamicAllocation.enabled":"true",
          "spark.dynamicAllocation.shuffleTracking.enabled":"true",
          "spark.dynamicAllocation.minExecutors":"1",
          "spark.dynamicAllocation.maxExecutors":"10",
          "spark.dynamicAllocation.initialExecutors":"1",
          "spark.dynamicAllocation.schedulerBacklogTimeout": "1s",
          "spark.dynamicAllocation.executorIdleTimeout": "5s",
          "spark.kubernetes.driver.podTemplateFile":"s3://'${DEFAULT_BUCKET_NAME}'/scripts/config/driver-template.yaml",
          "spark.kubernetes.executor.podTemplateFile":"s3://'${DEFAULT_BUCKET_NAME}'/scripts/config/executor-template.yaml"
         }
      }
    ]
}'

As expected, the executors are added dynamically and subsequently removed when they are no longer needed.
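
The state of the job runs themselves can also be checked with the emr-containers CLI, for example:

# list recent job runs and their states for the virtual cluster
aws emr-containers list-job-runs \
  --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
  --query 'jobRuns[].{id:id,name:name,state:state}' \
  --output table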

Summary

In this post, we discussed how to provision and manage Spark jobs on EMR on EKS with Terraform. Amazon EKS Blueprints for Terraform was used to provision the EKS cluster, the EMR virtual cluster and related resources. Karpenter was used to manage Spark job autoscaling, and two Spark jobs, with and without Dynamic Resource Allocation (DRA), were compared. Karpenter managed the transient nodes for the Spark jobs and met their scaling requirements effectively.