In this series, we develop real-time monitoring dashboard applications. A data generation app is created with Python, and it continuously ingests theLook eCommerce data into a PostgreSQL database. A WebSocket server, built with FastAPI, periodically queries the data to serve its clients. The monitoring dashboards will be developed with Streamlit and Next.js, using Apache ECharts for visualization. In this post, we walk through the data generation app and the backend API; the monitoring dashboards will be covered in later posts.

Docker Compose Services

We have three Docker Compose services, and they are described separately below. The source of this post can be found in this GitHub repository.

PostgreSQL

A PostgreSQL database server is configured with persistent storage, automatic initialization, and a health check. The health check is set up so that the remaining services wait until the database is ready.

 1# producer/docker-compose.yml
 2version: "3"
 3services:
 4  postgres:
 5    image: postgres:16
 6    container_name: postgres
 7    ports:
 8      - 5432:5432
 9    volumes:
10      - ./config/:/docker-entrypoint-initdb.d
11      - postgres_data:/var/lib/postgresql/data
12    environment:
13      POSTGRES_DB: develop
14      POSTGRES_USER: develop
15      POSTGRES_PASSWORD: password
16      PGUSER: develop
17      TZ: Australia/Sydney
18    healthcheck:
19      test: ["CMD-SHELL", "pg_isready -U develop"]
20      interval: 5s
21      timeout: 5s
22      retries: 5
23...
24volumes:
25  postgres_data:
26    driver: local
27    name: postgres_data
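
Once the containers are up, the status that the dependent services wait on can be checked directly, for example with docker inspect; it should report healthy as soon as pg_isready succeeds.

1$ docker inspect --format "{{.State.Health.Status}}" postgres
2healthy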

The bootstrap script creates a dedicated schema named ecommerce and sets the schema as the default search path.

1-- producer/config/postgres/bootstrap.sql
2CREATE SCHEMA ecommerce;
3GRANT ALL ON SCHEMA ecommerce TO develop;
4
5-- change search_path on a connection-level
6SET search_path TO ecommerce;
7
8-- change search_path on a database-level
9ALTER database "develop" SET search_path TO ecommerce;
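
To confirm the search path, we can connect with psql (e.g. docker exec -it postgres psql -U develop -d develop) and run the statement below; after the bootstrap script has been applied, new connections should return ecommerce.

1-- verify the default search path of the develop database
2SHOW search_path;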

Data Generator

The following Dockerfile is shared by the data generation app and the WebSocket server. It sets up a lightweight Python 3.10 environment, copies and installs the dependencies from requirements.txt, and creates a dedicated user (app) with a home directory (/home/app) so that the container runs as a non-root user rather than root. /home/app is also set as the working directory.

 1# producer/Dockerfile
 2FROM python:3.10-slim
 3
 4## install dependent packages
 5COPY requirements.txt requirements.txt
 6
 7RUN pip install -r requirements.txt
 8
 9## create a user
10RUN useradd app && mkdir /home/app \
11    && chown app:app /home/app
12
13USER app
14WORKDIR /home/app
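
The contents of requirements.txt are not shown in the post. Judging from the imports used in generator.py and api.py, it would need at least the packages below; exact versions and any extra dependencies of the data models are omitted, so treat this as a plausible minimal set rather than the actual file.

1# producer/requirements.txt - a plausible minimal set, not the exact file
2pandas
3SQLAlchemy
4psycopg2-binary
5fastapi
6uvicorn[standard]  # the standard extra bundles WebSocket support for uvicorn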

The data generation app builds from the local Dockerfile, runs in a container named datagen, and connects to the PostgreSQL database using environment variables for its credentials. The container executes generator.py with a 0.5-second delay between iterations and runs indefinitely (--max_iter -1). It mounts the current directory to /home/app so the scripts and dependencies are available inside the container, and it starts only after the database reports healthy.

 1# producer/docker-compose.yml
 2services:
 3...
 4  datagen:
 5    build:
 6      context: .
 7      dockerfile: Dockerfile
 8    container_name: datagen
 9    environment:
10      DB_USER: develop
11      DB_PASS: password
12      DB_HOST: postgres
13      DB_NAME: develop
14    command:
15      - python
16      - generator.py
17      - --wait_for
18      - "0.5"
19      - --max_iter
20      - "-1"
21    volumes:
22      - .:/home/app
23    depends_on:
24      postgres:
25        condition: service_healthy
26...

Data Generator Source

The theLook eCommerce dataset consists of seven entities, five of which are dynamically generated. In each iteration, a user record is created, associated with zero or more orders. Each order, in turn, generates zero or more order items. Finally, each order item produces zero or more event and inventory item records. Once all records are generated, they are ingested into the corresponding database tables using pandas’ to_sql method.

  1# producer/generator.py
  2import argparse
  3import time
  4import logging
  5
  6import pandas as pd
  7
  8from src.models import User
  9from src.utils import create_connection, insert_to_db, Connection, generate_from_csv
 10
 11extraneous_headers = [
 12    "event_type",
 13    "ip_address",
 14    "browser",
 15    "traffic_source",
 16    "session_id",
 17    "sequence_number",
 18    "uri",
 19    "is_sold",
 20]
 21
 22
 23def write_dynamic_data(
 24    conn: Connection, schema_name: str = "ecommerce", if_exists: str = "replace"
 25):
 26    tbl_map = {
 27        "users": [],
 28        "orders": [],
 29        "order_items": [],
 30        "inventory_items": [],
 31        "events": [],
 32    }
 33    user = User()
 34    logging.info(f"start to create user events - user id: {user.id}")
 35    tbl_map["users"].extend([user.asdict(["orders"])])
 36    orders = user.orders
 37    tbl_map["orders"].extend([o.asdict(["order_items"]) for o in orders])
 38    for order in orders:
 39        order_items = order.order_items
 40        tbl_map["order_items"].extend(
 41            [
 42                o.asdict(["events", "inventory_items"] + extraneous_headers)
 43                for o in order_items
 44            ]
 45        )
 46        for order_item in order_items:
 47            tbl_map["inventory_items"].extend(
 48                [i.asdict() for i in order_item.inventory_items]
 49            )
 50            tbl_map["events"].extend([e.asdict() for e in order_item.events])
 51
 52    for tbl in tbl_map:
 53        df = pd.DataFrame(tbl_map[tbl])
 54        if len(df) > 0:
 55            logging.info(f"{if_exists} records, table - {tbl}, # records - {len(df)}")
 56            insert_to_db(
 57                df=df,
 58                tbl_name=tbl,
 59                schema_name=schema_name,
 60                conn=conn,
 61                if_exists=if_exists,
 62            )
 63        else:
 64            logging.info(
 65                f"skip records as no user event, table - {tbl}, # records - {len(df)}"
 66            )
 67
 68
 69def write_static_data(
 70    conn: Connection, schema_name: str = "ecommerce", if_exists: str = "replace"
 71):
 72    tbl_map = {
 73        "products": generate_from_csv("products.csv"),
 74        "dist_centers": generate_from_csv("distribution_centers.csv"),
 75    }
 76    for tbl in tbl_map:
 77        df = pd.DataFrame(tbl_map[tbl])
 78        if len(df) > 0:
 79            logging.info(f"{if_exists} records, table - {tbl}, # records - {len(df)}")
 80            insert_to_db(
 81                df=df,
 82                tbl_name=tbl,
 83                schema_name=schema_name,
 84                conn=conn,
 85                if_exists=if_exists,
 86            )
 87        else:
 88            logging.info(f"skip writing, table - {tbl}, # records - {len(df)}")
 89
 90
 91def main(wait_for: float, max_iter: int, if_exists: str):
 92    conn = create_connection()
 93    write_static_data(conn=conn, if_exists="replace")
 94    curr_iter = 0
 95    while True:
 96        write_dynamic_data(conn=conn, if_exists=if_exists)
 97        time.sleep(wait_for)
 98        curr_iter += 1
 99        if max_iter > 0 and curr_iter >= max_iter:
100            logging.info(f"stop generating records after {curr_iter} iterations")
101            break
102
103
104if __name__ == "__main__":
105    logging.getLogger().setLevel(logging.INFO)
106    logging.info("Generate theLook eCommerce data...")
107
108    parser = argparse.ArgumentParser(description="Generate theLook eCommerce data")
109    parser.add_argument(
110        "--if_exists",
111        "-i",
112        type=str,
113        default="append",
114        choices=["fail", "replace", "append"],
115        help="How to behave if the target table already exists",
116    )
117    parser.add_argument(
118        "--wait_for",
119        "-w",
120        type=float,
121        default=1,
122        help="The time to wait before generating new user records",
123    )
124    parser.add_argument(
125        "--max_iter",
126        "-m",
127        type=int,
128        default=-1,
130        help="The maximum number of iterations to generate user records",
130    )
131    args = parser.parse_args()
132    logging.info(args)
133    main(args.wait_for, args.max_iter, if_exists=args.if_exists)
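
The helper functions imported from src.utils are not listed in this post (see the repository for the full implementation). The sketch below only illustrates what they do, assuming SQLAlchemy for connectivity and pandas' to_sql for ingestion as described above; the CSV location and the exact signatures are assumptions.

 1# producer/src/utils.py - illustrative sketch, not the actual implementation
 2import os
 3
 4import pandas as pd
 5from sqlalchemy import create_engine, Connection
 6
 7
 8def create_connection() -> Connection:
 9    """Creates a SQLAlchemy connection from the DB_* environment variables."""
10    user = os.getenv("DB_USER", "develop")
11    password = os.getenv("DB_PASS", "password")
12    host = os.getenv("DB_HOST", "localhost")
13    db_name = os.getenv("DB_NAME", "develop")
14    engine = create_engine(f"postgresql+psycopg2://{user}:{password}@{host}/{db_name}")
15    return engine.connect()
16
17
18def insert_to_db(
19    df: pd.DataFrame, tbl_name: str, schema_name: str, conn: Connection, if_exists: str
20):
21    """Writes a DataFrame to a table using pandas' to_sql."""
22    df.to_sql(tbl_name, conn, schema=schema_name, if_exists=if_exists, index=False)
23    conn.commit()
24
25
26def generate_from_csv(file_name: str) -> list[dict]:
27    """Loads a seed CSV (e.g. products.csv) into a list of records."""
28    # the data directory is an assumption; the repository may store the CSVs elsewhere
29    return pd.read_csv(os.path.join("data", file_name)).to_dict(orient="records")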

In the following example, data is generated every two seconds (-w 2).

 1$ python generator.py -w 2
 2INFO:root:Generate theLook eCommerce data...
 3INFO:root:Namespace(if_exists='append', wait_for=2.0, max_iter=-1)
 4INFO:root:replace records, table - products, # records - 29120
 5INFO:root:replace records, table - dist_centers, # records - 10
 6INFO:root:start to create user events - user id: 2a444cd4-aa70-4247-b1c1-9cf9c8cc1924
 7INFO:root:append records, table - users, # records - 1
 8INFO:root:append records, table - orders, # records - 1
 9INFO:root:append records, table - order_items, # records - 2
10INFO:root:append records, table - inventory_items, # records - 5
11INFO:root:append records, table - events, # records - 14
12INFO:root:start to create user events - user id: 7d40f7f8-c022-4104-a1a0-9228da07fbe4
13INFO:root:append records, table - users, # records - 1
14INFO:root:skip records as no user event, table - orders, # records - 0
15INFO:root:skip records as no user event, table - order_items, # records - 0
16INFO:root:skip records as no user event, table - inventory_items, # records - 0
17INFO:root:skip records as no user event, table - events, # records - 0
18INFO:root:start to create user events - user id: 45f8469c-3e79-40ee-9639-1cb17cd98132
19INFO:root:append records, table - users, # records - 1
20INFO:root:skip records as no user event, table - orders, # records - 0
21INFO:root:skip records as no user event, table - order_items, # records - 0
22INFO:root:skip records as no user event, table - inventory_items, # records - 0
23INFO:root:skip records as no user event, table - events, # records - 0
24INFO:root:start to create user events - user id: 839e353f-07ee-4d77-b1de-2f1af9b12501
25INFO:root:append records, table - users, # records - 1
26INFO:root:append records, table - orders, # records - 2
27INFO:root:append records, table - order_items, # records - 3
28INFO:root:append records, table - inventory_items, # records - 9
29INFO:root:append records, table - events, # records - 19

Once the data is ingested into the database, the following tables are created in the ecommerce schema: users, orders, order_items, inventory_items, events, products and dist_centers.
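
This can be confirmed with a quick query against information_schema, for example from psql.

1-- list the tables created by the data generator
2SELECT table_name
3FROM information_schema.tables
4WHERE table_schema = 'ecommerce'
5ORDER BY table_name;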

WebSocket Server

The WebSocket server runs a FastAPI application with uvicorn. It builds from the local Dockerfile, exposes port 8000, and connects to the PostgreSQL database using credential and configuration environment variables. The service queries data over a 5-minute lookback window and refreshes every 5 seconds. The working directory is mounted for access to the code, and the service starts only after PostgreSQL reports healthy.

 1# producer/docker-compose.yml
 2services:
 3...
 4  producer:
 5    build:
 6      context: .
 7      dockerfile: Dockerfile
 8    container_name: producer
 9    ports:
10      - "8000:8000"
11    environment:
12      DB_USER: develop
13      DB_PASS: password
14      DB_HOST: postgres
15      DB_NAME: develop
16      LOOKBACK_MINUTES: "5"
17      REFRESH_SECONDS: "5"
18    command:
19      - uvicorn
20      - api:app
21      - --host
22      - "0.0.0.0"
23      - --port
24      - "8000"
25    volumes:
26      - .:/home/app
27    depends_on:
28      postgres:
29        condition: service_healthy
30...

WebSocket Server Source

This FastAPI WebSocket server streams real-time data from the PostgreSQL database. It connects using SQLAlchemy, fetches order-related data within a configurable lookback window (LOOKBACK_MINUTES), and sends updates at the interval defined by REFRESH_SECONDS. A connection manager handles multiple WebSocket connections, converting query results into JSON before streaming them. The app continuously queries the database and sends fresh data to connected clients until they disconnect, and logging provides visibility into connections, queries, and errors.

  1# producer/api.py
  2import os
  3import logging
  4import asyncio
  5
  6from sqlalchemy import create_engine, Engine, Connection
  7import pandas as pd
  8from fastapi import FastAPI, WebSocket, WebSocketDisconnect
  9
 10logging.basicConfig(level=logging.INFO)
 11
 12try:
 13    LOOKBACK_MINUTES = int(os.getenv("LOOKBACK_MINUTES", "5"))
 14    REFRESH_SECONDS = int(os.getenv("REFRESH_SECONDS", "5"))
 15except ValueError:
 16    LOOKBACK_MINUTES = 5
 17    REFRESH_SECONDS = 5
 18
 19
 20def get_db_engine() -> Engine:
 21    """Creates and returns a SQLAlchemy engine."""
 22    user = os.getenv("DB_USER", "develop")
 23    password = os.getenv("DB_PASS", "password")
 24    host = os.getenv("DB_HOST", "localhost")
 25    db_name = os.getenv("DB_NAME", "develop")
 26
 27    try:
 28        return create_engine(
 29            f"postgresql+psycopg2://{user}:{password}@{host}/{db_name}", echo=True
 30        )
 31    except Exception as e:
 32        logging.error(f"Database connection error: {e}")
 33        raise
 34
 35
 36def fetch_data(conn: Connection, minutes: int = 0):
 37    """Fetches data from the database with an optional lookback filter."""
 38    sql = """
 39    SELECT
 40        u.id AS user_id
 41        , u.age
 42        , u.gender
 43        , u.country
 44        , u.traffic_source
 45        , o.order_id
 46        , o.id AS item_id
 47        , p.category
 48        , p.cost
 49        , o.status AS item_status
 50        , o.sale_price
 51        , o.created_at
 52    FROM users AS u
 53    JOIN order_items AS o ON u.id = o.user_id
 54    JOIN products AS p ON p.id = o.product_id
 55    """
 56    if minutes > 0:
 57        sql = f"{sql} WHERE o.created_at >= current_timestamp - interval '{minutes} minute'"
 58    else:
 59        sql = f"{sql} LIMIT 1"
 60    try:
 61        return pd.read_sql(sql=sql, con=conn)
 62    except Exception as e:
 63        logging.error(f"Error reading from database: {e}")
 64        return pd.DataFrame()
 65
 66
 67app = FastAPI()
 68
 69
 70class ConnectionManager:
 71    """Manages WebSocket connections."""
 72
 73    def __init__(self):
 74        self.active_connections: list[WebSocket] = []
 75
 76    async def connect(self, websocket: WebSocket):
 77        await websocket.accept()
 78        self.active_connections.append(websocket)
 79        logging.info(f"New WebSocket connection: {websocket.client}")
 80
 81    def disconnect(self, websocket: WebSocket):
 82        if websocket in self.active_connections:
 83            self.active_connections.remove(websocket)
 84            logging.info(f"WebSocket disconnected: {websocket.client}")
 85
 86    async def send_data(self, df: pd.DataFrame, websocket: WebSocket):
 87        """Converts DataFrame to JSON and sends it via WebSocket."""
 88        if not df.empty:
 89            await websocket.send_json(df.to_json(orient="records"))
 90
 91
 92manager = ConnectionManager()
 93
 94
 95@app.websocket("/ws")
 96async def websocket_endpoint(websocket: WebSocket):
 97    """Handles WebSocket connections and continuously streams data."""
 98    await manager.connect(websocket)
 99
100    engine = get_db_engine()
101
102    try:
103        with engine.connect() as conn:
104            while True:
105                df = fetch_data(conn, LOOKBACK_MINUTES)
106                logging.info(f"Fetched {df.shape[0]} records from database")
107                await manager.send_data(df, websocket)
108                await asyncio.sleep(REFRESH_SECONDS)
109    except WebSocketDisconnect:
110        manager.disconnect(websocket)
111    except Exception as e:
112        logging.error(f"WebSocket error: {e}")
113    finally:
114        engine.dispose()

Deploy Services

The Docker Compose services can be deployed using the command docker-compose -f producer/docker-compose.yml up -d. Once started, the server can be checked with a WebSocket client by executing ws listen ws://localhost:8000/ws, and its logs can be monitored by running docker logs -f producer.
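
Alternatively, a few lines of Python can serve as a throwaway client. The sketch below assumes the third-party websockets package (pip install websockets), which is not part of the project, and simply counts the records received on each refresh.

 1# ws_check.py - a quick smoke-test client (assumes the `websockets` package)
 2import asyncio
 3import json
 4
 5import websockets
 6
 7
 8async def listen():
 9    async with websockets.connect("ws://localhost:8000/ws") as ws:
10        while True:
11            frame = json.loads(await ws.recv())
12            # the server calls send_json(df.to_json(...)), so the frame is a JSON
13            # string that itself contains the serialised records
14            records = json.loads(frame) if isinstance(frame, str) else frame
15            print(f"received {len(records)} records")
16
17
18if __name__ == "__main__":
19    asyncio.run(listen())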