In Part 5, we developed a dbt project that targets Apache Iceberg, where transformations are performed on Amazon Athena. Two dimension tables that keep product and user records are created as Type 2 slowly changing dimension (SCD Type 2) tables, and one transactional fact table is built to keep pizza orders. To improve query performance, the fact table is denormalized to pre-join records from the dimension tables using the array and struct data types. In this post, we discuss how to set up an ETL process for the project using Apache Airflow.

Infrastructure

Apache Airflow and Amazon Athena are used in this post, and the former is deployed locally using Docker Compose. The source can be found in the GitHub repository of this post.

Airflow

The Airflow deployment is simplified by using the Local Executor, where both scheduling and task execution are handled by the Airflow scheduler service - i.e. AIRFLOW__CORE__EXECUTOR: LocalExecutor. It is also configured to run the dbt project (see Part 5 for details) within the scheduler service by

  • installing the dbt-athena-community and awswrangler packages as additional pip packages,
  • volume-mapping folders that keep the dbt project and dbt project profile, and
  • specifying environment variables for accessing AWS services (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY).
# docker-compose.yml
version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.0}
  networks:
    - appnet
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- dbt-athena-community==1.7.1 awswrangler pendulum}
    AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
    AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
    TZ: Australia/Sydney
  volumes:
    - ./airflow/dags:/opt/airflow/dags
    - ./airflow/plugins:/opt/airflow/plugins
    - ./airflow/logs:/opt/airflow/logs
    - ./pizza_shop:/tmp/pizza_shop                      # dbt project
    - ./airflow/dbt-profiles:/opt/airflow/dbt-profiles  # dbt profiles
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on: &airflow-common-depends-on
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    container_name: postgres
    ports:
      - 5432:5432
    networks:
      - appnet
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
      TZ: Australia/Sydney
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5

  airflow-webserver:
    <<: *airflow-common
    container_name: webserver
    command: webserver
    ports:
      - 8080:8080
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    container_name: scheduler
    environment:
      <<: *airflow-common-env
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    container_name: init
    entrypoint: /bin/bash
    command:
      - -c
      - |
        mkdir -p /sources/logs /sources/dags /sources/plugins /sources/scheduler
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins,scheduler}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: "true"
      _AIRFLOW_WWW_USER_CREATE: "true"
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ""
    user: "0:0"
    volumes:
      - ./airflow:/sources

volumes:
  airflow_log_volume:
    driver: local
    name: airflow_log_volume
  postgres_data:
    driver: local
    name: postgres_data

networks:
  appnet:
    name: app-network

Before deploying the Airflow services, we need to create the staging tables and insert initial records, which can be done by executing a Python script (insert_records.py) - see Part 5 for details of this prerequisite step. The services can then be started using the docker-compose up command. Note that it is recommended to specify the host user’s ID as the AIRFLOW_UID value; otherwise, Airflow can fail to launch due to insufficient permission to write logs.

## prerequisite
## create staging tables and insert records - python setup/insert_records.py

## start airflow services
$ AIRFLOW_UID=$(id -u) docker-compose up -d

Once started, we can visit the Airflow web server at http://localhost:8080.
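
Optionally, we can confirm that the webserver and scheduler are up without opening a browser by hitting Airflow’s /health endpoint. Below is a minimal sketch using the requests package; it only assumes the 8080:8080 port mapping defined in the Docker Compose file above.

# quick liveness check against the Airflow webserver - a minimal sketch
# that assumes the 8080:8080 port mapping in docker-compose.yml
import requests

health = requests.get("http://localhost:8080/health", timeout=10).json()
print(health["metadatabase"]["status"], health["scheduler"]["status"])  # expect "healthy" for both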

ETL Job

A simple ETL job (demo_etl) is created. It first updates the source records (update_records) and then runs and tests the dbt project. Note that the dbt project and profile are accessible because they are volume-mapped into the Airflow scheduler container.

# airflow/dags/operators.py
import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import update_records

with DAG(
    dag_id="demo_etl",
    schedule=None,
    start_date=pendulum.datetime(2024, 1, 1, tz="Australia/Sydney"),
    catchup=False,
    tags=["pizza"],
):
    task_records_update = PythonOperator(
        task_id="update_records", python_callable=update_records.main
    )

    task_dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="dbt run --profiles-dir /opt/airflow/dbt-profiles --project-dir /tmp/pizza_shop",
    )

    task_dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command="dbt test --profiles-dir /opt/airflow/dbt-profiles --project-dir /tmp/pizza_shop",
    )

    task_records_update >> task_dbt_run >> task_dbt_test
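
As the DAG is created with schedule=None and DAGs are paused at creation, it has to be unpaused and triggered manually. Besides the web UI, this can be done via Airflow’s stable REST API, which is enabled by the basic_auth backend in the Docker Compose file. The snippet below is a minimal sketch that assumes the default airflow/airflow web user created by the airflow-init service.

# unpause and trigger the demo_etl DAG via the Airflow REST API - a minimal sketch
# that assumes the default airflow/airflow credentials created by airflow-init
import requests

base_url = "http://localhost:8080/api/v1"
auth = ("airflow", "airflow")

# unpause the DAG as DAGS_ARE_PAUSED_AT_CREATION is set to true
requests.patch(f"{base_url}/dags/demo_etl", json={"is_paused": False}, auth=auth)

# create a new DAG run; Airflow generates the run id and logical date
resp = requests.post(f"{base_url}/dags/demo_etl/dagRuns", json={}, auth=auth)
print(resp.json()["dag_run_id"], resp.json()["state"])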

Update Records

As dbt takes care of data transformation only, we need a task that updates the source records. As shown below, the task updates up to half of the records in the dimension tables (products and users) and appends 5,000 order records in a single run.

# airflow/dags/update_records.py
import os
import datetime
import dataclasses
import json
import random
import string

import boto3
import pandas as pd
import awswrangler as wr


class QueryHelper:
    def __init__(self, db_name: str, bucket_name: str):
        self.db_name = db_name
        self.bucket_name = bucket_name

    def read_sql_query(self, stmt: str):
        return wr.athena.read_sql_query(
            stmt,
            database=self.db_name,
            boto3_session=boto3.Session(
                region_name=os.getenv("AWS_REGION", "ap-southeast-2")
            ),
        )

    def load_source(self, df: pd.DataFrame, obj_name: str):
        if obj_name not in ["users", "products", "orders"]:
            raise ValueError("object name should be one of users, products, orders")
        wr.s3.to_parquet(
            df=df,
            path=f"s3://{self.bucket_name}/staging/{obj_name}/",
            dataset=True,
            database=self.db_name,
            table=f"staging_{obj_name}",
            boto3_session=boto3.Session(
                region_name=os.getenv("AWS_REGION", "ap-southeast-2")
            ),
        )


def update_products(
    query_helper: QueryHelper,
    percent: float = 0.5,
    created_at: datetime.datetime = datetime.datetime.now(),
):
    stmt = """
    WITH windowed AS (
        SELECT
            *,
            ROW_NUMBER() OVER (PARTITION BY id ORDER BY created_at DESC) AS rn
        FROM pizza_shop.staging_products
    )
    SELECT id, name, description, price, category, image
    FROM windowed
    WHERE rn = 1;
    """
    products = query_helper.read_sql_query(stmt)
    products.insert(products.shape[1], "created_at", created_at)
    records = products.sample(n=int(products.shape[0] * percent))
    records["price"] = records["price"] + 10
    query_helper.load_source(records, "products")
    return records


def update_users(
    query_helper: QueryHelper,
    percent: float = 0.5,
    created_at: datetime.datetime = datetime.datetime.now(),
):
    stmt = """
    WITH windowed AS (
        SELECT
            *,
            ROW_NUMBER() OVER (PARTITION BY id ORDER BY created_at DESC) AS rn
        FROM pizza_shop.staging_users
    )
    SELECT id, first_name, last_name, email, residence, lat, lon
    FROM windowed
    WHERE rn = 1;
    """
    users = query_helper.read_sql_query(stmt)
    users.insert(users.shape[1], "created_at", created_at)
    records = users.sample(n=int(users.shape[0] * percent))
    records[
        "email"
    ] = f"{''.join(random.choices(string.ascii_letters, k=5)).lower()}@email.com"
    query_helper.load_source(records, "users")
    return records


@dataclasses.dataclass
class Order:
    id: int
    user_id: int
    items: str
    created_at: datetime.datetime

    def to_json(self):
        return dataclasses.asdict(self)

    @classmethod
    def create(cls, id: int, created_at: datetime.datetime):
        order_items = [
            {"product_id": id, "quantity": random.randint(1, 5)}
            for id in set(random.choices(range(1, 82), k=random.randint(1, 10)))
        ]
        return cls(
            id=id,
            user_id=random.randint(1, 10000),
            items=json.dumps([item for item in order_items]),
            created_at=created_at,
        )

    @staticmethod
    def insert(
        query_helper: QueryHelper,
        max_id: int,
        num_orders: int = 5000,
        created_at: datetime.datetime = datetime.datetime.now(),
    ):
        records = []
        for _ in range(num_orders):
            records.append(Order.create(max_id + 1, created_at).to_json())
            max_id += 1
        query_helper.load_source(pd.DataFrame.from_records(records), "orders")
        return records

    @staticmethod
    def get_max_id(query_helper: QueryHelper):
        stmt = "SELECT max(id) AS mx FROM pizza_shop.staging_orders"
        df = query_helper.read_sql_query(stmt)
        return next(iter(df.mx))


def main():
    query_helper = QueryHelper(db_name="pizza_shop", bucket_name="dbt-pizza-shop-demo")
    created_at = datetime.datetime.now()
    # update product and user records
    updated_products = update_products(query_helper, created_at=created_at)
    print(f"{len(updated_products)} product records updated")
    updated_users = update_users(query_helper, created_at=created_at)
    print(f"{len(updated_users)} user records updated")
    # create order records
    max_order_id = Order.get_max_id(query_helper)
    new_orders = Order.insert(query_helper, max_order_id, created_at=created_at)
    print(f"{len(new_orders)} order records created")
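
Note that the module has no Airflow dependency, so it can also be exercised on its own. For example, the QueryHelper class can be reused to check that new staging records have landed after a run - a minimal sketch, assuming valid AWS credentials and the same database and bucket names used in main().

# hypothetical post-run check that reuses QueryHelper from the module above;
# assumes valid AWS credentials and the staging tables created in Part 5
from update_records import QueryHelper

helper = QueryHelper(db_name="pizza_shop", bucket_name="dbt-pizza-shop-demo")
df = helper.read_sql_query("SELECT COUNT(*) AS cnt FROM pizza_shop.staging_orders")
print(f"{next(iter(df.cnt))} order records in staging")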

The details of the ETL job can be found on the Airflow web server as shown below.

Run ETL

The query below shows example product dimension records after the ETL job is completed. We can see that product 2 is updated: a new surrogate key is assigned to the new record, and the valid_from and valid_to column values are updated accordingly.

SELECT product_key, product_id AS id, price, valid_from, valid_to
FROM pizza_shop.dim_products
WHERE product_id IN (1, 2)
ORDER BY product_id, valid_from;

# product_key                        id  price  valid_from                  valid_to
1 b8c187845db8b7e55626659cfbb8aea1    1  335.0  2024-03-01 10:16:36.481000  2199-12-31 00:00:00.000000
2 * 8311b52111a924582c0fe5cb566cfa9a  2   60.0  2024-03-01 10:16:36.481000  2024-03-01 10:16:50.932000
3 * 0f4df52917ddff1bcf618b798c8aff43  2   70.0  2024-03-01 10:16:50.932000  2199-12-31 00:00:00.000000

When we query the fact table for orders that include the same product IDs, we can check that the correct product key values are mapped - note that valid_to is not inclusive.

SELECT o.order_id, p.key, p.id, p.price, p.quantity, o.created_at
FROM pizza_shop.fct_orders AS o
CROSS JOIN UNNEST(product) AS t(p)
WHERE o.order_id IN (11146, 20398) AND p.id IN (1, 2)
ORDER BY o.order_id, p.id;

# order_id  key                                 id  price  quantity  created_at
1    11146  b8c187845db8b7e55626659cfbb8aea1     1  335.0         1  2024-03-01 10:16:37.981000
2    11146  * 8311b52111a924582c0fe5cb566cfa9a   2   60.0         2  2024-03-01 10:16:37.981000
3    20398  b8c187845db8b7e55626659cfbb8aea1     1  335.0         5  2024-03-01 10:16:50.932000
4    20398  * 0f4df52917ddff1bcf618b798c8aff43   2   70.0         3  2024-03-01 10:16:50.932000

Summary

In this series of posts, we discuss data warehouse/lakehouse examples using data build tool (dbt), including ETL orchestration with Apache Airflow. In this post, we discussed how to set up an ETL process for the dbt project developed in Part 5 using Airflow. A demo ETL job was created that first updates source records and then runs and tests the dbt project. Finally, the result of the ETL job was validated by checking sample records.