When we develop a Spark application on EMR, we can use Docker for local development or notebooks via EMR Studio (or EMR Notebooks). However, local development is not viable when the data is large. Also, I am not a fan of notebooks because they don’t let me use the features my editor supports, such as syntax highlighting, autocomplete and code formatting. Moreover, it is hard to organise code into modules and to perform unit testing properly in a notebook. In this post, we will discuss how to set up a remote development environment on an EMR cluster deployed in a private subnet, using a VPN and the VS Code Remote - SSH extension. Typical Spark development examples will be illustrated while sharing the cluster among multiple users. Overall, it offers another effective way of developing Spark apps on EMR, and it improves the developer experience significantly.

Architecture

An EMR cluster is deployed in a private subnet and, by default, it is not possible to access it from the developer machine. We can construct a PC-to-PC VPN with SoftEther VPN to establish a connection to the master node of the cluster. The VPN server runs in a public subnet, and it is managed by an autoscaling group where only a single instance is maintained. An elastic IP address is associated with the instance so that its public IP doesn’t change even if the EC2 instance is recreated. Access from the VPN server to the master node is allowed by an additional security group in which the VPN’s security group is granted access to the master node. The infrastructure is built using Terraform and the source can be found in the post’s GitHub repository.

SoftEther VPN provides server and client manager programs, which can be downloaded from the download centre page. We can create a VPN user using the server manager, and the user can establish a connection using the client manager. In this way, a developer can access an EMR cluster deployed in a private subnet from the developer machine. Check one of my earlier posts titled Simplify Your Development on AWS with Terraform for a step-by-step illustration of creating a user and making a connection. The VS Code Remote - SSH extension is used to open a folder on the master node of an EMR cluster. In this way, the developer experience can be improved significantly while making use of the full feature set of VS Code. The architecture of the remote development environment is shown below.

Infrastructure

The infrastructure of this post extends the one I illustrated in the previous post. The resources covered there (VPC, subnets, auto scaling group for VPN etc.) won’t be repeated. The main resource in this post is an EMR cluster, and the EMR 6.7.0 release is deployed with a single master instance and a single core instance. It is set up to use the AWS Glue Data Catalog as the metastore for Hive and Spark SQL by updating the corresponding configuration classifications. Additionally, a managed scaling policy is created so that the cluster can scale out to a maximum of 5 instances. Note the additional security groups of the master and slave instances, by which the VPN server is granted access to the master and core node instances. The details of that security group are shown below.

# infra/emr.tf
resource "aws_emr_cluster" "emr_cluster" {
  name                              = "${local.name}-emr-cluster"
  release_label                     = local.emr.release_label # emr-6.7.0
  service_role                      = aws_iam_role.emr_service_role.arn
  autoscaling_role                  = aws_iam_role.emr_autoscaling_role.arn
  applications                      = local.emr.applications # ["Spark", "Livy", "JupyterEnterpriseGateway", "Hive"]
  ebs_root_volume_size              = local.emr.ebs_root_volume_size
  log_uri                           = "s3n://${aws_s3_bucket.default_bucket[0].id}/elasticmapreduce/"
  step_concurrency_level            = 256
  keep_job_flow_alive_when_no_steps = true
  termination_protection            = false

  ec2_attributes {
    key_name                          = aws_key_pair.emr_key_pair.key_name
    instance_profile                  = aws_iam_instance_profile.emr_ec2_instance_profile.arn
    subnet_id                         = element(tolist(module.vpc.private_subnets), 0)
    emr_managed_master_security_group = aws_security_group.emr_master.id
    emr_managed_slave_security_group  = aws_security_group.emr_slave.id
    service_access_security_group     = aws_security_group.emr_service_access.id
    additional_master_security_groups = aws_security_group.emr_vpn_access.id # grant access to VPN server
    additional_slave_security_groups  = aws_security_group.emr_vpn_access.id # grant access to VPN server
  }

  master_instance_group {
    instance_type  = local.emr.instance_type # m5.xlarge
    instance_count = local.emr.instance_count # 1
  }
  core_instance_group {
    instance_type  = local.emr.instance_type # m5.xlarge
    instance_count = local.emr.instance_count # 1
  }

  configurations_json = <<EOF
    [
      {
          "Classification": "hive-site",
          "Properties": {
              "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
          }
      },
      {
          "Classification": "spark-hive-site",
          "Properties": {
              "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
          }
      }
    ]
  EOF

  tags = local.tags

  depends_on = [module.vpc]
}

resource "aws_emr_managed_scaling_policy" "emr_scaling_policy" {
  cluster_id = aws_emr_cluster.emr_cluster.id

  compute_limits {
    unit_type              = "Instances"
    minimum_capacity_units = 1
    maximum_capacity_units = 5
  }
}

The following security group is created to enable access from the VPN server to the EMR instances. Note that the security group is always created, whereas the inbound rule is created only when the value of local.vpn.to_create is true. If the value is false, the security group has no inbound rule.

# infra/emr.tf
resource "aws_security_group" "emr_vpn_access" {
  name   = "${local.name}-emr-vpn-access"
  vpc_id = module.vpc.vpc_id

  lifecycle {
    create_before_destroy = true
  }

  tags = local.tags
}

resource "aws_security_group_rule" "emr_vpn_inbound" {
  count                    = local.vpn.to_create ? 1 : 0
  type                     = "ingress"
  description              = "VPN access"
  security_group_id        = aws_security_group.emr_vpn_access.id
  protocol                 = "tcp"
  from_port                = 0
  to_port                  = 65535
  source_security_group_id = aws_security_group.vpn[0].id
}

Change to Secret Generation

For configuring the VPN server, we need an IPsec pre-shared key and an admin password. While those were specified as variables in the earlier post, they are generated internally in this post for simplicity. The Terraform shell resource module generates them and concatenates them with double dashes (--). The corresponding values are parsed into the user data of the VPN instance, and the string is saved into a file to be used for configuring the VPN server manager.

## create VPN secrets - IPsec Pre-Shared Key and admin password for VPN
##  see https://cloud.google.com/network-connectivity/docs/vpn/how-to/generating-pre-shared-key
module "vpn_secrets" {
  source = "Invicton-Labs/shell-resource/external"

  # generate <IPsec Pre-Shared Key>--<admin password> and parsed in vpn module
  command_unix = "echo $(openssl rand -base64 24)--$(openssl rand -base64 24)"
}

resource "local_file" "vpn_secrets" {
  content  = module.vpn_secrets.stdout
  filename = "${path.module}/secrets/vpn_secrets"
}

module "vpn" {
  source  = "terraform-aws-modules/autoscaling/aws"
  version = "~> 6.5"
  count   = local.vpn.to_create ? 1 : 0

  name = "${local.name}-vpn-asg"

  key_name            = local.vpn.to_create ? aws_key_pair.key_pair[0].key_name : null
  vpc_zone_identifier = module.vpc.public_subnets
  min_size            = 1
  max_size            = 1
  desired_capacity    = 1

  image_id                 = data.aws_ami.amazon_linux_2.id
  instance_type            = element([for s in local.vpn.spot_override : s.instance_type], 0)
  security_groups          = [aws_security_group.vpn[0].id]
  iam_instance_profile_arn = aws_iam_instance_profile.vpn[0].arn

  # Launch template
  create_launch_template = true
  update_default_version = true

  user_data = base64encode(join("\n", [
    "#cloud-config",
    yamlencode({
      # https://cloudinit.readthedocs.io/en/latest/topics/modules.html
      write_files : [
        {
          path : "/opt/vpn/bootstrap.sh",
          content : templatefile("${path.module}/scripts/bootstrap.sh", {
            aws_region     = local.region,
            allocation_id  = aws_eip.vpn[0].allocation_id,
            vpn_psk        = split("--", replace(module.vpn_secrets.stdout, "\n", ""))[0], # specify IPsec pre-shared key
            admin_password = split("--", replace(module.vpn_secrets.stdout, "\n", ""))[1]  # specify admin password
          }),
          permissions : "0755",
        }
      ],
      runcmd : [
        ["/opt/vpn/bootstrap.sh"],
      ],
    })
  ]))

  ...

  tags = local.tags
}
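To make the generate-then-split step easier to follow, here is a minimal Python sketch of the same idea. It only mirrors the shell command and the split()/replace() calls in the Terraform code above and is not part of the repository; the function names generate_vpn_secrets and parse_vpn_secrets are hypothetical.

```python
import base64
import os


def generate_vpn_secrets(num_bytes: int = 24) -> str:
    # Rough equivalent of:
    #   echo $(openssl rand -base64 24)--$(openssl rand -base64 24)
    psk = base64.b64encode(os.urandom(num_bytes)).decode("ascii")
    admin_password = base64.b64encode(os.urandom(num_bytes)).decode("ascii")
    # echo appends a trailing newline, which the Terraform code strips again.
    return f"{psk}--{admin_password}\n"


def parse_vpn_secrets(stdout: str) -> tuple:
    # Rough equivalent of: split("--", replace(stdout, "\n", ""))
    psk, admin_password = stdout.replace("\n", "").split("--")
    return psk, admin_password


secrets = generate_vpn_secrets()
vpn_psk, admin_password = parse_vpn_secrets(secrets)
```

The double dash works as a separator because the standard base64 alphabet contains no hyphen, so it cannot appear inside either secret.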

After deploying all the resources, we are ready to move on to the next section once we’re able to connect to the VPN server as shown below.

Preparation

User Creation

While we are able to use the default hadoop user for development, we can also add users to share the cluster. First, let’s access the master node via SSH as shown below. Note that the access key is stored in the infra/key-pair folder and the master private DNS name can be obtained from the emr_cluster_master_dns output value.

# access to the master node via ssh
jaehyeon@cevo$ export EMR_MASTER_DNS=$(terraform -chdir=./infra output --raw emr_cluster_master_dns)
jaehyeon@cevo$ ssh -i infra/key-pair/emr-remote-dev-emr-key.pem hadoop@$EMR_MASTER_DNS
The authenticity of host 'ip-10-0-113-113.ap-southeast-2.compute.internal (10.0.113.113)' can't be established.
ECDSA key fingerprint is SHA256:mNdgPWnDkCG/6IsUDdHAETe/InciOatb8jwELnwfWR4.
Are you sure you want to continue connecting (yes/no/[fingerprint])? yes
Warning: Permanently added 'ip-10-0-113-113.ap-southeast-2.compute.internal,10.0.113.113' (ECDSA) to the list of known hosts.

       __|  __|_  )
       _|  (     /   Amazon Linux 2 AMI
      ___|\___|___|

https://aws.amazon.com/amazon-linux-2/
17 package(s) needed for security, out of 38 available
Run "sudo yum update" to apply all updates.

EEEEEEEEEEEEEEEEEEEE MMMMMMMM           MMMMMMMM RRRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M         M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M       M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M     M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M   M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::M:::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    M:::M    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M     MMM     M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M             M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M             M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM             MMMMMMM RRRRRR      RRRRRR

[hadoop@ip-10-0-113-113 ~]$

A user can be created as shown below. Optionally the user is added to the sudoers file so that the user is allowed to run a command as the root user without specifying the password. Note this is a shortcut only, and please check this page for proper usage of editing the sudoers file.

# create a user and add to sudoers
[hadoop@ip-10-0-113-113 ~]$ sudo adduser jaehyeon
[hadoop@ip-10-0-113-113 ~]$ ls /home/
ec2-user  emr-notebook  hadoop  jaehyeon
[hadoop@ip-10-0-113-113 ~]$ sudo su
[root@ip-10-0-113-113 hadoop]# chmod +w /etc/sudoers
[root@ip-10-0-113-113 hadoop]# echo "jaehyeon  ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers

Also, as described in the EMR documentation, we must add the HDFS user directory for the user account and grant ownership of the directory so that the user is allowed to log in to the cluster to run Hadoop jobs.

[root@ip-10-0-113-113 hadoop]# sudo su - hdfs
# create user directory
-bash-4.2$ hdfs dfs -mkdir /user/jaehyeon
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/tez/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
# update directory ownership
-bash-4.2$ hdfs dfs -chown jaehyeon:jaehyeon /user/jaehyeon
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/tez/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
# check directories in /user
-bash-4.2$ hdfs dfs -ls /user
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/tez/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Found 7 items
drwxrwxrwx   - hadoop   hdfsadmingroup          0 2022-09-03 10:19 /user/hadoop
drwxr-xr-x   - mapred   mapred                  0 2022-09-03 10:19 /user/history
drwxrwxrwx   - hdfs     hdfsadmingroup          0 2022-09-03 10:19 /user/hive
drwxr-xr-x   - jaehyeon jaehyeon                0 2022-09-03 10:39 /user/jaehyeon
drwxrwxrwx   - livy     livy                    0 2022-09-03 10:19 /user/livy
drwxrwxrwx   - root     hdfsadmingroup          0 2022-09-03 10:19 /user/root
drwxrwxrwx   - spark    spark                   0 2022-09-03 10:19 /user/spark

Finally, we need to add the public key to the .ssh/authorized_keys file in order to set up public key authentication for SSH access.

# add public key
[hadoop@ip-10-0-113-113 ~]$ sudo su - jaehyeon
[jaehyeon@ip-10-0-113-113 ~]$ mkdir .ssh
[jaehyeon@ip-10-0-113-113 ~]$ chmod 700 .ssh
[jaehyeon@ip-10-0-113-113 ~]$ touch .ssh/authorized_keys
[jaehyeon@ip-10-0-113-113 ~]$ chmod 600 .ssh/authorized_keys
[jaehyeon@ip-10-0-113-113 ~]$ PUBLIC_KEY="<SSH-PUBLIC-KEY>"
# quote the variable and append so that existing keys are kept
[jaehyeon@ip-10-0-113-113 ~]$ echo "$PUBLIC_KEY" >> .ssh/authorized_keys
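When several users share the cluster, the account and HDFS steps above are repeated for each of them. As a rough sketch only, the commands could be generated per user with a small Python helper. The function name user_setup_commands and the username check are hypothetical, and `sudo -u hdfs` is used here as a one-line substitute for switching to the hdfs user with `sudo su - hdfs`.

```python
import re


def user_setup_commands(username: str) -> list:
    # Hypothetical guard: accept simple Linux-style usernames only,
    # so the value can be placed in a shell command safely.
    if not re.fullmatch(r"[a-z_][a-z0-9_-]*", username):
        raise ValueError(f"unexpected username: {username!r}")
    return [
        # create the Linux account on the master node
        f"sudo adduser {username}",
        # create the HDFS user directory as the hdfs user
        f"sudo -u hdfs hdfs dfs -mkdir /user/{username}",
        # grant ownership of the directory to the new user
        f"sudo -u hdfs hdfs dfs -chown {username}:{username} /user/{username}",
    ]


for command in user_setup_commands("jaehyeon"):
    print(command)
```

The sudoers entry and the SSH public key are left out on purpose, as both are better handled manually for each user.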

Clone Repository

As we can open a folder in the master node, the GitHub repository is cloned to each user’s home folder to open later.

[hadoop@ip-10-0-113-113 ~]$ sudo yum install git
[hadoop@ip-10-0-113-113 ~]$ git clone https://github.com/jaehyeon-kim/emr-remote-dev.git
[hadoop@ip-10-0-113-113 ~]$ ls
emr-remote-dev
[hadoop@ip-10-0-113-113 ~]$ sudo su - jaehyeon
[jaehyeon@ip-10-0-113-113 ~]$ git clone https://github.com/jaehyeon-kim/emr-remote-dev.git
[jaehyeon@ip-10-0-113-113 ~]$ ls
emr-remote-dev

Access to EMR Cluster

Now we have two users that have access to the EMR cluster and their connection details are saved into an SSH configuration file as shown below.

Host emr-hadoop
  HostName ip-10-0-113-113.ap-southeast-2.compute.internal
  User hadoop
  ForwardAgent yes
  IdentityFile C:\Users\<username>\.ssh\emr-remote-dev-emr-key.pem
Host emr-jaehyeon
  HostName ip-10-0-113-113.ap-southeast-2.compute.internal
  User jaehyeon
  ForwardAgent yes
  IdentityFile C:\Users\<username>\.ssh\id_rsa
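As more users are added, the entries in the SSH configuration file follow the same pattern. The following Python sketch renders one entry per user under that assumption. The helper name ssh_config_entry is hypothetical, and the identity file path keeps the same placeholder as the configuration above.

```python
def ssh_config_entry(alias: str, host_name: str, user: str, identity_file: str) -> str:
    # Render a single Host block in the same layout as the entries above.
    return "\n".join(
        [
            f"Host {alias}",
            f"  HostName {host_name}",
            f"  User {user}",
            "  ForwardAgent yes",
            f"  IdentityFile {identity_file}",
        ]
    )


master_dns = "ip-10-0-113-113.ap-southeast-2.compute.internal"
entry = ssh_config_entry(
    alias="emr-jaehyeon",
    host_name=master_dns,
    user="jaehyeon",
    identity_file=r"C:\Users\<username>\.ssh\id_rsa",
)
print(entry)
```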

Then we can see the connection details in the Remote Explorer menu of VS Code. Note that the Remote - SSH extension must be installed for them to appear. By right-clicking the emr-hadoop connection, we can select the option to connect to the host in a new window.

In a new window, a menu pops up to select the platform of the remote host.

If it’s the first time connecting to the server, VS Code asks us to confirm that we trust the host and want to continue connecting. We can hit Continue.

Once we are connected, we can open a folder in the server. On selecting File > Open Folder… menu, we can see a list of folders that we can open. Let’s open the repository folder we cloned earlier.

VS Code asks whether we trust the authors of the files in this folder and we can hit Yes.

Now access to the server with the remote SSH extension is complete and we can check it by opening a terminal where it shows the typical EMR shell.

Python Configuration

At a minimum, we can install the Python extension, and VS Code indicates that the extension will be installed on the remote server (emr-hadoop).

We’ll use the PySpark and py4j packages that are included in the existing Spark distribution. This can be done simply by creating an .env file that adds the relevant paths to the PYTHONPATH variable. In the following screenshot, you can see that importing SparkSession raises no warning.
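The content of the .env file is not reproduced here, so the following Python sketch only illustrates how such a PYTHONPATH value could be assembled. It assumes that Spark is installed under /usr/lib/spark on the master node and that py4j ships as a py4j-*-src.zip archive in its python/lib folder. Both the paths and the helper names are assumptions, so check them on your own cluster.

```python
import glob
import os


def spark_python_paths(spark_home: str = "/usr/lib/spark") -> list:
    # Assumed layout: <spark_home>/python holds the pyspark package and
    # <spark_home>/python/lib holds the py4j source archive.
    python_dir = os.path.join(spark_home, "python")
    py4j_zips = sorted(glob.glob(os.path.join(python_dir, "lib", "py4j-*-src.zip")))
    return [python_dir] + py4j_zips


def render_env_file(paths: list) -> str:
    # A .env file is a list of KEY=value lines; only PYTHONPATH is needed here.
    return "PYTHONPATH=" + ":".join(paths) + "\n"


print(render_env_file(spark_python_paths()), end="")
```

The same value can be exported in a terminal when a tool such as pytest is run outside VS Code.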

Remote Development

Transform Data

The example is a simple Spark application that reads a sample NY taxi trip dataset from a public S3 bucket. Once loaded, it converts the pick-up and drop-off datetime columns from string to timestamp and writes the transformed data to a destination S3 bucket. It finishes by creating a Glue table from the transformed data.

# tripdata_write.py
from pyspark.sql import SparkSession

from utils import to_timestamp_df

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Trip Data").enableHiveSupport().getOrCreate()

    dbname = "tripdata"
    tblname = "ny_taxi"
    bucket_name = "emr-remote-dev-590312749310-ap-southeast-2"
    dest_path = f"s3://{bucket_name}/{tblname}/"
    src_path = "s3://aws-data-analytics-workshops/shared_datasets/tripdata/"
    # read csv
    ny_taxi = spark.read.option("inferSchema", "true").option("header", "true").csv(src_path)
    ny_taxi = to_timestamp_df(ny_taxi, ["lpep_pickup_datetime", "lpep_dropoff_datetime"])
    ny_taxi.printSchema()
    # write parquet
    ny_taxi.write.mode("overwrite").parquet(dest_path)
    # create glue table
    ny_taxi.createOrReplaceTempView(tblname)
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {dbname}")
    spark.sql(f"USE {dbname}")
    spark.sql(
        f"""CREATE TABLE IF NOT EXISTS {tblname}
            USING PARQUET
            LOCATION '{dest_path}'
            AS SELECT * FROM {tblname}
        """
    )

As the Spark application should run on the cluster, we need to copy it into HDFS. For simplicity, I copied the current folder into the /user/hadoop/emr-remote-dev directory.

# copy current folder into /user/hadoop/emr-remote-dev
[hadoop@ip-10-0-113-113 emr-remote-dev]$ hdfs dfs -put . /user/hadoop/emr-remote-dev
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/tez/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
# check contents in /user/hadoop/emr-remote-dev
[hadoop@ip-10-0-113-113 emr-remote-dev]$ hdfs dfs -ls /user/hadoop/emr-remote-dev
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/tez/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Found 10 items
-rw-r--r--   1 hadoop hdfsadmingroup         93 2022-09-03 11:11 /user/hadoop/emr-remote-dev/.env
drwxr-xr-x   - hadoop hdfsadmingroup          0 2022-09-03 11:11 /user/hadoop/emr-remote-dev/.git
-rw-r--r--   1 hadoop hdfsadmingroup        741 2022-09-03 11:11 /user/hadoop/emr-remote-dev/.gitignore
drwxr-xr-x   - hadoop hdfsadmingroup          0 2022-09-03 11:11 /user/hadoop/emr-remote-dev/.vscode
-rw-r--r--   1 hadoop hdfsadmingroup         25 2022-09-03 11:11 /user/hadoop/emr-remote-dev/README.md
drwxr-xr-x   - hadoop hdfsadmingroup          0 2022-09-03 11:11 /user/hadoop/emr-remote-dev/infra
-rw-r--r--   1 hadoop hdfsadmingroup        873 2022-09-03 11:11 /user/hadoop/emr-remote-dev/test_utils.py
-rw-r--r--   1 hadoop hdfsadmingroup        421 2022-09-03 11:11 /user/hadoop/emr-remote-dev/tripdata_read.sh
-rw-r--r--   1 hadoop hdfsadmingroup       1066 2022-09-03 11:11 /user/hadoop/emr-remote-dev/tripdata_write.py
-rw-r--r--   1 hadoop hdfsadmingroup        441 2022-09-03 11:11 /user/hadoop/emr-remote-dev/utils.py

The app can be submitted by specifying the HDFS locations of the app and of the module it imports (utils.py). It is deployed to the YARN cluster in client deploy mode. In this way, data processing is performed by executors on the core node, while the driver runs on the master node and we are able to check execution details in the same terminal.

[hadoop@ip-10-0-113-113 emr-remote-dev]$ spark-submit \
  --master yarn \
  --deploy-mode client \
  --py-files hdfs:/user/hadoop/emr-remote-dev/utils.py \
  hdfs:/user/hadoop/emr-remote-dev/tripdata_write.py
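If the same submission is repeated for several apps, the argument list can be assembled in Python and passed to a process runner. The sketch below only builds the list and prints the equivalent command line. The helper name spark_submit_args is hypothetical, and the options are limited to those used above.

```python
import shlex


def spark_submit_args(app: str, py_files: list, master: str = "yarn", deploy_mode: str = "client") -> list:
    args = ["spark-submit", "--master", master, "--deploy-mode", deploy_mode]
    if py_files:
        # --py-files takes a comma-separated list of files
        args += ["--py-files", ",".join(py_files)]
    args.append(app)
    return args


hdfs_dir = "hdfs:/user/hadoop/emr-remote-dev"
args = spark_submit_args(
    app=f"{hdfs_dir}/tripdata_write.py",
    py_files=[f"{hdfs_dir}/utils.py"],
)
# Print a shell-safe version of the command for inspection.
print(" ".join(shlex.quote(arg) for arg in args))
```

On the master node, the list could then be handed to subprocess.run(args, check=True) to launch the job.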

Once the app completes, we can see that a Glue database named tripdata is created, and it includes a table named ny_taxi.

Read Data

We can connect to the cluster with the other user account as well. Below is an example of a PySpark shell session that reads data from the table created earlier. It reads the Glue table, adds a trip duration column and shows summary statistics of key columns.
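A minimal sketch of such a session is given here. It is not the exact code behind the example, and the helper names and the trip_duration_min column are hypothetical. It assumes the tripdata.ny_taxi table and the two timestamp columns created by the app above, and it expects the SparkSession that the PySpark shell exposes as spark.

```python
def trip_duration_sql(dbname: str = "tripdata", tblname: str = "ny_taxi") -> str:
    # Trip duration in minutes, derived from the two timestamp columns
    # written by tripdata_write.py.
    return (
        "SELECT *, "
        "(unix_timestamp(lpep_dropoff_datetime) - unix_timestamp(lpep_pickup_datetime)) / 60 "
        "AS trip_duration_min "
        f"FROM {dbname}.{tblname}"
    )


def summarise_trip_duration(spark, dbname: str = "tripdata", tblname: str = "ny_taxi"):
    # `spark` is the SparkSession created by the PySpark shell on start-up.
    df = spark.sql(trip_duration_sql(dbname, tblname))
    # summary() returns count, mean, stddev, min, quartiles and max.
    return df.select("trip_duration_min").summary()


print(trip_duration_sql())
```

In the PySpark shell, calling summarise_trip_duration(spark).show() would print the statistics of the new column.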

Unit Test

The Spark application uses a custom function that converts the data type of one or more columns from string to timestamp - to_timestamp_df(). The source of the function and the testing script can be found below.

# utils.py
from typing import List, Union
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, to_timestamp

def to_timestamp_df(
    df: DataFrame, fields: Union[List[str], str], format: str = "M/d/yy H:mm"
) -> DataFrame:
    fields = [fields] if isinstance(fields, str) else fields
    for field in fields:
        df = df.withColumn(field, to_timestamp(col(field), format))
    return df

# test_utils.py
import pytest
import datetime
from pyspark.sql import SparkSession
from py4j.protocol import Py4JError

from utils import to_timestamp_df


@pytest.fixture(scope="session")
def spark():
    return (
        SparkSession.builder.master("local")
        .appName("test")
        .config("spark.submit.deployMode", "client")
        .getOrCreate()
    )


def test_to_timestamp_success(spark):
    raw_df = spark.createDataFrame(
        [("1/1/17 0:01",)],
        ["date"],
    )

    test_df = to_timestamp_df(raw_df, "date", "M/d/yy H:mm")
    for row in test_df.collect():
        assert row["date"] == datetime.datetime(2017, 1, 1, 0, 1)


def test_to_timestamp_bad_format(spark):
    raw_df = spark.createDataFrame(
        [("1/1/17 0:01",)],
        ["date"],
    )

    with pytest.raises(Py4JError):
        to_timestamp_df(raw_df, "date", "M/d/yy HH:mm").collect()
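To see what the default pattern "M/d/yy H:mm" is meant to match without starting Spark, a rough standard-library analogue can help. Note that this is only an illustration: datetime.strptime is not the parser Spark uses, the format string below is my own translation of the pattern, and Python is lenient about zero padding, so it does not reproduce the failure that the second test expects from Spark.

```python
from datetime import datetime

# Rough strptime translation of the Spark pattern "M/d/yy H:mm":
# month / day / two-digit year, then hour : minute.
PY_FORMAT = "%m/%d/%y %H:%M"

parsed = datetime.strptime("1/1/17 0:01", PY_FORMAT)
print(parsed)  # 2017-01-01 00:01:00
```

The value matches the datetime.datetime(2017, 1, 1, 0, 1) asserted in test_to_timestamp_success.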

For unit testing, we need to install the pytest package and export the PYTHONPATH variable, whose value can be found in the .env file. Note that, as the tests run with a local Spark session, the testing package only needs to be installed on the master node. An example test run output is shown below.

Summary

In this post, we discussed how to set up a remote development environment on an EMR cluster. A cluster is deployed in a private subnet, access from a developer machine is established via a PC-to-PC VPN, and the VS Code Remote - SSH extension is used to perform remote development. Aside from the default hadoop user, an additional user account is created to show how to share the cluster among multiple users, and Spark development examples are illustrated with those user accounts. Overall, remote development offers another effective option for developing Spark applications on EMR, and it improves the developer experience significantly.