In Part 5, we developed a dbt project that targets Apache Iceberg, where transformations are performed on Amazon Athena. Two dimension tables that keep product and user records are created as Type 2 slowly changing dimension (SCD Type 2) tables, and one transactional fact table is built to keep pizza orders. To improve query performance, the fact table is denormalized to pre-join records from the dimension tables using the array and struct data types. In this post, we discuss how to set up an ETL process for the project using Apache Airflow.
- Part 1 Modelling on PostgreSQL
- Part 2 ETL on PostgreSQL via Airflow
- Part 3 Modelling on BigQuery
- Part 4 ETL on BigQuery via Airflow
- Part 5 Modelling on Amazon Athena
- Part 6 ETL on Amazon Athena via Airflow (this post)
Infrastructure
Apache Airflow and Amazon Athena are used in this post, and the former is deployed locally using Docker Compose. The source can be found in the GitHub repository of this post.
Airflow
The Airflow deployment is simplified by using the Local Executor, where both scheduling and task execution are handled by the Airflow scheduler service - i.e. AIRFLOW__CORE__EXECUTOR: LocalExecutor. It is also configured so that the dbt project (see Part 5 for details) can run within the scheduler service by
- installing the dbt-athena-community and awswrangler packages as additional pip packages,
- volume-mapping folders that keep the dbt project and dbt profile (a sketch of the profile is shown after the compose file below), and
- specifying environment variables for accessing AWS services (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY).
# docker-compose.yml
version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.0}
  networks:
    - appnet
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- dbt-athena-community==1.7.1 awswrangler pendulum}
    AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
    AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
    TZ: Australia/Sydney
  volumes:
    - ./airflow/dags:/opt/airflow/dags
    - ./airflow/plugins:/opt/airflow/plugins
    - ./airflow/logs:/opt/airflow/logs
    - ./pizza_shop:/tmp/pizza_shop # dbt project
    - ./airflow/dbt-profiles:/opt/airflow/dbt-profiles # dbt profiles
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on: &airflow-common-depends-on
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    container_name: postgres
    ports:
      - 5432:5432
    networks:
      - appnet
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
      TZ: Australia/Sydney
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5

  airflow-webserver:
    <<: *airflow-common
    container_name: webserver
    command: webserver
    ports:
      - 8080:8080
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    container_name: scheduler
    environment:
      <<: *airflow-common-env
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    container_name: init
    entrypoint: /bin/bash
    command:
      - -c
      - |
        mkdir -p /sources/logs /sources/dags /sources/plugins /sources/scheduler
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins,scheduler}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: "true"
      _AIRFLOW_WWW_USER_CREATE: "true"
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ""
    user: "0:0"
    volumes:
      - ./airflow:/sources

volumes:
  airflow_log_volume:
    driver: local
    name: airflow_log_volume
  postgres_data:
    driver: local
    name: postgres_data

networks:
  appnet:
    name: app-network
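The dbt profile that is volume-mapped to /opt/airflow/dbt-profiles is not shown here - see Part 5 for the actual file. As a rough sketch only, a profile for the Athena target could look like the following, where the bucket, database and region come from elsewhere in this post, while the S3 prefix, target name and thread count are assumptions.

# airflow/dbt-profiles/profiles.yml - illustrative sketch, not the actual profile
pizza_shop:
  target: athena
  outputs:
    athena:
      type: athena
      # S3 prefix for Athena query results (assumed)
      s3_staging_dir: s3://dbt-pizza-shop-demo/athena/
      region_name: ap-southeast-2
      # Athena data catalog and Glue database
      database: awsdatacatalog
      schema: pizza_shop
      threads: 4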
Before we deploy the Airflow services, we need to create the staging tables and insert initial records, which can be done by executing a Python script (insert_records.py) - see Part 5 for details about this prerequisite step; a rough sketch of the script follows the commands below. Then the services can be started using the docker-compose up command. Note that it is recommended to specify the host user's ID as the AIRFLOW_UID value; otherwise, Airflow can fail to launch due to insufficient permission to write logs.
## prerequisite
## create staging tables and insert records - python setup/insert_records.py

## start airflow services
$ AIRFLOW_UID=$(id -u) docker-compose up -d
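The initial load script itself is kept in the repository and described in Part 5. As a rough sketch of the pattern it follows, it writes the source records to S3 as Parquet files and registers the staging tables in the Glue data catalog via awswrangler - the same approach used by the load_source method of the update task later in this post. The sample values below are illustrative only.

# illustrative sketch of setup/insert_records.py - see Part 5 and the repository for the real script
import datetime

import boto3
import pandas as pd
import awswrangler as wr

# a single made-up product record; the real script generates the full set of source records
products = pd.DataFrame.from_records(
    [
        {
            "id": 1,
            "name": "product name",
            "description": "product description",
            "price": 335.0,
            "category": "category",
            "image": "image url",
            "created_at": datetime.datetime.now(),
        }
    ]
)

# write to S3 as Parquet and register the staging table in the Glue data catalog
wr.s3.to_parquet(
    df=products,
    path="s3://dbt-pizza-shop-demo/staging/products/",
    dataset=True,
    database="pizza_shop",
    table="staging_products",
    boto3_session=boto3.Session(region_name="ap-southeast-2"),
)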
Once started, we can visit the Airflow web server on http://localhost:8080.
ETL Job
A simple ETL job (demo_etl) is created; it updates the source records (update_records) and then runs and tests the dbt project. Note that the dbt project and profile are accessible as they are volume-mapped to the Airflow scheduler container.
# airflow/dags/operators.py
import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import update_records

with DAG(
    dag_id="demo_etl",
    schedule=None,
    start_date=pendulum.datetime(2024, 1, 1, tz="Australia/Sydney"),
    catchup=False,
    tags=["pizza"],
):
    task_records_update = PythonOperator(
        task_id="update_records", python_callable=update_records.main
    )

    task_dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="dbt run --profiles-dir /opt/airflow/dbt-profiles --project-dir /tmp/pizza_shop",
    )

    task_dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command="dbt test --profiles-dir /opt/airflow/dbt-profiles --project-dir /tmp/pizza_shop",
    )

    task_records_update >> task_dbt_run >> task_dbt_test
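Note that DAGs are paused at creation (AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION is set to true), so demo_etl has to be unpaused before it runs. Besides the web UI, this can be done with the Airflow CLI - a quick sketch, assuming the scheduler container name (scheduler) from the compose file above.

## unpause and trigger the DAG from the scheduler container
$ docker exec scheduler airflow dags unpause demo_etl
$ docker exec scheduler airflow dags trigger demo_etl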
Update Records
As dbt only takes care of data transformation, we need a separate task that updates the source records. As shown below, the task updates half of the records in the dimension tables (products and users) and appends 5,000 order records in a single run.
# airflow/dags/update_records.py
import os
import datetime
import dataclasses
import json
import random
import string

import boto3
import pandas as pd
import awswrangler as wr


class QueryHelper:
    def __init__(self, db_name: str, bucket_name: str):
        self.db_name = db_name
        self.bucket_name = bucket_name

    def read_sql_query(self, stmt: str):
        return wr.athena.read_sql_query(
            stmt,
            database=self.db_name,
            boto3_session=boto3.Session(
                region_name=os.getenv("AWS_REGION", "ap-southeast-2")
            ),
        )

    def load_source(self, df: pd.DataFrame, obj_name: str):
        if obj_name not in ["users", "products", "orders"]:
            raise ValueError("object name should be one of users, products, orders")
        wr.s3.to_parquet(
            df=df,
            path=f"s3://{self.bucket_name}/staging/{obj_name}/",
            dataset=True,
            database=self.db_name,
            table=f"staging_{obj_name}",
            boto3_session=boto3.Session(
                region_name=os.getenv("AWS_REGION", "ap-southeast-2")
            ),
        )


def update_products(
    query_helper: QueryHelper,
    percent: float = 0.5,
    created_at: datetime.datetime = datetime.datetime.now(),
):
    stmt = """
    WITH windowed AS (
        SELECT
            *,
            ROW_NUMBER() OVER (PARTITION BY id ORDER BY created_at DESC) AS rn
        FROM pizza_shop.staging_products
    )
    SELECT id, name, description, price, category, image
    FROM windowed
    WHERE rn = 1;
    """
    products = query_helper.read_sql_query(stmt)
    products.insert(products.shape[1], "created_at", created_at)
    records = products.sample(n=int(products.shape[0] * percent))
    records["price"] = records["price"] + 10
    query_helper.load_source(records, "products")
    return records


def update_users(
    query_helper: QueryHelper,
    percent: float = 0.5,
    created_at: datetime.datetime = datetime.datetime.now(),
):
    stmt = """
    WITH windowed AS (
        SELECT
            *,
            ROW_NUMBER() OVER (PARTITION BY id ORDER BY created_at DESC) AS rn
        FROM pizza_shop.staging_users
    )
    SELECT id, first_name, last_name, email, residence, lat, lon
    FROM windowed
    WHERE rn = 1;
    """
    users = query_helper.read_sql_query(stmt)
    users.insert(users.shape[1], "created_at", created_at)
    records = users.sample(n=int(users.shape[0] * percent))
    records[
        "email"
    ] = f"{''.join(random.choices(string.ascii_letters, k=5)).lower()}@email.com"
    query_helper.load_source(records, "users")
    return records


@dataclasses.dataclass
class Order:
    id: int
    user_id: int
    items: str
    created_at: datetime.datetime

    def to_json(self):
        return dataclasses.asdict(self)

    @classmethod
    def create(cls, id: int, created_at: datetime.datetime):
        order_items = [
            {"product_id": id, "quantity": random.randint(1, 5)}
            for id in set(random.choices(range(1, 82), k=random.randint(1, 10)))
        ]
        return cls(
            id=id,
            user_id=random.randint(1, 10000),
            items=json.dumps([item for item in order_items]),
            created_at=created_at,
        )

    @staticmethod
    def insert(
        query_helper: QueryHelper,
        max_id: int,
        num_orders: int = 5000,
        created_at: datetime.datetime = datetime.datetime.now(),
    ):
        records = []
        for _ in range(num_orders):
            records.append(Order.create(max_id + 1, created_at).to_json())
            max_id += 1
        query_helper.load_source(pd.DataFrame.from_records(records), "orders")
        return records

    @staticmethod
    def get_max_id(query_helper: QueryHelper):
        stmt = "SELECT max(id) AS mx FROM pizza_shop.staging_orders"
        df = query_helper.read_sql_query(stmt)
        return next(iter(df.mx))


def main():
    query_helper = QueryHelper(db_name="pizza_shop", bucket_name="dbt-pizza-shop-demo")
    created_at = datetime.datetime.now()
    # update product and user records
    updated_products = update_products(query_helper, created_at=created_at)
    print(f"{len(updated_products)} product records updated")
    updated_users = update_users(query_helper, created_at=created_at)
    print(f"{len(updated_users)} user records updated")
    # create order records
    max_order_id = Order.get_max_id(query_helper)
    new_orders = Order.insert(query_helper, max_order_id, created_at=created_at)
    print(f"{len(new_orders)} order records created")
The details of the ETL job can be found on the Airflow web server as shown below.
Run ETL
Below are example product dimension records after the ETL job is completed. We can see that product 2 is updated (the affected records are marked with *): a new surrogate key is assigned to the new record, and the valid_from and valid_to column values are updated accordingly.
SELECT product_key, product_id AS id, price, valid_from, valid_to
FROM pizza_shop.dim_products
WHERE product_id IN (1, 2)
ORDER BY product_id, valid_from;

#    product_key                       id  price  valid_from                  valid_to
1    b8c187845db8b7e55626659cfbb8aea1  1   335.0  2024-03-01 10:16:36.481000  2199-12-31 00:00:00.000000
2  * 8311b52111a924582c0fe5cb566cfa9a  2   60.0   2024-03-01 10:16:36.481000  2024-03-01 10:16:50.932000
3  * 0f4df52917ddff1bcf618b798c8aff43  2   70.0   2024-03-01 10:16:50.932000  2199-12-31 00:00:00.000000
When we query the fact table for orders that include the same products, we can check that the correct product key values are mapped - note that valid_to is not inclusive.
SELECT o.order_id, p.key, p.id, p.price, p.quantity, o.created_at
FROM pizza_shop.fct_orders AS o
CROSS JOIN UNNEST(product) AS t(p)
WHERE o.order_id IN (11146, 20398) AND p.id IN (1, 2)
ORDER BY o.order_id, p.id;

#  order_id    key                               id  price  quantity  created_at
1  11146       b8c187845db8b7e55626659cfbb8aea1  1   335.0  1         2024-03-01 10:16:37.981000
2  11146     * 8311b52111a924582c0fe5cb566cfa9a  2   60.0   2         2024-03-01 10:16:37.981000
3  20398       b8c187845db8b7e55626659cfbb8aea1  1   335.0  5         2024-03-01 10:16:50.932000
4  20398     * 0f4df52917ddff1bcf618b798c8aff43  2   70.0   3         2024-03-01 10:16:50.932000
Summary
In this series of posts, we discuss data warehouse/lakehouse examples using data build tool (dbt), including ETL orchestration with Apache Airflow. In this post, we discussed how to set up an ETL process for the dbt project developed in Part 5 using Airflow. A demo ETL job was created that updates the source records and then runs and tests the dbt project. Finally, the result of the ETL job was validated by checking sample records.