The data build tool (dbt) is a popular data transformation tool for data warehouse development. It can also be used for data lakehouse development thanks to open table formats such as Apache Iceberg, Apache Hudi and Delta Lake. dbt supports key AWS analytics services, and I wrote a series of posts that discuss how to utilise dbt with Redshift, Glue, EMR on EC2, EMR on EKS, and Athena. Those posts focus on platform integration, however, and do not show realistic ETL scenarios.

In this series of posts, we discuss practical data warehouse/lakehouse examples including ETL orchestration with Apache Airflow. As a starting point, we develop a dbt project on PostgreSQL using fictional pizza shop data in this post.

Setup Database

PostgreSQL is used in the post, and it is deployed locally using Docker Compose. The source can be found in the GitHub repository of this post.

Prepare Data

Fictional pizza shop data from Building Real-Time Analytics Systems is used in this post. There are three data sets: products, users and orders. The first two are copied from the book’s GitHub repository and saved into the initdb/data folder. The orders data set is generated by the following Python script.

# initdb/generate_orders.py
import os
import csv
import dataclasses
import random
import json


@dataclasses.dataclass
class Order:
    user_id: int
    items: str

    @staticmethod
    def create():
        # pick up to 10 distinct product IDs (1-81) with a random quantity for each
        order_items = [
            {"product_id": product_id, "quantity": random.randint(1, 5)}
            for product_id in set(random.choices(range(1, 82), k=random.randint(1, 10)))
        ]
        return Order(
            user_id=random.randint(1, 10000),
            items=json.dumps(order_items),
        )


if __name__ == "__main__":
    """
    Generate random orders given by the NUM_ORDERS environment variable.
        - orders.csv will be written to ./data folder

    Example:
        python generate_orders.py
        NUM_ORDERS=10000 python generate_orders.py
    """
    NUM_ORDERS = int(os.getenv("NUM_ORDERS", "20000"))
    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    orders = [Order.create() for _ in range(NUM_ORDERS)]

    filepath = os.path.join(CURRENT_DIR, "data", "orders.csv")
    if os.path.exists(filepath):
        os.remove(filepath)

    # newline="" stops the csv module from writing extra blank lines on Windows
    with open(filepath, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["user_id", "items"])
        for order in orders:
            writer.writerow(dataclasses.asdict(order).values())

Sample order records generated by the script are shown below. Each record includes a user ID and order items. As discussed later, the items are kept as the JSONB type in the staging table and expanded into rows in the fact table.

user_id,items
6845,"[{""product_id"": 52, ""quantity"": 4}, {""product_id"": 68, ""quantity"": 5}]"
6164,"[{""product_id"": 77, ""quantity"": 4}]"
9303,"[{""product_id"": 5, ""quantity"": 2}, {""product_id"": 71, ""quantity"": 3}, {""product_id"": 74, ""quantity"": 2}, {""product_id"": 10, ""quantity"": 5}, {""product_id"": 12, ""quantity"": 2}]"
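
To make the shape of the items column concrete, the following is a minimal Python sketch (not part of the original project) that parses one CSV record like those above and expands the JSON array into one row per item. This is roughly what happens later when the fact table is built. The explode_order function is a hypothetical helper introduced only for illustration.

```python
import csv
import io
import json


def explode_order(user_id, items_json):
    """Expand the JSON array of order items into one flat row per item."""
    return [
        {"user_id": int(user_id), "product_id": item["product_id"], "quantity": item["quantity"]}
        for item in json.loads(items_json)
    ]


# a single record in the same format as orders.csv
sample = 'user_id,items\n6845,"[{""product_id"": 52, ""quantity"": 4}, {""product_id"": 68, ""quantity"": 5}]"\n'

rows = []
for record in csv.DictReader(io.StringIO(sample)):
    rows.extend(explode_order(record["user_id"], record["items"]))

for row in rows:
    print(row)
```

Each order item ends up as its own row that still carries the user ID of the order it belongs to.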

The folder structure of the source data sets and the order generation script is shown below.

$ tree initdb/ -P "*.csv|generate*" -I "scripts"
initdb/
├── data
│   ├── orders.csv
│   ├── products.csv
│   └── users.csv
└── generate_orders.py

Run Database

A PostgreSQL server is deployed using Docker Compose. The compose file creates a database (devdb) and user (devuser) by specifying the corresponding environment variables (POSTGRES_*). The folder that contains the database bootstrap script (bootstrap.sql) is volume-mapped into the Docker entrypoint directory (/docker-entrypoint-initdb.d), so the script runs when the container initialises an empty data directory. It creates the necessary schemas and tables and then loads initial records into the staging tables. See below for details about the bootstrap script.

# compose-postgres.yml
version: "3"

services:
  postgres:
    image: postgres:13
    container_name: postgres
    ports:
      - 5432:5432
    volumes:
      - ./initdb/scripts:/docker-entrypoint-initdb.d
      - ./initdb/data:/tmp
      - postgres_data:/var/lib/postgresql/data
    environment:
      - POSTGRES_DB=devdb
      - POSTGRES_USER=devuser
      - POSTGRES_PASSWORD=password
      - TZ=Australia/Sydney

volumes:
  postgres_data:
    driver: local
    name: postgres_data

Database Bootstrap Script

The bootstrap script begins with creating schemas and granting permission to the development user. Then the tables for the pizza shop data are created in the staging schema, and initial records are copied from data files into corresponding tables.

-- initdb/scripts/bootstrap.sql

-- // create schemas and grant permission
CREATE SCHEMA staging;
GRANT ALL ON SCHEMA staging TO devuser;

CREATE SCHEMA dev;
GRANT ALL ON SCHEMA dev TO devuser;

-- // create tables
DROP TABLE IF EXISTS staging.users;
DROP TABLE IF EXISTS staging.products;
DROP TABLE IF EXISTS staging.orders;

CREATE TABLE staging.users
(
    id SERIAL,
    first_name VARCHAR(255),
    last_name VARCHAR(255),
    email VARCHAR(255),
    residence VARCHAR(500),
    lat DECIMAL(10, 8),
    lon DECIMAL(10, 8),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS staging.products
(
    id SERIAL,
    name VARCHAR(100),
    description VARCHAR(500),
    price FLOAT,
    category VARCHAR(100),
    image VARCHAR(200),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE IF NOT EXISTS staging.orders
(
    id SERIAL,
    user_id INT,
    items JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- // copy data
COPY staging.users(first_name, last_name, email, residence, lat, lon)
FROM '/tmp/users.csv' DELIMITER ',' CSV HEADER;

COPY staging.products(name, description, price, category, image)
FROM '/tmp/products.csv' DELIMITER ',' CSV HEADER;

COPY staging.orders(user_id, items)
FROM '/tmp/orders.csv' DELIMITER ',' CSV HEADER;

Setup dbt Project

A dbt project named pizza_shop is created using the dbt-postgres package (dbt-postgres==1.7.4). Specifically, it is created using the dbt init command, and it bootstraps the project in the pizza_shop folder as well as adds the project profile to the dbt profiles file as shown below.

# $HOME/.dbt/profiles.yml
pizza_shop:
  outputs:
    dev:
      dbname: devdb
      host: localhost
      pass: password
      port: 5432
      schema: dev
      threads: 4
      type: postgres
      user: devuser
  target: dev

Project Sources

Recall that three tables are created in the staging schema by the database bootstrap script. They are used as the sources of the project, and their details are kept in sources.yml so that they can be referenced easily in other models.

# pizza_shop/models/sources.yml
version: 2

sources:
  - name: raw
    schema: staging
    tables:
      - name: users
        identifier: users
      - name: products
        identifier: products
      - name: orders
        identifier: orders

Using the raw sources, three models are created by performing simple transformations such as adding surrogate keys using the dbt_utils package and changing column names. Note that, because the products and users dimension tables are maintained as Type 2 slowly changing dimensions (SCD type 2), the surrogate keys are used to uniquely identify individual dimension records.

-- pizza_shop/models/src/src_products.sql
WITH raw_products AS (
  SELECT * FROM {{ source('raw', 'products') }}
)
SELECT
  {{ dbt_utils.generate_surrogate_key(['name', 'description', 'price', 'category', 'image']) }} as product_key,
  id AS product_id,
  name,
  description,
  price,
  category,
  image,
  created_at
FROM raw_products

-- pizza_shop/models/src/src_users.sql
WITH raw_users AS (
  SELECT * FROM {{ source('raw', 'users') }}
)
SELECT
  {{ dbt_utils.generate_surrogate_key(['first_name', 'last_name', 'email', 'residence', 'lat', 'lon']) }} as user_key,
  id AS user_id,
  first_name,
  last_name,
  email,
  residence,
  lat AS latitude,
  lon AS longitude,
  created_at
FROM raw_users

-- pizza_shop/models/src/src_orders.sql
WITH raw_orders AS (
  SELECT * FROM {{ source('raw', 'orders') }}
)
SELECT
  id AS order_id,
  user_id,
  items,
  created_at
FROM raw_orders
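
As a rough illustration of how such a surrogate key is derived, the sketch below hashes column values with md5 in plain Python. It assumes the behaviour of dbt_utils.generate_surrogate_key as I understand it (each value cast to text, nulls replaced by a placeholder, values joined with a hyphen and then hashed). The surrogate_key function and the product values are hypothetical, and Python's string conversion differs from PostgreSQL's text cast, so the digests will not necessarily match those stored in the database.

```python
import hashlib

# placeholder assumed to stand in for NULL values before hashing
NULL_PLACEHOLDER = "_dbt_utils_surrogate_key_null_"


def surrogate_key(*values):
    """Join the values as text with a hyphen and return the md5 hex digest."""
    parts = [NULL_PLACEHOLDER if value is None else str(value) for value in values]
    return hashlib.md5("-".join(parts).encode("utf-8")).hexdigest()


# hypothetical product attributes; only the price differs between the two versions
old = surrogate_key("Margherita", "Classic cheese pizza", 335.0, "veg pizzas", "margherita.png")
new = surrogate_key("Margherita", "Classic cheese pizza", 345.0, "veg pizzas", "margherita.png")

print(old)
print(new)
print(old != new)
```

Because the key is derived from the attribute values rather than the natural key, a change to any hashed attribute yields a new key, which is what allows several versions of the same product or user to coexist in a dimension table.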

Data Modelling

For SCD type 2, the dimension tables are materialized as tables and two additional columns are included: valid_from and valid_to. These columns define the time range in which a record is applicable, and they are used to map the relevant surrogate key in the fact table when there are multiple dimension records for the same natural key. Note that SCD type 2 tables can also be maintained by dbt snapshots.

-- pizza_shop/models/dim/dim_products.sql
{{
  config(
    materialized = 'table',
    )
}}
WITH src_products AS (
  SELECT * FROM {{ ref('src_products') }}
)
SELECT
    *,
    created_at AS valid_from,
    COALESCE(
      LEAD(created_at, 1) OVER (PARTITION BY product_id ORDER BY created_at),
      '2199-12-31'::TIMESTAMP
    ) AS valid_to
FROM src_products

-- pizza_shop/models/dim/dim_users.sql
{{
  config(
    materialized = 'table',
    )
}}
WITH src_users AS (
  SELECT * FROM {{ ref('src_users') }}
)
SELECT
    *,
    created_at AS valid_from,
    COALESCE(
      LEAD(created_at, 1) OVER (PARTITION BY user_id ORDER BY created_at),
      '2199-12-31'::TIMESTAMP
    ) AS valid_to
FROM src_users
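
The window function logic above can be sketched in plain Python as follows. This is only an illustration under simplified assumptions, not the way dbt executes the model: add_validity_ranges is a hypothetical helper, and the sample rows are trimmed to a few columns.

```python
from collections import defaultdict
from datetime import datetime

END_OF_TIME = datetime(2199, 12, 31)  # same sentinel as in the SQL models


def add_validity_ranges(records, natural_key):
    """Set valid_from/valid_to per record, mimicking LEAD(created_at) per natural key."""
    groups = defaultdict(list)
    for record in records:
        groups[record[natural_key]].append(record)

    result = []
    for versions in groups.values():
        versions.sort(key=lambda r: r["created_at"])
        # pair each version with the next one; the latest version has no successor
        for current, following in zip(versions, versions[1:] + [None]):
            valid_to = following["created_at"] if following else END_OF_TIME
            result.append({**current, "valid_from": current["created_at"], "valid_to": valid_to})
    return result


products = [
    {"product_id": 1, "price": 335.0, "created_at": datetime(2024, 1, 14, 15, 38, 30)},
    {"product_id": 1, "price": 345.0, "created_at": datetime(2024, 1, 14, 15, 43, 15)},
    {"product_id": 2, "price": 60.0, "created_at": datetime(2024, 1, 14, 15, 38, 30)},
]

dim = add_validity_ranges(products, "product_id")
for row in dim:
    print(row["product_id"], row["price"], row["valid_from"], row["valid_to"])
```

The earlier version of product 1 is closed off at the timestamp of its successor, while the latest version of each product stays open until the sentinel date.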

The transactional fact table is materialized as incremental so that only new records are appended. Also, order items are expanded into rows using the jsonb_array_elements function. Finally, the records are joined with the dimension tables to add the relevant surrogate keys from them. Note that the surrogate key of the fact table is constructed from a combination of all natural keys.

-- pizza_shop/models/fct/fct_orders.sql
{{
  config(
    materialized = 'incremental'
    )
}}
WITH dim_products AS (
  SELECT * FROM {{ ref('dim_products') }}
), dim_users AS (
  SELECT * FROM {{ ref('dim_users') }}
), src_orders AS (
  SELECT
    order_id,
    user_id,
    jsonb_array_elements(items) AS order_item,
    created_at
  FROM {{ ref('src_orders') }}
), expanded_orders AS (
  SELECT
    order_id,
    user_id,
    (order_item ->> 'product_id')::INT AS product_id,
    (order_item ->> 'quantity')::INT AS quantity,
    created_at
  FROM src_orders
)
SELECT
  {{ dbt_utils.generate_surrogate_key(['order_id', 'p.product_id', 'u.user_id']) }} as order_key,
  p.product_key,
  u.user_key,
  o.order_id,
  o.user_id,
  o.product_id,
  o.quantity,
  o.created_at
FROM expanded_orders o
JOIN dim_products p
  ON o.product_id = p.product_id
    AND o.created_at >= p.valid_from
    AND o.created_at < p.valid_to
JOIN dim_users u
  ON o.user_id = u.user_id
    AND o.created_at >= u.valid_from
    AND o.created_at < u.valid_to
{% if is_incremental() %}
  WHERE o.created_at > (SELECT created_at from {{ this }} ORDER BY created_at DESC LIMIT 1)
{% endif %}
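
The two key ideas of this model, the range join against the dimension and the incremental filter, can be sketched in Python as below. It is a simplified illustration rather than what dbt runs: lookup_key and new_rows_only are hypothetical helpers, the surrogate keys are made-up labels, and only the product dimension is shown.

```python
from datetime import datetime

END_OF_TIME = datetime(2199, 12, 31)

# two versions of product 1, as in an SCD type 2 dimension (keys are hypothetical)
dim_products = [
    {"product_key": "key-v1", "product_id": 1,
     "valid_from": datetime(2024, 1, 14, 15, 38, 30), "valid_to": datetime(2024, 1, 14, 15, 43, 15)},
    {"product_key": "key-v2", "product_id": 1,
     "valid_from": datetime(2024, 1, 14, 15, 43, 15), "valid_to": END_OF_TIME},
]


def lookup_key(dimension, product_id, created_at):
    """Return the surrogate key whose [valid_from, valid_to) range contains created_at."""
    for record in dimension:
        if record["product_id"] == product_id and record["valid_from"] <= created_at < record["valid_to"]:
            return record["product_key"]
    return None  # an inner join would drop such a row


def new_rows_only(source_rows, existing_rows):
    """Keep rows newer than the latest created_at already loaded (all rows on the first run)."""
    if not existing_rows:
        return source_rows
    high_water_mark = max(row["created_at"] for row in existing_rows)
    return [row for row in source_rows if row["created_at"] > high_water_mark]


orders = [
    {"order_id": 249, "product_id": 1, "created_at": datetime(2024, 1, 14, 15, 38, 30)},
    {"order_id": 20001, "product_id": 1, "created_at": datetime(2024, 1, 14, 15, 51, 4)},
]
already_loaded = orders[:1]

for order in new_rows_only(orders, already_loaded):
    print(order["order_id"], lookup_key(dim_products, order["product_id"], order["created_at"]))
```

One consequence of the strict greater-than comparison is that a source row sharing the latest loaded timestamp would be skipped on the next run, which is worth keeping in mind if records can arrive with identical timestamps.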

We can keep the final models in a separate YAML file for testing and enhanced documentation.

# pizza_shop/models/schema.yml
version: 2

models:
  - name: dim_products
    description: Products table, which is converted into SCD type 2
    columns:
      - name: product_key
        description: |
          Primary key of the table
          Surrogate key, which is generated by md5 hash using the following columns
            - name, description, price, category, image
        tests:
          - not_null
          - unique
      - name: product_id
        description: Natural key of products
      - name: name
        description: Product name
      - name: description
        description: Product description
      - name: price
        description: Product price
      - name: category
        description: Product category
      - name: image
        description: Product image
      - name: created_at
        description: Timestamp when the record is loaded
      - name: valid_from
        description: Effective start timestamp of the corresponding record (inclusive)
      - name: valid_to
        description: Effective end timestamp of the corresponding record (exclusive)
  - name: dim_users
    description: Users table, which is converted into SCD type 2
    columns:
      - name: user_key
        description: |
          Primary key of the table
          Surrogate key, which is generated by md5 hash using the following columns
            - first_name, last_name, email, residence, lat, lon
        tests:
          - not_null
          - unique
      - name: user_id
        description: Natural key of users
      - name: first_name
        description: First name
      - name: last_name
        description: Last name
      - name: email
        description: Email address
      - name: residence
        description: User address
      - name: latitude
        description: Latitude of user address
      - name: longitude
        description: Longitude of user address
      - name: created_at
        description: Timestamp when the record is loaded
      - name: valid_from
        description: Effective start timestamp of the corresponding record (inclusive)
      - name: valid_to
        description: Effective end timestamp of the corresponding record (exclusive)
  - name: fct_orders
    description: Orders fact table. Order items are exploded into rows
    columns:
      - name: order_key
        description: |
          Primary key of the table
          Surrogate key, which is generated by md5 hash using the following columns
            - order_id, product_id, user_id
        tests:
          - not_null
          - unique
      - name: product_key
        description: Product surrogate key which matches the product dimension record
        tests:
          - not_null
          - relationships:
              to: ref('dim_products')
              field: product_key
      - name: user_key
        description: User surrogate key which matches the user dimension record
        tests:
          - not_null
          - relationships:
              to: ref('dim_users')
              field: user_key
      - name: order_id
        description: Natural key of orders
      - name: user_id
        description: Natural key of users
      - name: product_id
        description: Natural key of products
      - name: quantity
        description: Amount of products ordered
      - name: created_at
        description: Timestamp when the record is loaded
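
For readers new to dbt's generic tests, the sketch below shows in plain Python what the three tests used above check conceptually. It is an approximation, not dbt's implementation: each function returns the offending rows, and an empty result means the test passes. The function bodies and sample rows are hypothetical.

```python
def not_null(rows, column):
    """Rows where the column is missing; an empty result means the test passes."""
    return [row for row in rows if row[column] is None]


def unique(rows, column):
    """Rows whose non-null column value has already been seen."""
    seen, duplicates = set(), []
    for row in rows:
        value = row[column]
        if value is None:
            continue
        if value in seen:
            duplicates.append(row)
        seen.add(value)
    return duplicates


def relationships(child_rows, column, parent_rows, field):
    """Child rows whose non-null key has no matching parent record."""
    parent_values = {parent[field] for parent in parent_rows}
    return [
        row for row in child_rows
        if row[column] is not None and row[column] not in parent_values
    ]


# hypothetical sample rows
dim_products = [{"product_key": "p1"}, {"product_key": "p2"}]
fct_orders = [
    {"order_key": "o1", "product_key": "p1"},
    {"order_key": "o2", "product_key": "p2"},
    {"order_key": "o3", "product_key": "p9"},  # orphan: no such product
]

print(not_null(fct_orders, "order_key"))
print(unique(fct_orders, "order_key"))
print(relationships(fct_orders, "product_key", dim_products, "product_key"))
```

In this sample, the first two checks return nothing while the relationships check reports the orphaned order row, which is the kind of failure the fact table tests are meant to catch.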

The project can be executed using the dbt run command as shown below.

$ dbt run
04:39:32  Running with dbt=1.7.4
04:39:33  Registered adapter: postgres=1.7.4
04:39:33  Found 6 models, 10 tests, 3 sources, 0 exposures, 0 metrics, 515 macros, 0 groups, 0 semantic models
04:39:33  
04:39:33  Concurrency: 4 threads (target='dev')
04:39:33  
04:39:33  1 of 6 START sql view model dev.src_orders ..................................... [RUN]
04:39:33  2 of 6 START sql view model dev.src_products ................................... [RUN]
04:39:33  3 of 6 START sql view model dev.src_users ...................................... [RUN]
04:39:33  2 of 6 OK created sql view model dev.src_products .............................. [CREATE VIEW in 0.14s]
04:39:33  1 of 6 OK created sql view model dev.src_orders ................................ [CREATE VIEW in 0.15s]
04:39:33  3 of 6 OK created sql view model dev.src_users ................................. [CREATE VIEW in 0.14s]
04:39:33  4 of 6 START sql table model dev.dim_products .................................. [RUN]
04:39:33  5 of 6 START sql table model dev.dim_users ..................................... [RUN]
04:39:33  5 of 6 OK created sql table model dev.dim_users ................................ [SELECT 10000 in 0.12s]
04:39:33  4 of 6 OK created sql table model dev.dim_products ............................. [SELECT 81 in 0.13s]
04:39:33  6 of 6 START sql incremental model dev.fct_orders .............................. [RUN]
04:39:33  6 of 6 OK created sql incremental model dev.fct_orders ......................... [SELECT 105789 in 0.33s]
04:39:33  
04:39:33  Finished running 3 view models, 2 table models, 1 incremental model in 0 hours 0 minutes and 0.70 seconds (0.70s).
04:39:33  
04:39:33  Completed successfully
04:39:33  
04:39:33  Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6

Also, the project can be tested using the dbt test command.

$ dbt test
04:40:53  Running with dbt=1.7.4
04:40:53  Registered adapter: postgres=1.7.4
04:40:53  Found 6 models, 10 tests, 3 sources, 0 exposures, 0 metrics, 515 macros, 0 groups, 0 semantic models
04:40:53  
04:40:53  Concurrency: 4 threads (target='dev')
04:40:53  
04:40:53  1 of 10 START test not_null_dim_products_product_key ........................... [RUN]
04:40:53  2 of 10 START test not_null_dim_users_user_key ................................. [RUN]
04:40:53  3 of 10 START test not_null_fct_orders_order_key ............................... [RUN]
04:40:53  4 of 10 START test not_null_fct_orders_product_key ............................. [RUN]
04:40:53  1 of 10 PASS not_null_dim_products_product_key ................................. [PASS in 0.11s]
04:40:53  5 of 10 START test not_null_fct_orders_user_key ................................ [RUN]
04:40:53  2 of 10 PASS not_null_dim_users_user_key ....................................... [PASS in 0.11s]
04:40:53  4 of 10 PASS not_null_fct_orders_product_key ................................... [PASS in 0.11s]
04:40:53  3 of 10 PASS not_null_fct_orders_order_key ..................................... [PASS in 0.12s]
04:40:53  6 of 10 START test relationships_fct_orders_product_key__product_key__ref_dim_products_  [RUN]
04:40:53  7 of 10 START test relationships_fct_orders_user_key__user_key__ref_dim_users_ . [RUN]
04:40:53  8 of 10 START test unique_dim_products_product_key ............................. [RUN]
04:40:53  5 of 10 PASS not_null_fct_orders_user_key ...................................... [PASS in 0.09s]
04:40:53  9 of 10 START test unique_dim_users_user_key ................................... [RUN]
04:40:53  8 of 10 PASS unique_dim_products_product_key ................................... [PASS in 0.06s]
04:40:53  10 of 10 START test unique_fct_orders_order_key ................................ [RUN]
04:40:53  6 of 10 PASS relationships_fct_orders_product_key__product_key__ref_dim_products_  [PASS in 0.10s]
04:40:53  7 of 10 PASS relationships_fct_orders_user_key__user_key__ref_dim_users_ ....... [PASS in 0.11s]
04:40:53  9 of 10 PASS unique_dim_users_user_key ......................................... [PASS in 0.06s]
04:40:53  10 of 10 PASS unique_fct_orders_order_key ...................................... [PASS in 0.11s]
04:40:53  
04:40:53  Finished running 10 tests in 0 hours 0 minutes and 0.42 seconds (0.42s).
04:40:53  
04:40:53  Completed successfully
04:40:53  
04:40:53  Done. PASS=10 WARN=0 ERROR=0 SKIP=0 TOTAL=10

Update Records

Although we will discuss ETL orchestration with Apache Airflow in the next post, here I illustrate how the dimension and fact tables change when records are updated.

Product

First, a new record is inserted into the products table in the staging schema with the price increased by 10.

-- // update a product record
INSERT INTO staging.products (id, name, description, price, category, image)
    SELECT 1, name, description, price + 10, category, image
    FROM staging.products
    WHERE id = 1;

SELECT id, name, price, category, created_at
FROM staging.products
WHERE id = 1;

id|name                            |price|category  |created_at         |
--+--------------------------------+-----+----------+-------------------+
 1|Moroccan Spice Pasta Pizza - Veg|335.0|veg pizzas|2024-01-14 15:38:30|
 1|Moroccan Spice Pasta Pizza - Veg|345.0|veg pizzas|2024-01-14 15:43:15|

When we execute the dbt run command again, the corresponding dimension table reflects the change: a new record is added and the valid_from and valid_to columns are updated accordingly. With this change, any later order record that includes this product should be mapped to the new product surrogate key.

SELECT product_key, price, created_at, valid_from, valid_to
FROM dev.dim_products
WHERE product_id = 1;

product_key                     |price|created_at         |valid_from         |valid_to           |
--------------------------------+-----+-------------------+-------------------+-------------------+
a8c5f8c082bcf52a164f2eccf2b493f6|335.0|2024-01-14 15:38:30|2024-01-14 15:38:30|2024-01-14 15:43:15|
c995d7e1ec035da116c0f37e6284d1d5|345.0|2024-01-14 15:43:15|2024-01-14 15:43:15|2199-12-31 00:00:00|

User

Also, a new record is inserted into the users table in the staging schema while modifying the email address.

-- // update a user record
INSERT INTO staging.users (id, first_name, last_name, email, residence, lat, lon)
    SELECT 1, first_name, last_name, 'john.doe@example.com', residence, lat, lon
    FROM staging.users
    WHERE id = 1;

SELECT id, first_name, last_name, email, created_at
FROM staging.users
WHERE id = 1;

id|first_name|last_name|email                     |created_at         |
--+----------+---------+--------------------------+-------------------+
 1|Kismat    |Shroff   |drishyamallick@hotmail.com|2024-01-14 15:38:30|
 1|Kismat    |Shroff   |john.doe@example.com      |2024-01-14 15:43:35|

Again, the corresponding dimension table reflects the change by adding a new record and updating the valid_from and valid_to columns accordingly.

SELECT user_key, email, valid_from, valid_to
FROM dev.dim_users
WHERE user_id = 1;

user_key                        |email                     |valid_from         |valid_to           |
--------------------------------+--------------------------+-------------------+-------------------+
7f530277c15881c328b67c4764205a9c|drishyamallick@hotmail.com|2024-01-14 15:38:30|2024-01-14 15:43:35|
f4b2344f893f50597cdc4a12c7e87e81|john.doe@example.com      |2024-01-14 15:43:35|2199-12-31 00:00:00|

Order

We insert a new order record that has two items, where the IDs of the first and second products are 1 and 2 respectively. In this example, we expect the first product to map to the updated product surrogate key while the surrogate key of the second product remains the same. After the fact table is refreshed by executing dbt run again, we can check this by querying the new order record together with an existing record that contains the same products. As expected, the query result shows that the product surrogate key is updated only in the new order record (the product 1 rows are marked with *).

INSERT INTO staging.orders(user_id, items)
VALUES (1,'[{"product_id": 1, "quantity": 2}, {"product_id": 2, "quantity": 3}]');

SELECT o.order_key, o.product_key, o.order_id, o.product_id, p.price, o.quantity, o.created_at
FROM dev.fct_orders o
JOIN dev.dim_products p ON o.product_key = p.product_key
WHERE o.order_id IN (249, 20001)
ORDER BY o.order_id, o.product_id;

order_key                       |  product_key                     |order_id|product_id|price|quantity|created_at         |
--------------------------------+----------------------------------+--------+----------+-----+--------+-------------------+
5c8f7a8bc27d825efdb2e8167c9ae481|* a8c5f8c082bcf52a164f2eccf2b493f6|     249|         1|335.0|       1|2024-01-14 15:38:30|
42834bd2b8f847cbd182765b43094be0|  8dd51b3981692c787baa9d4335f15345|     249|         2| 60.0|       1|2024-01-14 15:38:30|
28d937c571bd9601c2e719e618e56f67|* c995d7e1ec035da116c0f37e6284d1d5|   20001|         1|345.0|       2|2024-01-14 15:51:04|
75e1957ce20f188ab7502c322e2ab7d9|  8dd51b3981692c787baa9d4335f15345|   20001|         2| 60.0|       3|2024-01-14 15:51:04|

Summary

The data build tool (dbt) is a popular data transformation tool. In this series, we discuss practical examples of data warehouse and lakehouse development where data transformation is performed by dbt and ETL is managed by Apache Airflow. As a starting point, we developed a dbt project on PostgreSQL using fictional pizza shop data in this post. Two SCD type 2 dimension tables and a single transactional fact table are modelled in the dbt project, and the impact of record updates is discussed in detail.