In Part 3, we developed a dbt project that targets Google BigQuery with fictional pizza shop data. Two dimension tables that keep product and user records are created as slowly changing dimension type 2 (SCD Type 2) tables, and one transactional fact table is built to keep pizza orders. The fact table is denormalized using nested and repeated fields to improve query performance. In this post, we discuss how to set up an ETL process on the project using Apache Airflow.
- Part 1 Modelling on PostgreSQL
- Part 2 ETL on PostgreSQL via Airflow
- Part 3 Modelling on BigQuery
- Part 4 ETL on BigQuery via Airflow (this post)
- Part 5 Modelling on Amazon Athena
- Part 6 ETL on Amazon Athena via Airflow
Infrastructure
Apache Airflow and Google BigQuery are used in this post. Airflow is deployed locally using Docker Compose, and the source can be found in the GitHub repository of this post.
Airflow
The Airflow setup is simplified by using the Local Executor, where both scheduling and task execution are handled by the Airflow scheduler service - i.e. AIRFLOW__CORE__EXECUTOR: LocalExecutor. It is also configured so that the dbt project (see Part 3 for details) can run within the scheduler service by
- installing the dbt-bigquery package as an additional pip package,
- volume-mapping the folders that keep the dbt project and dbt project profile, and
- specifying environment variables for the dbt project profile (GCP_PROJECT and SA_KEYFILE) - a sketch of the corresponding profile is shown after the compose file.
# docker-compose.yml
version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.0}
  networks:
    - appnet
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- dbt-bigquery==1.7.4 pendulum}
    SA_KEYFILE: /tmp/sa_key/key.json # service account key file, required for dbt profile and airflow python operator
    GCP_PROJECT: ${GCP_PROJECT} # GCP project id, required for dbt profile
    TZ: Australia/Sydney
  volumes:
    - ./airflow/dags:/opt/airflow/dags
    - ./airflow/plugins:/opt/airflow/plugins
    - ./airflow/logs:/opt/airflow/logs
    - ./pizza_shop:/tmp/pizza_shop # dbt project
    - ./sa_key:/tmp/sa_key # service account key file
    - ./airflow/dbt-profiles:/opt/airflow/dbt-profiles # dbt profiles
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on: &airflow-common-depends-on
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    container_name: postgres
    ports:
      - 5432:5432
    networks:
      - appnet
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
      TZ: Australia/Sydney
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5

  airflow-webserver:
    <<: *airflow-common
    container_name: webserver
    command: webserver
    ports:
      - 8080:8080
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    container_name: scheduler
    environment:
      <<: *airflow-common-env
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    container_name: init
    entrypoint: /bin/bash
    command:
      - -c
      - |
        mkdir -p /sources/logs /sources/dags /sources/plugins /sources/scheduler
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins,scheduler}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: "true"
      _AIRFLOW_WWW_USER_CREATE: "true"
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ""
    user: "0:0"
    volumes:
      - ./airflow:/sources

volumes:
  airflow_log_volume:
    driver: local
    name: airflow_log_volume
  postgres_data:
    driver: local
    name: postgres_data

networks:
  appnet:
    name: app-network
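For reference, a minimal sketch of the dbt profile that consumes these environment variables is shown below. The profile name, target name, and thread count are assumptions for illustration; the actual profile comes with the Part 3 project.

# airflow/dbt-profiles/profiles.yml - a minimal sketch, not the exact Part 3 profile
pizza_shop: # assumed to match the profile name in dbt_project.yml
  target: dev
  outputs:
    dev:
      type: bigquery
      method: service-account
      project: "{{ env_var('GCP_PROJECT') }}"
      dataset: pizza_shop
      keyfile: "{{ env_var('SA_KEYFILE') }}"
      threads: 4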
Before we deploy the Airflow services, we need to create the BigQuery dataset and staging tables, followed by inserting initial records - see Part 3 for details of these prerequisite steps. The services can then be started with the docker-compose up command. Note that it is recommended to set the host user's ID as the AIRFLOW_UID value; otherwise, Airflow can fail to launch due to insufficient permission to write logs. Note also that the relevant GCP project ID should be specified, as it is referenced in the compose file.
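For example, the dataset itself can be created with the bq CLI as sketched below; the staging table definitions and the script that seeds the initial records are covered in Part 3.

## create the target dataset (staging table DDL and seed script are covered in Part 3)
$ bq mk --dataset <gcp-project-id>:pizza_shop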
## prerequisite
## 1. create BigQuery dataset and staging tables
## 2. insert initial records i.e. python setup/insert_records.py

## start airflow services
$ AIRFLOW_UID=$(id -u) GCP_PROJECT=<gcp-project-id> docker-compose up -d
Once started, we can visit the Airflow web server at http://localhost:8080 and log in with the default credentials (airflow/airflow), unless they are overridden via the environment variables in the compose file.
ETL Job
A simple ETL job (demo_etl) is created. It first updates the source records (update_records) and then runs and tests the dbt project. Note that the dbt project and profile are accessible because they are volume-mapped into the Airflow scheduler container.
# airflow/dags/operators.py
import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import update_records

with DAG(
    dag_id="demo_etl",
    schedule=None,
    start_date=pendulum.datetime(2024, 1, 1, tz="Australia/Sydney"),
    catchup=False,
    tags=["pizza"],
):
    task_records_update = PythonOperator(
        task_id="update_records", python_callable=update_records.main
    )

    task_dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="dbt run --profiles-dir /opt/airflow/dbt-profiles --project-dir /tmp/pizza_shop",
    )

    task_dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command="dbt test --profiles-dir /opt/airflow/dbt-profiles --project-dir /tmp/pizza_shop",
    )

    task_records_update >> task_dbt_run >> task_dbt_test
Update Records
As dbt takes care of data transformation only, we need a separate task that updates the source records. As shown below, in a single run the task updates up to half of the records in the dimension tables (products and users) and appends 5,000 new order records.
# airflow/dags/update_records.py
import os
import datetime
import dataclasses
import json
import random
import string

from google.cloud import bigquery
from google.oauth2 import service_account


class QueryHelper:
    def __init__(self, sa_keyfile: str):
        self.sa_keyfile = sa_keyfile
        self.credentials = self.get_credentials()
        self.client = bigquery.Client(
            credentials=self.credentials, project=self.credentials.project_id
        )

    def get_credentials(self):
        # https://cloud.google.com/bigquery/docs/samples/bigquery-client-json-credentials#bigquery_client_json_credentials-python
        return service_account.Credentials.from_service_account_file(
            self.sa_keyfile, scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )

    def get_table(self, dataset_name: str, table_name: str):
        return self.client.get_table(
            f"{self.credentials.project_id}.{dataset_name}.{table_name}"
        )

    def fetch_rows(self, stmt: str):
        query_job = self.client.query(stmt)
        rows = query_job.result()
        return rows

    def insert_rows(self, dataset_name: str, table_name: str, records: list):
        table = self.get_table(dataset_name, table_name)
        errors = self.client.insert_rows_json(table, records)
        if len(errors) > 0:
            print(errors)
            raise RuntimeError("fails to insert records")


@dataclasses.dataclass
class Product:
    id: int
    name: str
    description: str
    price: float
    category: str
    image: str
    created_at: str = datetime.datetime.now().isoformat(timespec="seconds")

    def __hash__(self) -> int:
        return self.id

    def to_json(self):
        return dataclasses.asdict(self)

    @classmethod
    def from_row(cls, row: bigquery.Row):
        return cls(**dict(row.items()))

    @staticmethod
    def fetch(query_helper: QueryHelper):
        stmt = """
        WITH windowed AS (
            SELECT
                *,
                ROW_NUMBER() OVER (PARTITION BY id ORDER BY created_at DESC) AS rn
            FROM `pizza_shop.staging_products`
        )
        SELECT id, name, description, price, category, image
        FROM windowed
        WHERE rn = 1;
        """
        return [Product.from_row(row) for row in query_helper.fetch_rows(stmt)]

    @staticmethod
    def insert(query_helper: QueryHelper, percent: float = 0.5):
        products = Product.fetch(query_helper)
        records = set(random.choices(products, k=int(len(products) * percent)))
        for r in records:
            r.price = r.price + 10
        query_helper.insert_rows(
            "pizza_shop",
            "staging_products",
            [r.to_json() for r in records],
        )
        return records


@dataclasses.dataclass
class User:
    id: int
    first_name: str
    last_name: str
    email: str
    residence: str
    lat: float
    lon: float
    created_at: str = datetime.datetime.now().isoformat(timespec="seconds")

    def __hash__(self) -> int:
        return self.id

    def to_json(self):
        return {
            k: v if k not in ["lat", "lon"] else str(v)
            for k, v in dataclasses.asdict(self).items()
        }

    @classmethod
    def from_row(cls, row: bigquery.Row):
        return cls(**dict(row.items()))

    @staticmethod
    def fetch(query_helper: QueryHelper):
        stmt = """
        WITH windowed AS (
            SELECT
                *,
                ROW_NUMBER() OVER (PARTITION BY id ORDER BY created_at DESC) AS rn
            FROM `pizza_shop.staging_users`
        )
        SELECT id, first_name, last_name, email, residence, lat, lon
        FROM windowed
        WHERE rn = 1;
        """
        return [User.from_row(row) for row in query_helper.fetch_rows(stmt)]

    @staticmethod
    def insert(query_helper: QueryHelper, percent: float = 0.5):
        users = User.fetch(query_helper)
        records = set(random.choices(users, k=int(len(users) * percent)))
        for r in records:
            r.email = f"{''.join(random.choices(string.ascii_letters, k=5)).lower()}@email.com"
        query_helper.insert_rows(
            "pizza_shop",
            "staging_users",
            [r.to_json() for r in records],
        )
        return records


@dataclasses.dataclass
class Order:
    id: int
    user_id: int
    items: str
    created_at: str = datetime.datetime.now().isoformat(timespec="seconds")

    def to_json(self):
        return dataclasses.asdict(self)

    @classmethod
    def create(cls, id: int):
        order_items = [
            {"product_id": id, "quantity": random.randint(1, 5)}
            for id in set(random.choices(range(1, 82), k=random.randint(1, 10)))
        ]
        return cls(
            id=id,
            user_id=random.randint(1, 10000),
            items=json.dumps([item for item in order_items]),
        )

    @staticmethod
    def insert(query_helper: QueryHelper, max_id: int, num_orders: int = 5000):
        records = []
        for _ in range(num_orders):
            records.append(Order.create(max_id + 1))
            max_id += 1
        query_helper.insert_rows(
            "pizza_shop",
            "staging_orders",
            [r.to_json() for r in records],
        )
        return records

    @staticmethod
    def get_max_id(query_helper: QueryHelper):
        stmt = "SELECT max(id) AS max FROM `pizza_shop.staging_orders`"
        return next(iter(query_helper.fetch_rows(stmt))).max


def main():
    query_helper = QueryHelper(os.environ["SA_KEYFILE"])
    ## update product and user records
    updated_products = Product.insert(query_helper)
    print(f"{len(updated_products)} product records updated")
    updated_users = User.insert(query_helper)
    print(f"{len(updated_users)} user records updated")
    ## create order records
    max_order_id = Order.get_max_id(query_helper)
    new_orders = Order.insert(query_helper, max_order_id)
    print(f"{len(new_orders)} order records created")
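Note that the script relies only on the SA_KEYFILE environment variable (the project ID is taken from the key file itself), so the update task can also be smoke-tested outside Airflow. Below is a minimal sketch; the relative paths are assumptions based on the volume mappings shown earlier.

## run the update task outside Airflow - paths are assumptions based on the volume mappings above
$ cd airflow/dags
$ SA_KEYFILE=../../sa_key/key.json python -c "import update_records; update_records.main()"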
The details of the ETL job can be found on the Airflow web server as shown below.
Run ETL
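Since the DAG has no schedule (schedule=None) and DAGs are paused at creation in this setup, it has to be unpaused and triggered manually - either from the web UI or, as sketched below, with the Airflow CLI inside the scheduler container (named scheduler in the compose file).

## unpause and trigger the DAG via the Airflow CLI in the scheduler container
$ docker exec -it scheduler airflow dags unpause demo_etl
$ docker exec -it scheduler airflow dags trigger demo_etl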
Below are example product dimension records after the ETL job is completed. We can see that product 1 has been updated: a new surrogate key is assigned to the new record, and the valid_from and valid_to column values are adjusted accordingly.
SELECT product_key, product_id AS id, price, valid_from, valid_to
FROM `pizza_shop.dim_products`
WHERE product_id IN (1, 2)

product_key                        id  price valid_from          valid_to
8dd51b3981692c787baa9d4335f15345    2   60.0 2024-02-16T22:07:19 2199-12-31T00:00:00
* a8c5f8c082bcf52a164f2eccf2b493f6  1  335.0 2024-02-16T22:07:19 2024-02-16T22:09:37
* c995d7e1ec035da116c0f37e6284d1d5  1  345.0 2024-02-16T22:09:37 2199-12-31T00:00:00
When we query the fact table for orders that include the same product IDs, we can confirm that the correct product key values are mapped - note that valid_to is not inclusive.
SELECT o.order_id, p.key, p.id, p.price, p.quantity, o.created_at
FROM `pizza_shop.fct_orders` AS o,
     UNNEST(o.product) AS p
WHERE o.order_id IN (11146, 23296) AND p.id IN (1, 2)
ORDER BY o.order_id, p.id

order_id key                                id  price quantity created_at
   11146 * a8c5f8c082bcf52a164f2eccf2b493f6  1  335.0        1 2024-02-16T22:07:23
   11146 8dd51b3981692c787baa9d4335f15345    2   60.0        2 2024-02-16T22:07:23
   23296 * c995d7e1ec035da116c0f37e6284d1d5  1  345.0        4 2024-02-16T22:09:37
   23296 8dd51b3981692c787baa9d4335f15345    2   60.0        4 2024-02-16T22:09:37
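This is because each order item is mapped to the dimension record whose validity window contains the order's created_at. A sketch of that SCD2 lookup condition is shown below - order_items here is a hypothetical flattened view of the staging order items, and the actual mapping is implemented in the Part 3 dbt models.

-- sketch of the SCD2 lookup implied above (order_items is hypothetical)
SELECT o.order_id, o.product_id, d.product_key
FROM order_items AS o
JOIN `pizza_shop.dim_products` AS d
  ON o.product_id = d.product_id
 AND o.created_at >= d.valid_from
 AND o.created_at < d.valid_to -- valid_to is exclusive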
Summary
In this series of posts, we discuss data warehouse/lakehouse examples using data build tool (dbt), including ETL orchestration with Apache Airflow. In this post, we discussed how to set up an ETL process on the dbt project developed in Part 3 using Airflow. A demo ETL job was created that updates source records before running and testing the dbt project. Finally, the result of the ETL job was validated by checking sample records.