In Part 3, we developed a dbt project that targets Google BigQuery with fictional pizza shop data. Two dimension tables that keep product and user records are created as slowly changing dimension type 2 (SCD Type 2) tables, and a transactional fact table keeps pizza order records. The fact table is denormalized using nested and repeated fields to improve query performance. In this post, we discuss how to set up an ETL process on the project using Apache Airflow.

Infrastructure

Apache Airflow and Google BigQuery are used in this post, and the former is deployed locally using Docker Compose. The source can be found in the GitHub repository of this post.

Airflow

The Airflow setup is simplified by using the Local Executor, where both scheduling and task execution are handled by the Airflow scheduler service - i.e. AIRFLOW__CORE__EXECUTOR: LocalExecutor. The scheduler service is also configured so that it can run the dbt project (see Part 3 for details) by

  • installing the dbt-bigquery package as an additional pip package,
  • volume-mapping the folders that keep the dbt project and dbt project profile, and
  • specifying the environment variables that the dbt project profile relies on (GCP_PROJECT and SA_KEYFILE) - a sample profile is sketched after the compose file below.
# docker-compose.yml
version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.0}
  networks:
    - appnet
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__FERNET_KEY: ""
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
    AIRFLOW__CORE__LOAD_EXAMPLES: "false"
    AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth"
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- dbt-bigquery==1.7.4 pendulum}
    SA_KEYFILE: /tmp/sa_key/key.json  # service account key file, required for dbt profile and airflow python operator
    GCP_PROJECT: ${GCP_PROJECT}       # GCP project id, required for dbt profile
    TZ: Australia/Sydney
  volumes:
    - ./airflow/dags:/opt/airflow/dags
    - ./airflow/plugins:/opt/airflow/plugins
    - ./airflow/logs:/opt/airflow/logs
    - ./pizza_shop:/tmp/pizza_shop                      # dbt project
    - ./sa_key:/tmp/sa_key                              # service account key file
    - ./airflow/dbt-profiles:/opt/airflow/dbt-profiles  # dbt profiles
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on: &airflow-common-depends-on
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    container_name: postgres
    ports:
      - 5432:5432
    networks:
      - appnet
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
      TZ: Australia/Sydney
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5

  airflow-webserver:
    <<: *airflow-common
    container_name: webserver
    command: webserver
    ports:
      - 8080:8080
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    container_name: scheduler
    environment:
      <<: *airflow-common-env
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    container_name: init
    entrypoint: /bin/bash
    command:
      - -c
      - |
        mkdir -p /sources/logs /sources/dags /sources/plugins /sources/scheduler
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins,scheduler}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: "true"
      _AIRFLOW_WWW_USER_CREATE: "true"
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ""
    user: "0:0"
    volumes:
      - ./airflow:/sources

volumes:
  airflow_log_volume:
    driver: local
    name: airflow_log_volume
  postgres_data:
    driver: local
    name: postgres_data

networks:
  appnet:
    name: app-network
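
For reference, the dbt profile that is volume-mapped to /opt/airflow/dbt-profiles is what reads SA_KEYFILE and GCP_PROJECT. A minimal sketch is shown below, assuming the profile name matches the project name (pizza_shop); the dataset name pizza_shop comes from the queries later in this post. Check the actual profiles.yml in the repository for the exact values.

# airflow/dbt-profiles/profiles.yml - minimal sketch, profile name is an assumption
pizza_shop:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: service-account
      keyfile: "{{ env_var('SA_KEYFILE') }}"
      project: "{{ env_var('GCP_PROJECT') }}"
      dataset: pizza_shop
      threads: 4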

Before deploying the Airflow services, we need to create the BigQuery dataset and staging tables and insert the initial records - see Part 3 for details of these prerequisite steps. The services can then be started using the docker-compose up command. It is recommended to specify the host user’s ID as the AIRFLOW_UID value; otherwise, Airflow can fail to launch due to insufficient permission to write logs. The relevant GCP project ID should also be passed, as it is read by the compose file (GCP_PROJECT).

## prerequisite
## 1. create BigQuery dataset and staging tables
## 2. insert initial records i.e. python setup/insert_records.py

## start airflow services
$ AIRFLOW_UID=$(id -u) GCP_PROJECT=<gcp-project-id> docker-compose up -d

Once started, we can access the Airflow web server at http://localhost:8080.
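
Before triggering any job, we can optionally confirm that dbt can connect to BigQuery from inside the scheduler container, for example by running dbt debug against the volume-mapped project and profile paths from the compose file above.

## optional connectivity check inside the scheduler container
$ docker exec scheduler dbt debug --profiles-dir /opt/airflow/dbt-profiles --project-dir /tmp/pizza_shop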

ETL Job

A simple ETL job (demo_etl) is created. It first updates the source records (update_records) and then runs and tests the dbt project. The dbt project and profile are accessible to the dbt tasks because they are volume-mapped into the Airflow scheduler container, and the update_records module can be imported directly because Airflow adds the dags folder to sys.path.

# airflow/dags/operators.py
import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import update_records

with DAG(
    dag_id="demo_etl",
    schedule=None,
    start_date=pendulum.datetime(2024, 1, 1, tz="Australia/Sydney"),
    catchup=False,
    tags=["pizza"],
):
    task_records_update = PythonOperator(
        task_id="update_records", python_callable=update_records.main
    )

    task_dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="dbt run --profiles-dir /opt/airflow/dbt-profiles --project-dir /tmp/pizza_shop",
    )

    task_dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command="dbt test --profiles-dir /opt/airflow/dbt-profiles --project-dir /tmp/pizza_shop",
    )

    task_records_update >> task_dbt_run >> task_dbt_test
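
As the DAG is created with schedule=None and DAGs are paused at creation, it only runs when it is unpaused and triggered manually - either from the web UI or with the Airflow CLI inside the scheduler container, for example as follows.

## unpause and trigger the DAG from the command line
$ docker exec scheduler airflow dags unpause demo_etl
$ docker exec scheduler airflow dags trigger demo_etl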

Update Records

As dbt takes care of data transformation only, we need a separate task that updates the source records. As shown below, in a single run the task updates up to half of the records of the dimension entities (products and users) in their staging tables and appends 5,000 new order records.

# airflow/dags/update_records.py
import os
import datetime
import dataclasses
import json
import random
import string

from google.cloud import bigquery
from google.oauth2 import service_account


class QueryHelper:
    def __init__(self, sa_keyfile: str):
        self.sa_keyfile = sa_keyfile
        self.credentials = self.get_credentials()
        self.client = bigquery.Client(
            credentials=self.credentials, project=self.credentials.project_id
        )

    def get_credentials(self):
        # https://cloud.google.com/bigquery/docs/samples/bigquery-client-json-credentials#bigquery_client_json_credentials-python
        return service_account.Credentials.from_service_account_file(
            self.sa_keyfile, scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )

    def get_table(self, dataset_name: str, table_name: str):
        return self.client.get_table(
            f"{self.credentials.project_id}.{dataset_name}.{table_name}"
        )

    def fetch_rows(self, stmt: str):
        query_job = self.client.query(stmt)
        rows = query_job.result()
        return rows

    def insert_rows(self, dataset_name: str, table_name: str, records: list):
        table = self.get_table(dataset_name, table_name)
        errors = self.client.insert_rows_json(table, records)
        if len(errors) > 0:
            print(errors)
            raise RuntimeError("fails to insert records")


@dataclasses.dataclass
class Product:
    id: int
    name: str
    description: str
    price: float
    category: str
    image: str
    created_at: str = datetime.datetime.now().isoformat(timespec="seconds")

    def __hash__(self) -> int:
        return self.id

    def to_json(self):
        return dataclasses.asdict(self)

    @classmethod
    def from_row(cls, row: bigquery.Row):
        return cls(**dict(row.items()))

    @staticmethod
    def fetch(query_helper: QueryHelper):
        stmt = """
        WITH windowed AS (
            SELECT
                *,
                ROW_NUMBER() OVER (PARTITION BY id ORDER BY created_at DESC) AS rn
            FROM `pizza_shop.staging_products`
        )
        SELECT id, name, description, price, category, image
        FROM windowed
        WHERE rn = 1;
        """
        return [Product.from_row(row) for row in query_helper.fetch_rows(stmt)]

    @staticmethod
    def insert(query_helper: QueryHelper, percent: float = 0.5):
        products = Product.fetch(query_helper)
        records = set(random.choices(products, k=int(len(products) * percent)))
        for r in records:
            r.price = r.price + 10
        query_helper.insert_rows(
            "pizza_shop",
            "staging_products",
            [r.to_json() for r in records],
        )
        return records


@dataclasses.dataclass
class User:
    id: int
    first_name: str
    last_name: str
    email: str
    residence: str
    lat: float
    lon: float
    created_at: str = datetime.datetime.now().isoformat(timespec="seconds")

    def __hash__(self) -> int:
        return self.id

    def to_json(self):
        return {
            k: v if k not in ["lat", "lon"] else str(v)
            for k, v in dataclasses.asdict(self).items()
        }

    @classmethod
    def from_row(cls, row: bigquery.Row):
        return cls(**dict(row.items()))

    @staticmethod
    def fetch(query_helper: QueryHelper):
        stmt = """
        WITH windowed AS (
            SELECT
                *,
                ROW_NUMBER() OVER (PARTITION BY id ORDER BY created_at DESC) AS rn
            FROM `pizza_shop.staging_users`
        )
        SELECT id, first_name, last_name, email, residence, lat, lon
        FROM windowed
        WHERE rn = 1;
        """
        return [User.from_row(row) for row in query_helper.fetch_rows(stmt)]

    @staticmethod
    def insert(query_helper: QueryHelper, percent: float = 0.5):
        users = User.fetch(query_helper)
        records = set(random.choices(users, k=int(len(users) * percent)))
        for r in records:
            r.email = f"{''.join(random.choices(string.ascii_letters, k=5)).lower()}@email.com"
        query_helper.insert_rows(
            "pizza_shop",
            "staging_users",
            [r.to_json() for r in records],
        )
        return records


@dataclasses.dataclass
class Order:
    id: int
    user_id: int
    items: str
    created_at: str = datetime.datetime.now().isoformat(timespec="seconds")

    def to_json(self):
        return dataclasses.asdict(self)

    @classmethod
    def create(cls, id: int):
        order_items = [
            {"product_id": id, "quantity": random.randint(1, 5)}
            for id in set(random.choices(range(1, 82), k=random.randint(1, 10)))
        ]
        return cls(
            id=id,
            user_id=random.randint(1, 10000),
            items=json.dumps([item for item in order_items]),
        )

    @staticmethod
    def insert(query_helper: QueryHelper, max_id: int, num_orders: int = 5000):
        records = []
        for _ in range(num_orders):
            records.append(Order.create(max_id + 1))
            max_id += 1
        query_helper.insert_rows(
            "pizza_shop",
            "staging_orders",
            [r.to_json() for r in records],
        )
        return records

    @staticmethod
    def get_max_id(query_helper: QueryHelper):
        stmt = "SELECT max(id) AS max FROM `pizza_shop.staging_orders`"
        return next(iter(query_helper.fetch_rows(stmt))).max


def main():
    query_helper = QueryHelper(os.environ["SA_KEYFILE"])
    ## update product and user records
    updated_products = Product.insert(query_helper)
    print(f"{len(updated_products)} product records updated")
    updated_users = User.insert(query_helper)
    print(f"{len(updated_users)} user records updated")
    ## create order records
    max_order_id = Order.get_max_id(query_helper)
    new_orders = Order.insert(query_helper, max_order_id)
    print(f"{len(new_orders)} order records created")
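
For quick local testing, the same entry point that the PythonOperator calls can also be run outside Airflow. Below is a minimal sketch, assuming it is executed from the airflow/dags folder, the google-cloud-bigquery package is installed locally, and the service account key is available at the path shown - adjust as needed.

# hypothetical standalone test run of the update task, executed from airflow/dags
import os

import update_records

# point SA_KEYFILE at a local copy of the service account key
os.environ.setdefault("SA_KEYFILE", "../../sa_key/key.json")

update_records.main()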

The details of the ETL job can be found on the Airflow web server as shown below.

Run ETL

Below are example product dimension records after the ETL job completes. Product 1 has been updated: a new surrogate key is assigned to the new record, and the valid_from and valid_to column values are updated accordingly.

SELECT product_key, product_id AS id, price, valid_from, valid_to
FROM `pizza_shop.dim_products`
WHERE product_id IN (1, 2)

product_key                         id price valid_from          valid_to
8dd51b3981692c787baa9d4335f15345     2  60.0 2024-02-16T22:07:19 2199-12-31T00:00:00
* a8c5f8c082bcf52a164f2eccf2b493f6   1 335.0 2024-02-16T22:07:19 2024-02-16T22:09:37
* c995d7e1ec035da116c0f37e6284d1d5   1 345.0 2024-02-16T22:09:37 2199-12-31T00:00:00

When we query the fact table for orders that include the same product IDs, we can confirm that the correct product key values are mapped - note that valid_to is not inclusive.

SELECT o.order_id, p.key, p.id, p.price, p.quantity, o.created_at
FROM `pizza_shop.fct_orders` AS o,
  UNNEST(o.product) AS p
WHERE o.order_id IN (11146, 23296) AND p.id IN (1, 2)
ORDER BY o.order_id, p.id

order_id  key                                id price quantity created_at
   11146  * a8c5f8c082bcf52a164f2eccf2b493f6  1 335.0        1 2024-02-16T22:07:23
   11146  8dd51b3981692c787baa9d4335f15345    2  60.0        2 2024-02-16T22:07:23
   23296  * c995d7e1ec035da116c0f37e6284d1d5  1 345.0        4 2024-02-16T22:09:37
   23296  8dd51b3981692c787baa9d4335f15345    2  60.0        4 2024-02-16T22:09:37
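
The exclusive upper bound means that a dimension record applies to an order when valid_from <= created_at < valid_to. The query below is a conceptual sketch of that lookup, assuming valid_from and valid_to are DATETIME columns; it returns only the second product 1 record above, which is the one mapped to order 23296.

-- conceptual lookup of the dimension record that is valid at a given order timestamp
SELECT product_key, price, valid_from, valid_to
FROM `pizza_shop.dim_products`
WHERE product_id = 1
  AND valid_from <= DATETIME '2024-02-16 22:09:37'
  AND valid_to > DATETIME '2024-02-16 22:09:37'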

Summary

In this series of posts, we discuss data warehouse/lakehouse examples using data build tool (dbt), including ETL orchestration with Apache Airflow. In this post, we discussed how to set up an ETL process on the dbt project developed in Part 3 using Airflow. A demo ETL job was created that updates the source records and then runs and tests the dbt project. Finally, the result of the ETL job was validated by checking sample records.