In this series, we discuss practical examples of data warehouse and lakehouse development where data transformation is performed by the data build tool (dbt) and ETL is managed by Apache Airflow. In Part 1, we developed a dbt project on PostgreSQL using fictional pizza shop data, and the data sets were modelled as two SCD type 2 dimension tables and one transactional fact table. In this post, we create a new dbt project that targets Google BigQuery. While the dimension tables are maintained with the same SCD type 2 approach, the fact table is denormalized using nested and repeated fields, which can potentially improve query performance by pre-joining the corresponding dimension records.

Setup BigQuery

Google BigQuery is used in this post, and the source code can be found in the post's GitHub repository.

Prepare Data

Fictional pizza shop data from Building Real-Time Analytics Systems is used in this post. There are three data sets - products, users and orders. The first two are copied from the book's GitHub repository and saved into the setup/data folder, while the orders data set is generated by the following Python script.

# setup/generate_orders.py
import os
import csv
import dataclasses
import random
import json


@dataclasses.dataclass
class Order:
    user_id: int
    items: str

    @staticmethod
    def create():
        order_items = [
            {"product_id": id, "quantity": random.randint(1, 5)}
            for id in set(random.choices(range(1, 82), k=random.randint(1, 10)))
        ]
        return Order(
            user_id=random.randint(1, 10000),
            items=json.dumps(order_items),
        )


if __name__ == "__main__":
    """
    Generate random orders given by the NUM_ORDERS environment variable.
        - orders.csv will be written to ./data folder

    Example:
        python generate_orders.py
        NUM_ORDERS=10000 python generate_orders.py
    """
    NUM_ORDERS = int(os.getenv("NUM_ORDERS", "20000"))
    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    orders = [Order.create() for _ in range(NUM_ORDERS)]

    filepath = os.path.join(CURRENT_DIR, "data", "orders.csv")
    if os.path.exists(filepath):
        os.remove(filepath)

    with open(filepath, "w") as f:
        writer = csv.writer(f)
        writer.writerow(["user_id", "items"])
        for order in orders:
            writer.writerow(dataclasses.asdict(order).values())

Sample order records generated by the script are shown below. Each record includes a user ID and order items.

user_id,items
6845,"[{""product_id"": 52, ""quantity"": 4}, {""product_id"": 68, ""quantity"": 5}]"
6164,"[{""product_id"": 77, ""quantity"": 4}]"
9303,"[{""product_id"": 5, ""quantity"": 2}, {""product_id"": 71, ""quantity"": 3}, {""product_id"": 74, ""quantity"": 2}, {""product_id"": 10, ""quantity"": 5}, {""product_id"": 12, ""quantity"": 2}]"

The folder structure of source data sets and order generation script can be found below.

$ tree setup/ -P "*.csv|generate*"
setup/
├── data
│   ├── orders.csv
│   ├── products.csv
│   └── users.csv
└── generate_orders.py

Insert Source Data

The source data sets are inserted into staging tables using a Python script.

Create Stage Tables

A BigQuery dataset named pizza_shop is created first (a minimal sketch is shown below), and three staging tables are then created in it using the SQL script that follows.
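
The dataset can be created in the console, with the bq CLI, or with a DDL statement. The following is a minimal DDL sketch, where the US location is chosen to match the location attribute of the dbt profile shown later; adjust it to your own environment.

-- a minimal sketch for creating the dataset (location is an assumption matching the dbt profile)
CREATE SCHEMA IF NOT EXISTS pizza_shop
OPTIONS (location = 'US');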

-- setup/create_stage_tables.sql
DROP TABLE IF EXISTS pizza_shop.staging_users;
DROP TABLE IF EXISTS pizza_shop.staging_products;
DROP TABLE IF EXISTS pizza_shop.staging_orders;

CREATE TABLE pizza_shop.staging_users
(
    id INTEGER,
    first_name STRING,
    last_name STRING,
    email STRING,
    residence STRING,
    lat DECIMAL(10, 8),
    lon DECIMAL(10, 8),
    created_at DATETIME
);

CREATE TABLE pizza_shop.staging_products
(
    id INTEGER,
    name STRING,
    description STRING,
    price FLOAT64,
    category STRING,
    image STRING,
    created_at DATETIME
);

CREATE TABLE pizza_shop.staging_orders
(
    id INTEGER,
    user_id INTEGER,
    items JSON,
    created_at DATETIME
);

Insert Records

The source data is inserted using the Python Client for Google BigQuery. It is a simple process that reads records from the source data files as dictionaries, adds an incremental ID and a creation datetime, and inserts them using the client library. Note that a service account is used for authentication, and its key file (key.json) is placed in the sa_key folder, which sits at the same level as the script's parent folder; see below for the required folder structure.

# setup/insert_records.py
import os
import csv
import datetime

from google.cloud import bigquery
from google.oauth2 import service_account


class QueryHelper:
    def __init__(self, current_dir: str = None):
        self.current_dir = current_dir or os.path.dirname(os.path.realpath(__file__))
        self.credentials = self.get_credentials()
        self.client = bigquery.Client(
            credentials=self.credentials, project=self.credentials.project_id
        )

    def get_credentials(self):
        # https://cloud.google.com/bigquery/docs/samples/bigquery-client-json-credentials#bigquery_client_json_credentials-python
        sa_key_path = os.path.join(
            os.path.dirname(self.current_dir), "sa_key", "key.json"
        )
        return service_account.Credentials.from_service_account_file(
            sa_key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )

    def get_table(self, dataset_name: str, table_name: str):
        return self.client.get_table(
            f"{self.credentials.project_id}.{dataset_name}.{table_name}"
        )

    def insert_rows(self, dataset_name: str, table_name: str, records: list):
        table = self.get_table(dataset_name, table_name)
        errors = self.client.insert_rows_json(table, records)
        if len(errors) > 0:
            print(errors)
            raise RuntimeError("fails to insert records")


class DataHelper:
    def __init__(self, current_dir: str = None):
        self.current_dir = current_dir or os.path.dirname(os.path.realpath(__file__))

    def load_data(self, file_name: str):
        created_at = datetime.datetime.now().isoformat(timespec="seconds")
        records = []
        with open(os.path.join(self.current_dir, "data", file_name), mode="r") as f:
            rows = csv.DictReader(f)
            for ind, row in enumerate(rows):
                extras = {"id": ind + 1, "created_at": created_at}
                records.append({**extras, **row})
        return records


if __name__ == "__main__":
    dataset_name = os.getenv("DATASET_NAME", "pizza_shop")

    query_helper = QueryHelper()
    data_helper = DataHelper()
    print("inserting products...")
    products = data_helper.load_data("products.csv")
    query_helper.insert_rows(dataset_name, "staging_products", products)
    print("inserting users...")
    users = data_helper.load_data("users.csv")
    query_helper.insert_rows(dataset_name, "staging_users", users)
    print("inserting orders...")
    orders = data_helper.load_data("orders.csv")
    query_helper.insert_rows(dataset_name, "staging_orders", orders)

As mentioned, the key file (key.json) is located in the sa_key folder, which sits at the same level as the script's parent folder. It is kept separately so that it can also be shared by the dbt project and the Airflow scheduler.

$ tree sa_key/ && tree setup/ -P "insert_records.py|*.csv"
sa_key/
└── key.json

setup/
├── data
│   ├── orders.csv
│   ├── products.csv
│   └── users.csv
└── insert_records.py

Setup DBT Project

A dbt project named pizza_shop is created using the dbt-bigquery package (dbt-bigquery==1.7.4). Specifically, it is created with the dbt init command, which bootstraps the project in the pizza_shop folder and adds the project profile to the dbt profiles file. Note that the service account is used for authentication, and the path to its key file is specified in the keyfile attribute. See this page for details about how to set up BigQuery for a dbt project.

# $HOME/.dbt/profiles.yml
pizza_shop:
  outputs:
    dev:
      type: bigquery
      method: service-account
      project: <project-id>
      dataset: pizza_shop
      threads: 4
      keyfile: <path-to-service-account-key-file>
      job_execution_timeout_seconds: 300
      job_retries: 1
      location: US
      priority: interactive
  target: dev

Project Sources

Recall that three staging tables were created earlier; they are used as the sources of the project. Their details are kept in sources.yml so that they can be referenced easily in other models, as illustrated after the file.

# pizza_shop/models/sources.yml
version: 2

sources:
  - name: raw
    schema: pizza_shop
    tables:
      - name: users
        identifier: staging_users
      - name: products
        identifier: staging_products
      - name: orders
        identifier: staging_orders
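
With this file in place, models refer to the staging tables through the source() function, and dbt resolves each reference to the fully qualified table name at compile time. The sketch below shows the idea, with <project-id> standing for the GCP project configured in the profile; the exact quoting of the compiled relation may differ slightly.

-- reference in a model
SELECT * FROM {{ source('raw', 'users') }}

-- resolves (roughly) to
SELECT * FROM `<project-id>`.`pizza_shop`.`staging_users`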

Using the raw sources, three models are created by performing simple transformations such as adding surrogate keys with the dbt_utils package and renaming columns. Note that, as the products and users dimension tables are maintained as Type 2 slowly changing dimensions (SCD type 2), the surrogate keys are used to uniquely identify the relevant dimension records. A note after the model files sketches what the surrogate key macro expands to.

-- pizza_shop/models/src/src_products.sql
WITH raw_products AS (
  SELECT * FROM {{ source('raw', 'products') }}
)
SELECT
  {{ dbt_utils.generate_surrogate_key(['name', 'description', 'price', 'category', 'image']) }} as product_key,
  id AS product_id,
  name,
  description,
  price,
  category,
  image,
  created_at
FROM raw_products

-- pizza_shop/models/src/src_users.sql
WITH raw_users AS (
  SELECT * FROM {{ source('raw', 'users') }}
)
SELECT
  {{ dbt_utils.generate_surrogate_key(['first_name', 'last_name', 'email', 'residence', 'lat', 'lon']) }} as user_key,
  id AS user_id,
  first_name,
  last_name,
  email,
  residence,
  lat AS latitude,
  lon AS longitude,
  created_at
FROM raw_users

-- pizza_shop/models/src/src_orders.sql
WITH raw_orders AS (
  SELECT * FROM {{ source('raw', 'orders') }}
)
SELECT
  id AS order_id,
  user_id,
  items,
  created_at
FROM raw_orders
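
For reference, dbt_utils.generate_surrogate_key concatenates the listed columns (after casting them to strings and replacing NULLs with a placeholder) and hashes the result with MD5. The dbt_utils package has to be declared in the project's packages.yml and installed with dbt deps before these models can compile. The expression below is a rough sketch of what the macro expands to on BigQuery for src_products, not the exact compiled output.

-- a rough sketch of the expanded surrogate key expression (approximation, not the exact compiled SQL)
TO_HEX(MD5(CAST(
  COALESCE(CAST(name AS STRING), '_dbt_utils_surrogate_key_null_') || '-' ||
  COALESCE(CAST(description AS STRING), '_dbt_utils_surrogate_key_null_') || '-' ||
  COALESCE(CAST(price AS STRING), '_dbt_utils_surrogate_key_null_') || '-' ||
  COALESCE(CAST(category AS STRING), '_dbt_utils_surrogate_key_null_') || '-' ||
  COALESCE(CAST(image AS STRING), '_dbt_utils_surrogate_key_null_')
  AS STRING
))) AS product_key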

Data Modelling

For SCD type 2, the dimension tables are materialized as tables, and two additional columns are included - valid_from and valid_to. These columns define the time range during which a record is applicable, and they are used to map a fact record to the relevant dimension record when multiple dimension records exist for the same natural key. Note that SCD type 2 tables can also be maintained with dbt snapshots; a sketch is included after the model files below.

-- pizza_shop/models/dim/dim_products.sql
{{
  config(
    materialized = 'table',
    )
}}
WITH src_products AS (
  SELECT * FROM {{ ref('src_products') }}
)
SELECT
    *, 
    created_at AS valid_from,
    COALESCE(
      LEAD(created_at, 1) OVER (PARTITION BY product_id ORDER BY created_at), 
      CAST('2199-12-31' AS DATETIME)
    ) AS valid_to
FROM src_products

-- pizza_shop/models/dim/dim_users.sql
{{
  config(
    materialized = 'table',
    )
}}
WITH src_users AS (
  SELECT * FROM {{ ref('src_users') }}
)
SELECT
    *, 
    created_at AS valid_from,
    COALESCE(
      LEAD(created_at, 1) OVER (PARTITION BY user_id ORDER BY created_at), 
      CAST('2199-12-31' AS DATETIME)
    ) AS valid_to
FROM src_users
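
As noted above, an alternative to the LEAD-based approach is to let dbt snapshots maintain the SCD type 2 history. The block below is a minimal sketch of a check-strategy snapshot over the products source; it is not part of this project, and the file name and target_schema are assumptions for illustration.

-- pizza_shop/snapshots/products_snapshot.sql (hypothetical, not used in this project)
{% snapshot products_snapshot %}
{{
  config(
    target_schema='pizza_shop',
    unique_key='id',
    strategy='check',
    check_cols=['name', 'description', 'price', 'category', 'image']
  )
}}
SELECT * FROM {{ source('raw', 'products') }}
{% endsnapshot %}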

The transactional fact table is materialized as incremental so that only new records are appended. Also, it is created as a partitioned table, which improves query performance when a date filter is added (see the example after the model). Finally, the user and order-item records are pre-joined from the relevant dimension tables. See below for details about how this fact table is structured in BigQuery.

-- pizza_shop/models/fct/fct_orders.sql
{{
  config(
    materialized = 'incremental',
    partition_by = {
      'field': 'order_date',
      'data_type': 'date',
      'granularity': 'day',
      'time_ingestion_partitioning': true
    })
}}
WITH dim_products AS (
  SELECT * FROM {{ ref('dim_products') }}
), dim_users AS (
  SELECT * FROM {{ ref('dim_users') }}
), expanded_orders AS (
  SELECT 
    order_id,
    user_id,
    CAST(JSON_EXTRACT_SCALAR(json , '$.product_id') AS INTEGER) AS product_id,
    CAST(JSON_EXTRACT_SCALAR(json , '$.quantity') AS INTEGER) AS quantity,
    created_at
  FROM {{ ref('src_orders') }} AS t,
    UNNEST(JSON_EXTRACT_ARRAY(t.items , '$')) AS json
)
SELECT
  o.order_id,
  ARRAY_AGG(
    STRUCT(p.product_key AS key, o.product_id AS id, p.name, p.price, o.quantity, p.description, p.category, p.image)
  ) as product,
  STRUCT(u.user_key AS key, o.user_id AS id, u.first_name, u.last_name, u.email, u.residence, u.latitude, u.longitude) AS user,
  o.created_at,
  EXTRACT(DATE FROM o.created_at) AS order_date
FROM expanded_orders o
JOIN dim_products p 
  ON o.product_id = p.product_id
    AND o.created_at >= p.valid_from
    AND o.created_at < p.valid_to
JOIN dim_users u 
  ON o.user_id = u.user_id
    AND o.created_at >= u.valid_from
    AND o.created_at < u.valid_to
{% if is_incremental() %}
  WHERE o.created_at > (SELECT max(created_at) from {{ this }})
{% endif %}
GROUP BY 
  o.order_id, 
  u.user_key, 
  o.user_id, 
  u.first_name, 
  u.last_name, 
  u.email, 
  u.residence, 
  u.latitude, 
  u.longitude, 
  o.created_at
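
As the config above enables time_ingestion_partitioning, the table is partitioned by ingestion time, with order_date supplying the value used to route rows into daily partitions. A query can then limit the bytes scanned by filtering on the partition, for instance via the _PARTITIONDATE pseudo-column; the sketch below uses an arbitrary date and column list for illustration.

-- a sketch of a partition-pruned query against the fact table (date value is arbitrary)
SELECT order_id, created_at
FROM pizza_shop.fct_orders
WHERE _PARTITIONDATE = DATE '2024-02-04';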

We can keep the final models in a separate YAML file for testing and enhanced documentation.

# pizza_shop/models/schema.yml
version: 2

models:
  - name: dim_products
    description: Products table, which is converted into SCD type 2
    columns:
      - name: product_key
        description: |
          Primary key of the table
          Surrogate key, which is generated by md5 hash using the following columns
            - name, description, price, category, image
        tests:
          - not_null
          - unique
      - name: product_id
        description: Natural key of products
      - name: name
        description: Product name
      - name: description
        description: Product description
      - name: price
        description: Product price
      - name: category
        description: Product category
      - name: image
        description: Product image
      - name: created_at
        description: Timestamp when the record is loaded
      - name: valid_from
        description: Effective start timestamp of the corresponding record (inclusive)
      - name: valid_to
        description: Effective end timestamp of the corresponding record (exclusive)
  - name: dim_users
    description: Users table, which is converted into SCD type 2
    columns:
      - name: user_key
        description: |
          Primary key of the table
          Surrogate key, which is generated by md5 hash using the following columns
            - first_name, last_name, email, residence, lat, lon
        tests:
          - not_null
          - unique
      - name: user_id
        description: Natural key of users
      - name: first_name
        description: First name
      - name: last_name
        description: Last name
      - name: email
        description: Email address
      - name: residence
        description: User address
      - name: latitude
        description: Latitude of user address
      - name: longitude
        description: Longitude of user address
      - name: created_at
        description: Timestamp when the record is loaded
      - name: valid_from
        description: Effective start timestamp of the corresponding record (inclusive)
      - name: valid_to
        description: Effective end timestamp of the corresponding record (exclusive)
  - name: fct_orders
    description: Orders fact table, where order items are kept as nested and repeated fields
    columns:
      - name: order_id
        description: Natural key of orders
      - name: product
        description: |
          Array of products in an order.
          Each element is a struct whose attributes are pre-joined from the dim_products table:
            key (product_key), id (product_id), name, price, quantity, description, category, and image
      - name: user
        description: |
          A struct whose attributes are pre-joined from the dim_users table:
            key (user_key), id (user_id), first_name, last_name, email, residence, latitude, and longitude
      - name: created_at
        description: Timestamp when the record is loaded

The project can be executed using the dbt run command as shown below.

$ dbt run
23:06:05  Running with dbt=1.7.7
23:06:05  Registered adapter: bigquery=1.7.4
23:06:05  Found 6 models, 4 tests, 3 sources, 0 exposures, 0 metrics, 568 macros, 0 groups, 0 semantic models
23:06:05
23:06:07  Concurrency: 4 threads (target='dev')
23:06:07
23:06:07  1 of 6 START sql view model pizza_shop.src_orders .............................. [RUN]
23:06:07  2 of 6 START sql view model pizza_shop.src_products ............................ [RUN]
23:06:07  3 of 6 START sql view model pizza_shop.src_users ............................... [RUN]
23:06:08  1 of 6 OK created sql view model pizza_shop.src_orders ......................... [CREATE VIEW (0 processed) in 1.65s]
23:06:09  3 of 6 OK created sql view model pizza_shop.src_users .......................... [CREATE VIEW (0 processed) in 1.93s]
23:06:09  2 of 6 OK created sql view model pizza_shop.src_products ....................... [CREATE VIEW (0 processed) in 1.93s]
23:06:09  4 of 6 START sql table model pizza_shop.dim_users .............................. [RUN]
23:06:09  5 of 6 START sql table model pizza_shop.dim_products ........................... [RUN]
23:06:14  5 of 6 OK created sql table model pizza_shop.dim_products ...................... [CREATE TABLE (81.0 rows, 0 processed) in 5.21s]
23:06:14  4 of 6 OK created sql table model pizza_shop.dim_users ......................... [CREATE TABLE (10.0k rows, 0 processed) in 5.21s]
23:06:14  6 of 6 START sql incremental model pizza_shop.fct_orders ....................... [RUN]
23:06:25  6 of 6 OK created sql incremental model pizza_shop.fct_orders .................. [INSERT (20.0k rows, 1.6 MiB processed) in 11.23s]
23:06:25
23:06:25  Finished running 3 view models, 2 table models, 1 incremental model in 0 hours 0 minutes and 19.92 seconds (19.92s).
23:06:25
23:06:25  Completed successfully
23:06:25
23:06:25  Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6

Also, the project can be tested using the dbt test command.

$ dbt test
23:06:53  Running with dbt=1.7.7
23:06:54  Registered adapter: bigquery=1.7.4
23:06:54  Found 6 models, 4 tests, 3 sources, 0 exposures, 0 metrics, 568 macros, 0 groups, 0 semantic models
23:06:54
23:06:55  Concurrency: 4 threads (target='dev')
23:06:55
23:06:55  1 of 4 START test not_null_dim_products_product_key ............................ [RUN]
23:06:55  2 of 4 START test not_null_dim_users_user_key .................................. [RUN]
23:06:55  3 of 4 START test unique_dim_products_product_key .............................. [RUN]
23:06:55  4 of 4 START test unique_dim_users_user_key .................................... [RUN]
23:06:57  2 of 4 PASS not_null_dim_users_user_key ........................................ [PASS in 2.27s]
23:06:57  1 of 4 PASS not_null_dim_products_product_key .................................. [PASS in 2.40s]
23:06:57  3 of 4 PASS unique_dim_products_product_key .................................... [PASS in 2.42s]
23:06:57  4 of 4 PASS unique_dim_users_user_key .......................................... [PASS in 2.51s]
23:06:57
23:06:57  Finished running 4 tests in 0 hours 0 minutes and 3.23 seconds (3.23s).
23:06:57
23:06:57  Completed successfully
23:06:57
23:06:57  Done. PASS=4 WARN=0 ERROR=0 SKIP=0 TOTAL=4

Fact Table Structure

The schema of the fact table can be found below. Both the product and user fields are marked as the RECORD type because they are structs (containers of fields). Also, the mode of the product field is REPEATED, which means it is an array.
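
Besides the console, the nested structure can be inspected with a query against the INFORMATION_SCHEMA.COLUMN_FIELD_PATHS view, which lists every nested field path and its data type; the following is a small sketch.

-- list the nested field paths and types of the fact table
SELECT field_path, data_type
FROM pizza_shop.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS
WHERE table_name = 'fct_orders'
ORDER BY field_path;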

In the query result view, non-array fields are not repeated, and each row is split across the array items.

We can use the UNNEST operator if we need to convert the elements of an array into rows, as shown in the example below.
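
As a sketch, a query along the following lines flattens the product array so that each order item becomes its own row; the selected columns are illustrative.

-- flatten the repeated product field into one row per order item
SELECT o.order_id, p.id AS product_id, p.name, p.price, p.quantity, o.created_at
FROM pizza_shop.fct_orders AS o,
  UNNEST(o.product) AS p;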

Update Records

Although we will discuss ETL orchestration with Apache Airflow in the next post, here we illustrate how the dimension and fact tables change when records are updated.

Product

First, a new record is inserted into the staging_products table with the price increased by 10.

-- // update a product record
INSERT INTO pizza_shop.staging_products (id, name, description, price, category, image, created_at)
    SELECT 1, name, description, price + 10, category, image, CURRENT_DATETIME('Australia/Sydney')
    FROM pizza_shop.staging_products
    WHERE id = 1;

SELECT id, name, price, category, created_at 
FROM pizza_shop.staging_products
WHERE id = 1
ORDER BY created_at;

id name                              price  category    created_at
1  Moroccan Spice Pasta Pizza - Veg  335.0  veg pizzas  2024-02-04T09:50:05
1  Moroccan Spice Pasta Pizza - Veg  345.0  veg pizzas  2024-02-04T10:15:30.227352

When we execute the dbt run command again, we see that the corresponding dimension table reflects the change by adding a new record and updating the valid_from and valid_to columns accordingly. With this change, any later order record that includes this product should be mapped to the new product record.

SELECT product_key, price, created_at, valid_from, valid_to 
FROM pizza_shop.dim_products 
WHERE product_id = 1
ORDER BY created_at;

product_key                       price  created_at                  valid_from                  valid_to
a8c5f8c082bcf52a164f2eccf2b493f6  335.0  2024-02-04T09:50:05         2024-02-04T09:50:05         2024-02-04T10:15:30.227352
c995d7e1ec035da116c0f37e6284d1d5  345.0  2024-02-04T10:15:30.227352  2024-02-04T10:15:30.227352  2199-12-31T00:00:00

User

Also, a new record is inserted into the staging_users table with a modified email address.

-- // update a user record
INSERT INTO pizza_shop.staging_users (id, first_name, last_name, email, residence, lat, lon, created_at)
    SELECT 1, first_name, last_name, 'john.doe@example.com', residence, lat, lon, CURRENT_DATETIME('Australia/Sydney')
    FROM pizza_shop.staging_users
    WHERE id = 1;

SELECT id, first_name, last_name, email, created_at 
FROM pizza_shop.staging_users
WHERE id = 1
ORDER BY created_at;

id  first_name  last_name  email                       created_at
1   Kismat      Shroff     drishyamallick@hotmail.com  2024-02-04T09:50:07
1   Kismat      Shroff     john.doe@example.com        2024-02-04T10:17:48.340304

Again, the corresponding dimension table reflects the change by adding a new record and updating the valid_from and valid_to columns accordingly.

SELECT user_key, email, valid_from, valid_to 
FROM pizza_shop.dim_users 
WHERE user_id = 1
ORDER BY created_at;

user_key                          email                       valid_from                  valid_to
8adf084f4ea4b01d4863da22c61873de  drishyamallick@hotmail.com  2024-02-04T09:50:07         2024-02-04T10:17:48.340304
65e019d6a1fa0ca03e520b49928ee95b  john.doe@example.com        2024-02-04T10:17:48.340304  2199-12-31T00:00:00

Order

We insert a new order record that has two items, where the product IDs of the first and second items are 1 and 2 respectively. In this example, we expect the first item to map to the updated product record while the record of the second product remains the same. We can check this by querying the new order record together with an existing record that contains the same products. As expected, the query result shows that the product record is updated only in the new order record.

-- // add an order record
INSERT INTO pizza_shop.staging_orders(id, user_id, items, created_at)
VALUES (
  20001, 
  1,
  JSON_ARRAY(JSON '{"product_id": 1, "quantity": 2}', JSON '{"product_id": 2, "quantity": 3}'), 
  CURRENT_DATETIME('Australia/Sydney')
);

SELECT o.order_id, p.key, p.id, p.price, p.quantity, o.created_at
FROM `pizza_shop.fct_orders` AS o,
  UNNEST(o.product) AS p
WHERE o.order_id in (11146, 20001) AND p.id IN (1, 2)
ORDER BY order_id;

order_id  key                                id  price  quantity  created_at
11146     * a8c5f8c082bcf52a164f2eccf2b493f6  1  335.0         1  2024-02-04T09:50:10
11146     8dd51b3981692c787baa9d4335f15345    2   60.0         2  2024-02-04T09:50:10
20001     * c995d7e1ec035da116c0f37e6284d1d5  1  345.0         2  2024-02-04T10:30:19.667473
20001     8dd51b3981692c787baa9d4335f15345    2   60.0         3  2024-02-04T10:30:19.667473

Summary

In this series, we discuss practical examples of data warehouse and lakehouse development where data transformation is performed by the data build tool (dbt) and ETL is managed by Apache Airflow. In this post, we developed a dbt project on Google BigQuery using fictional pizza shop data. Two SCD type 2 dimension tables and a single transactional fact table were modelled in the dbt project. The fact table was denormalized using nested and repeated fields, which can potentially improve query performance by pre-joining the corresponding dimension records. Finally, the impact of record updates on the dimension and fact tables was discussed in detail.