Unlike traditional data lakes, new table formats (Iceberg, Hudi and Delta Lake) support features that make it possible to apply data warehousing patterns, which offers a way out of the data swamp. In this post, we’ll discuss how to implement ETL using retail analytics data. The data consists of two dimension data sets (user and product) and a single fact data set (order). The dimension data sets have different ETL strategies depending on whether historical changes are tracked. For the fact data, the primary keys of the dimension data are added to facilitate later queries. We’ll use Iceberg for data storage/management and Spark for data processing. Instead of provisioning an EMR cluster, a local development environment will be used. Finally, the ETL results will be queried by Athena for verification.

EMR Local Environment

In one of my earlier posts, we discussed how to develop and test Apache Spark apps for EMR locally using Docker (and/or VSCode). Instead of provisioning an EMR cluster, we can quickly build an ETL app using the local environment. For this post, a new local environment is created based on the Docker image of the latest EMR 6.6.0 release. Check the GitHub repository for this post for further details.

Apache Iceberg is supported by EMR 6.5.0 or later, and it requires the iceberg-defaults configuration classification to enable Iceberg. The latest EMR Docker release (emr-6.6.0-20220411), however, doesn’t support that configuration classification and I didn’t find the iceberg folder (/usr/share/aws/iceberg) within the Docker container. Therefore, the AWS integration example of the Iceberg project is used instead, and the following script (run.sh) is an updated version of the example script that either launches the PySpark shell or submits a Spark application.

#!/usr/bin/env bash
# run.sh

# add Iceberg dependency
# ...

# add AWS dependency
# ...
AWS_PACKAGES=(
    "bundle"
    "url-connection-client"
)
for pkg in "${AWS_PACKAGES[@]}"; do
    # ...
done

# execute pyspark or spark-submit
execution=$1
app_path=$2
if [ -z "$execution" ]; then
    echo "missing execution type. specify either pyspark or spark-submit"
    exit 1
fi

if [ "$execution" == "pyspark" ]; then
    pyspark --packages "$DEPENDENCIES" \
        --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
        --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
        --conf spark.sql.catalog.demo.warehouse=s3://iceberg-etl-demo \
        --conf spark.sql.catalog.demo.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
        --conf spark.sql.catalog.demo.io-impl=org.apache.iceberg.aws.s3.S3FileIO
elif [ "$execution" == "spark-submit" ]; then
    if [ -z "$app_path" ]; then
        echo "pyspark application is mandatory"
        exit 1
    else
        spark-submit --packages "$DEPENDENCIES" \
            --deploy-mode client \
            --master "local[*]" \
            --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
            --conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
            --conf spark.sql.catalog.demo.warehouse=s3://iceberg-etl-demo \
            --conf spark.sql.catalog.demo.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
            --conf spark.sql.catalog.demo.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
            "$app_path"
    fi
fi

Here is an example of using the script.

# launch pyspark shell
$ ./run.sh pyspark

# execute spark-submit, requires pyspark application (etl.py) as the second argument
$ ./run.sh spark-submit path-to-app.py
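
Both branches of run.sh pass the same five Iceberg settings. As a rough sketch of keeping them in one place, the helper below builds the settings for a named catalog and flattens them into --conf arguments. The function names (iceberg_catalog_conf, to_cli_args) are hypothetical and not part of the project; only the keys and values are taken from the script above.

```python
from typing import Dict, List


def iceberg_catalog_conf(catalog: str, warehouse: str) -> Dict[str, str]:
    """Spark settings for an Iceberg catalog backed by Glue and S3 (hypothetical helper)."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    }


def to_cli_args(conf: Dict[str, str]) -> List[str]:
    """Flatten the settings into '--conf key=value' command line arguments."""
    args: List[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


# same catalog name and warehouse location as in run.sh
conf = iceberg_catalog_conf("demo", "s3://iceberg-etl-demo")
cli_args = to_cli_args(conf)
print(" ".join(cli_args))
```

The catalog name (demo) becomes the first part of every table identifier used later, e.g. demo.dwh.users.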

ETL Strategy

Sample Data

We use the retail analytics sample database from YugaByteDB to get the ETL sample data. Records from 3 tables (users, products and orders) are used, and each table is processed with its own ETL strategy.

The main focus of the demo ETL application is to show how to track product price changes over time and to apply those changes to the order data. Normally ETL is performed daily, but executing daily incremental ETL on the order data would be time-consuming because it includes records spanning 5 calendar years. Moreover, as the order data is related to the user and product data, splitting the corresponding dimension records by day would be quite difficult. Instead, I chose to run yearly incremental ETL. I first split the orders into 4 groups, where the first group (year 0) includes orders in 2016 and 2017, and each of the remaining groups (year 1 to 3) keeps the records of a whole year from 2018 to 2020. Then I created 4 product groups to match the order groups so that incremental ETL can be executed together with the order data. The first group (year 0) keeps the original data, and the product price is increased by 5% in each of the following years until the last group (year 3). Note that, with this setup, orders for a given product are expected to be mapped to a higher product price over time. The user data, on the other hand, doesn’t track historical changes, so it is used as it is. The sample data files used for the ETL app are listed below, and they can be found in the data folder of the GitHub repository.

$ tree data
data
├── orders_year_0.csv
├── orders_year_1.csv
├── orders_year_2.csv
├── orders_year_3.csv
├── products_year_0.csv
├── products_year_1.csv
├── products_year_2.csv
├── products_year_3.csv
└── users.csv
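
The yearly grouping rule described above can be sketched in a few lines of plain Python. This is only an illustration of the rule, not the script that produced the sample files, and the function names (year_group, orders_file) are hypothetical.

```python
from datetime import datetime

FIRST_YEAR, LAST_YEAR = 2016, 2020  # years covered by the order data


def year_group(created_at: datetime) -> int:
    """Map an order timestamp to its incremental ETL group (0 to 3)."""
    if not FIRST_YEAR <= created_at.year <= LAST_YEAR:
        raise ValueError(f"unexpected order year: {created_at.year}")
    # 2016 and 2017 share group 0; 2018 -> 1, 2019 -> 2, 2020 -> 3
    return max(created_at.year - 2017, 0)


def orders_file(created_at: datetime) -> str:
    """Name of the sample file that holds an order created at the given time."""
    return f"orders_year_{year_group(created_at)}.csv"


print(orders_file(datetime(2018, 6, 26, 2, 24, 38)))  # orders_year_1.csv
```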


Slowly changing dimension (SCD) type 1 is implemented for the user data. This method upserts records by comparing the primary key values and therefore doesn’t track historical data. The data has the natural key of id, and its md5 hash is used as the surrogate key named user_sk - this column is used as the primary key of the table. The table is configured to be partitioned by its surrogate key in 20 buckets. The table creation statement can be found below.

CREATE TABLE demo.dwh.users (
    user_sk     string,
    id          bigint,
    name        string,
    email       string,
    address     string,
    city        string,
    state       string,
    zip         string,
    birth_date  date,
    source      string,
    created_at  timestamp)
USING iceberg
PARTITIONED BY (bucket(20, user_sk))


Slowly changing dimension (SCD) type 2 is applied to the product data. This method tracks historical data by adding multiple records for a given natural key. As with the user data, the id column is the natural key. Each record for the same natural key is given a different surrogate key: the md5 hash of the combination of the id and created_at columns is used as the surrogate key named prod_sk. Each record has its own effective period, which is determined by the eff_from and eff_to columns, and the latest record is marked with a curr_flag value of 1. The table is also configured to be partitioned by its surrogate key in 20 buckets. The table creation statement is shown below.

CREATE TABLE demo.dwh.products (
    prod_sk     string,
    id          bigint,
    category    string,
    price       decimal(6,3),
    title       string,
    vendor      string,
    curr_flag   int,
    eff_from    timestamp,
    eff_to      timestamp,
    created_at  timestamp)
USING iceberg
PARTITIONED BY (bucket(20, prod_sk))
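
To make the two surrogate keys concrete, here is a minimal plain-Python sketch of how they could be derived with hashlib. It assumes the key inputs are the raw string values as they appear in the CSV files and that the product key concatenates id and created_at without a separator. The helper names and the sample timestamps are hypothetical.

```python
import hashlib


def md5_hex(value: str) -> str:
    """Hex digest of the UTF-8 encoded string."""
    return hashlib.md5(value.encode("utf-8")).hexdigest()


def user_sk(user_id: str) -> str:
    """SCD type 1: one surrogate key per natural key."""
    return md5_hex(user_id)


def prod_sk(product_id: str, created_at: str) -> str:
    """SCD type 2: one surrogate key per version of a natural key."""
    return md5_hex(product_id + created_at)


print(user_sk("1"))  # c4ca4238a0b923820dcc509a6f75849b
# two versions of the same product get different keys (timestamps are made up)
print(prod_sk("1", "2017-01-01 00:00:00"))
print(prod_sk("1", "2018-01-01 00:00:00"))
```

Because the version timestamp is part of the hash input, every new record of a product gets a new prod_sk, while user_sk stays stable for a given user.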


The orders table has a composite primary key made of the surrogate keys of the dimension tables - user_sk and prod_sk. Those columns don’t exist in the source data and are added during transformation. The table is configured to be partitioned by the date part of the created_at column. The table creation statement can be found below.

CREATE TABLE demo.dwh.orders (
    user_sk     string,
    prod_sk     string,
    id          bigint,
    discount    decimal(4,2),
    quantity    integer,
    created_at  timestamp)
USING iceberg
PARTITIONED BY (days(created_at))
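
Iceberg’s days transform maps a timestamp to the number of days since 1970-01-01, so orders created on the same day share a partition value. The sketch below mimics that calculation in plain Python, assuming the timestamps are already in UTC. The function name is hypothetical and the code does not call Iceberg itself.

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)


def days_partition(created_at: datetime) -> int:
    """Number of days between the epoch and the date part of the timestamp."""
    return (created_at.date() - EPOCH).days


morning = datetime(2018, 6, 26, 2, 24, 38)
evening = datetime(2018, 6, 26, 21, 0, 0)
# both timestamps fall on the same day, hence the same partition value
print(days_partition(morning), days_partition(evening))  # 17708 17708
```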

ETL Implementation


In the transformation phase, a source dataframe is created by adding the surrogate key (user_sk), changing the data types of relevant columns and selecting columns in the same order as in the table definition. Then a view (users_tbl) is created from the source dataframe, and it is used to execute a MERGE operation that compares the surrogate key values of the source view with those of the target users table.

# src.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, expr, md5, to_date, to_timestamp


def etl_users(file_path: str, spark_session: SparkSession):
    print("users - transform records...")
    src_df = (
        spark_session.read.option("header", "true")
        .csv(file_path)
        .withColumn("user_sk", md5("id"))
        .withColumn("id", expr("CAST(id AS bigint)"))
        .withColumn("created_at", to_timestamp("created_at"))
        .withColumn("birth_date", to_date("birth_date"))
        .select(
            "user_sk",
            "id",
            "name",
            "email",
            "address",
            "city",
            "state",
            "zip",
            "birth_date",
            "source",
            "created_at",
        )
    )
    print("users - upsert records...")
    src_df.createOrReplaceTempView("users_tbl")
    # upsert: update rows whose surrogate key already exists, insert the rest
    spark_session.sql(
        """
        MERGE INTO demo.dwh.users t
        USING (SELECT * FROM users_tbl ORDER BY user_sk) s
        ON s.user_sk = t.user_sk
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """
    )
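
The effect of an SCD type 1 upsert can be illustrated without Spark. The sketch below keeps the target table as a dictionary keyed by user_sk, which is a simplification made for illustration: a matching key overwrites the existing record, so no history is kept, and an unmatched key adds a new record. The function name and the sample records are hypothetical.

```python
from typing import Dict, Iterable


def upsert_scd1(target: Dict[str, dict], source: Iterable[dict], key: str = "user_sk") -> Dict[str, dict]:
    """Overwrite records with a matching key and add the ones that are new."""
    for row in source:
        target[row[key]] = dict(row)
    return target


# made-up records: user a1 changes city, user b2 is new
target = {"a1": {"user_sk": "a1", "name": "Jane", "city": "Sydney"}}
source = [
    {"user_sk": "a1", "name": "Jane", "city": "Melbourne"},
    {"user_sk": "b2", "name": "John", "city": "Perth"},
]
upsert_scd1(target, source)
print(target["a1"]["city"], len(target))  # Melbourne 2
```

Running the same source twice leaves the target unchanged, which is why the users ETL can be reused for incremental loads.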


The source dataframe is created by adding the surrogate key, which is the md5 hash of the concatenated id and created_at columns, followed by changing the data types of relevant columns and selecting columns in the same order as in the table definition. The view (products_tbl) that is created from the source dataframe is used to query all the existing records that have the product ids in the source data - see products_to_update. Note we need data from the products table in order to update the eff_from, eff_to and curr_flag column values. Then eff_lead is added to the result set, which is the next record’s created_at value for a given product id - see products_updated. The final result set is created by determining the curr_flag and eff_to column values. Note that the eff_to value of the last record for a product is set to ‘9999-12-31 00:00:00’ in order to make it easy to query the relevant records. The resulting records are updated/inserted by executing a MERGE operation that compares the surrogate key values with those of the target products table.

# src.py
def etl_products(file_path: str, spark_session: SparkSession):
    print("products - transform records...")
    src_df = (
        spark_session.read.option("header", "true")
        .csv(file_path)
        .withColumn("prod_sk", md5(concat("id", "created_at")))
        .withColumn("id", expr("CAST(id AS bigint)"))
        .withColumn("price", expr("CAST(price AS decimal(6,3))"))
        .withColumn("created_at", to_timestamp("created_at"))
        .withColumn("curr_flag", expr("CAST(NULL AS int)"))
        .withColumn("eff_from", col("created_at"))
        .withColumn("eff_to", expr("CAST(NULL AS timestamp)"))
        .select(
            "prod_sk",
            "id",
            "category",
            "price",
            "title",
            "vendor",
            "curr_flag",
            "eff_from",
            "eff_to",
            "created_at",
        )
    )
    print("products - upsert records...")
    src_df.createOrReplaceTempView("products_tbl")
    # existing versions of the incoming products plus the new versions,
    # with curr_flag and eff_to recalculated from the next version's created_at
    products_update_qry = """
    WITH products_to_update AS (
        SELECT l.*
        FROM demo.dwh.products AS l
        JOIN products_tbl AS r ON l.id = r.id
        UNION
        SELECT *
        FROM products_tbl
    ), products_updated AS (
        SELECT *,
                LEAD(created_at) OVER (PARTITION BY id ORDER BY created_at) AS eff_lead
        FROM products_to_update
    )
    SELECT prod_sk,
            id,
            category,
            price,
            title,
            vendor,
            (CASE WHEN eff_lead IS NULL THEN 1 ELSE 0 END) AS curr_flag,
            eff_from,
            COALESCE(eff_lead, to_timestamp('9999-12-31 00:00:00')) AS eff_to,
            created_at
    FROM products_updated
    ORDER BY prod_sk
    """
    spark_session.sql(
        f"""
        MERGE INTO demo.dwh.products t
        USING ({products_update_qry}) s
        ON s.prod_sk = t.prod_sk
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """
    )
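
The core of the SCD type 2 logic is the LEAD window function: each version of a product ends where the next version begins, and the version without a successor is the current one. Below is a minimal plain-Python sketch of that step alone, leaving out the MERGE. The function name apply_scd2 and the sample prices are hypothetical.

```python
from datetime import datetime
from itertools import groupby
from typing import Dict, List

END_OF_TIME = datetime(9999, 12, 31)  # eff_to of the current version


def apply_scd2(records: List[Dict]) -> List[Dict]:
    """Set eff_from, eff_to and curr_flag for every version of every product id."""
    ordered = sorted(records, key=lambda r: (r["id"], r["created_at"]))
    result = []
    for _, group in groupby(ordered, key=lambda r: r["id"]):
        versions = list(group)
        # pair each version with the next one; the last version has no successor
        for current, nxt in zip(versions, versions[1:] + [None]):
            result.append(
                {
                    **current,
                    "eff_from": current["created_at"],
                    "eff_to": nxt["created_at"] if nxt else END_OF_TIME,
                    "curr_flag": 0 if nxt else 1,
                }
            )
    return result


# made-up versions: product 1 has a price change, product 2 does not
records = [
    {"id": 1, "price": "10.500", "created_at": datetime(2018, 1, 1)},
    {"id": 1, "price": "10.000", "created_at": datetime(2017, 1, 1)},
    {"id": 2, "price": "20.000", "created_at": datetime(2017, 6, 1)},
]
rows = apply_scd2(records)
for row in rows:
    print(row["id"], row["price"], row["eff_from"], row["eff_to"], row["curr_flag"])
```

The effective periods are half-open, i.e. a version is valid from eff_from up to but not including eff_to, so consecutive versions never overlap.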


After transformation, a view (orders_tbl) is created from the source dataframe. The relevant user (user_sk) and product (prod_sk) surrogate keys are added to the source data by joining the users and products dimension tables. The users table is SCD type 1, so matching on user_id alone is enough for the join condition. On the other hand, an additional join condition based on the eff_from and eff_to columns is necessary for the products table, as it is SCD type 2 and records in that table have their own effective periods. Note that ideally we should be able to apply INNER JOIN, but the sample data is not clean and some orders don’t match any product record with that operation. For example, the order whose id is 15 was made at 2018-06-26 02:24:38 with the product whose id is 116. However, the earliest record of that product was created at 2018-09-12 15:23:05, so the order would be dropped by INNER JOIN. Therefore LEFT JOIN is applied to create the initial result set (orders_updated) and, for those orders whose products are not matched, the surrogate keys of the earliest product records are added instead. Finally, the updated order records are appended using the DataFrameWriterV2 API.

# src.py
def etl_orders(file_path: str, spark_session: SparkSession):
    print("orders - transform records...")
    src_df = (
        spark_session.read.option("header", "true")
        .csv(file_path)
        .withColumn("id", expr("CAST(id AS bigint)"))
        .withColumn("user_id", expr("CAST(user_id AS bigint)"))
        .withColumn("product_id", expr("CAST(product_id AS bigint)"))
        .withColumn("discount", expr("CAST(discount AS decimal(4,2))"))
        .withColumn("quantity", expr("CAST(quantity AS int)"))
        .withColumn("created_at", to_timestamp("created_at"))
    )
    print("orders - append records...")
    src_df.createOrReplaceTempView("orders_tbl")
    spark_session.sql(
        """
        WITH src_products AS (
            SELECT * FROM demo.dwh.products
        ), orders_updated AS (
            SELECT o.*, u.user_sk, p.prod_sk
            FROM orders_tbl o
            LEFT JOIN demo.dwh.users u
                ON o.user_id = u.id
            LEFT JOIN src_products p
                ON o.product_id = p.id
                AND o.created_at >= p.eff_from
                AND o.created_at < p.eff_to
        ), products_tbl AS (
            SELECT prod_sk,
                   id,
                   ROW_NUMBER() OVER (PARTITION BY id ORDER BY eff_from) AS rn
            FROM src_products
        )
        SELECT o.user_sk,
               COALESCE(o.prod_sk, p.prod_sk) AS prod_sk,
               o.id,
               o.discount,
               o.quantity,
               o.created_at
        FROM orders_updated AS o
        JOIN products_tbl AS p ON o.product_id = p.id
        WHERE p.rn = 1
        ORDER BY o.created_at
        """
    ).writeTo("demo.dwh.orders").append()
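
The product lookup, including the fallback to the earliest record, can be sketched in plain Python as follows. It uses the order and product from the example above. The function name, the prod_sk values and the timestamp of the second product version are hypothetical.

```python
from datetime import datetime
from typing import Dict, List, Optional


def find_prod_sk(products: List[Dict], product_id: int, ordered_at: datetime) -> Optional[str]:
    """Surrogate key of the version effective at ordered_at, else of the earliest version."""
    versions = sorted(
        (p for p in products if p["id"] == product_id), key=lambda p: p["eff_from"]
    )
    for version in versions:
        # half-open period, same as the join condition on eff_from and eff_to
        if version["eff_from"] <= ordered_at < version["eff_to"]:
            return version["prod_sk"]
    # no effective version found: fall back to the earliest one, if any
    return versions[0]["prod_sk"] if versions else None


products = [
    {"id": 116, "prod_sk": "sk-116-v1", "eff_from": datetime(2018, 9, 12, 15, 23, 5),
     "eff_to": datetime(2019, 9, 12, 15, 23, 5)},
    {"id": 116, "prod_sk": "sk-116-v2", "eff_from": datetime(2019, 9, 12, 15, 23, 5),
     "eff_to": datetime(9999, 12, 31)},
]
# order 15 predates the earliest product record, so the earliest key is used
early = find_prod_sk(products, 116, datetime(2018, 6, 26, 2, 24, 38))
later = find_prod_sk(products, 116, datetime(2020, 1, 1))
print(early, later)  # sk-116-v1 sk-116-v2
```

One difference from the SQL version: the sketch returns None for a product id that doesn’t exist at all, whereas the final JOIN in the query drops such orders.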


The ETL script begins by creating all the tables - users, products and orders. Then the ETL for the users table is executed. Note that, although it is executed as an initial load, the code can also be applied to incremental ETL. Finally, incremental ETL is executed for the products and orders tables. The application can be submitted with ./run.sh spark-submit etl.py. Make sure to create the following environment variables before submitting the application.

    • Note it is optional and only required if authentication is made by assuming a role
# etl.py
from pyspark.sql import SparkSession
from src import create_tables, etl_users, etl_products, etl_orders

spark = SparkSession.builder.appName("Iceberg ETL Demo").getOrCreate()

## create all tables - demo.dwh.users, demo.dwh.products and demo.dwh.orders
create_tables(spark)  # assumes create_tables takes the Spark session as its only argument

## users etl - assuming SCD type 1
etl_users("./data/users.csv", spark)

## incremental ETL
for yr in range(0, 4):
    print(f"processing year {yr}")
    ## products etl - assuming SCD type 2
    etl_products(f"./data/products_year_{yr}.csv", spark)
    ## orders etl - relevant user_sk and prod_sk are added during transformation
    etl_orders(f"./data/orders_year_{yr}.csv", spark)

Once the application completes, we’re able to query the Iceberg tables on Athena. The following query returns all product records whose id is 1. The result shows that the price increases over time and that the relevant SCD type 2 columns (curr_flag, eff_from and eff_to) are populated as expected.

SELECT *
FROM dwh.products
WHERE id = 1
ORDER BY eff_from

The following query returns sample orders of that product, one for each product price. It can be checked that the product surrogate key of each order matches the corresponding record in the products dimension table.

WITH src_orders AS (
    SELECT o.user_sk, o.prod_sk, o.id, p.title, p.price, o.discount, o.quantity, o.created_at,
           ROW_NUMBER() OVER (PARTITION BY p.price ORDER BY o.created_at) AS rn
    FROM dwh.orders AS o
    JOIN dwh.products AS p ON o.prod_sk = p.prod_sk
    WHERE p.id = 1
)
SELECT *
FROM src_orders
WHERE rn = 1
ORDER BY created_at


In this post, we discussed how to implement ETL using retail analytics data. In transformation, SCD type 1 and SCD type 2 are applied to the user and product data respectively. For the order data, the corresponding surrogate keys of the user and product data are added. A PySpark application that implements ETL against Iceberg tables is used for demonstration in an EMR local environment. Finally, the ETL results were queried by Athena for verification.