In this series, we develop real-time monitoring dashboard applications. A data generation app, written in Python, continuously ingests theLook eCommerce data into a PostgreSQL database. A WebSocket server, built with FastAPI, periodically queries the data to serve its clients. The monitoring dashboards will be developed using Streamlit and Next.js, with Apache ECharts for visualization. In this post, we walk through the data generation app and the backend API; the monitoring dashboards will be discussed in later posts.
Docker Compose Services
We have three Docker Compose services, each described below. The source code for this post can be found in this GitHub repository.
PostgreSQL
A PostgreSQL database server is configured with persistent storage, automatic initialization, and a health check. The health check is set up so that the remaining services wait until the database is ready.
# producer/docker-compose.yml
version: "3"
services:
  postgres:
    image: postgres:16
    container_name: postgres
    ports:
      - 5432:5432
    volumes:
      - ./config/:/docker-entrypoint-initdb.d
      - postgres_data:/var/lib/postgresql/data
    environment:
      POSTGRES_DB: develop
      POSTGRES_USER: develop
      POSTGRES_PASSWORD: password
      PGUSER: develop
      TZ: Australia/Sydney
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U develop"]
      interval: 5s
      timeout: 5s
      retries: 5
...
volumes:
  postgres_data:
    driver: local
    name: postgres_data
The bootstrap script creates a dedicated schema named ecommerce and sets the schema as the default search path.
-- producer/config/postgres/bootstrap.sql
CREATE SCHEMA ecommerce;
GRANT ALL ON SCHEMA ecommerce TO develop;

-- change search_path on a connection-level
SET search_path TO ecommerce;

-- change search_path on a database-level
ALTER DATABASE "develop" SET search_path TO ecommerce;
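As a quick sanity check, the search path can be inspected once the container has initialised. The snippet below is a minimal sketch, not part of the repository; it assumes the database is reachable on localhost with the credentials and port mapping from the compose file.

# check_bootstrap.py - quick sanity check (hypothetical, not part of the repository)
from sqlalchemy import create_engine, text

# same credentials and port mapping as producer/docker-compose.yml
engine = create_engine("postgresql+psycopg2://develop:password@localhost:5432/develop")

with engine.connect() as conn:
    # should print "ecommerce" thanks to the ALTER DATABASE statement above
    print(conn.execute(text("SHOW search_path")).scalar())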
Data Generator
The following Dockerfile is used for both the data generation app and the WebSocket server. It sets up a lightweight Python 3.10 environment, copies and installs the dependencies from requirements.txt, and creates a dedicated user (app) with a home directory (/home/app) for security. The container runs as the app user instead of root, with /home/app set as the working directory.
# producer/Dockerfile
FROM python:3.10-slim

## install dependent packages
COPY requirements.txt requirements.txt

RUN pip install -r requirements.txt

## create a user
RUN useradd app && mkdir /home/app \
  && chown app:app /home/app

USER app
WORKDIR /home/app
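The contents of requirements.txt are not shown in this post. Judging from the imports used by the generator and the API, it would include at least the packages below; the exact entries and version pins in the repository may differ.

# producer/requirements.txt (illustrative - actual entries and pins may differ)
pandas
SQLAlchemy
psycopg2-binary
fastapi
uvicorn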
The data generation app builds from the local Dockerfile, runs in a container named datagen, and connects to the PostgreSQL database using environment variables for credentials. The container executes generator.py with a 0.5-second delay between iterations and runs indefinitely (--max_iter -1). It mounts the current directory to /home/app so the container has access to the scripts and dependencies, and the service starts only after the database passes its health check.
# producer/docker-compose.yml
services:
...
  datagen:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: datagen
    environment:
      DB_USER: develop
      DB_PASS: password
      DB_HOST: postgres
      DB_NAME: develop
    command:
      - python
      - generator.py
      - --wait_for
      - "0.5"
      - --max_iter
      - "-1"
    volumes:
      - .:/home/app
    depends_on:
      postgres:
        condition: service_healthy
...
Data Generator Source
The theLook eCommerce dataset consists of seven entities, five of which are dynamically generated. In each iteration, a user record is created, associated with zero or more orders. Each order, in turn, generates zero or more order items. Finally, each order item produces zero or more event and inventory item records. Once all records are generated, they are ingested into the corresponding database tables using pandas' to_sql method.
# producer/generator.py
import argparse
import time
import logging

import pandas as pd

from src.models import User
from src.utils import create_connection, insert_to_db, Connection, generate_from_csv

extraneous_headers = [
    "event_type",
    "ip_address",
    "browser",
    "traffic_source",
    "session_id",
    "sequence_number",
    "uri",
    "is_sold",
]


def write_dynamic_data(
    conn: Connection, schema_name: str = "ecommerce", if_exists: str = "replace"
):
    """Generates a user and its dependent order records, then writes them to the database."""
    tbl_map = {
        "users": [],
        "orders": [],
        "order_items": [],
        "inventory_items": [],
        "events": [],
    }
    user = User()
    logging.info(f"start to create user events - user id: {user.id}")
    tbl_map["users"].extend([user.asdict(["orders"])])
    orders = user.orders
    tbl_map["orders"].extend([o.asdict(["order_items"]) for o in orders])
    for order in orders:
        order_items = order.order_items
        tbl_map["order_items"].extend(
            [
                o.asdict(["events", "inventory_items"] + extraneous_headers)
                for o in order_items
            ]
        )
        for order_item in order_items:
            tbl_map["inventory_items"].extend(
                [i.asdict() for i in order_item.inventory_items]
            )
            tbl_map["events"].extend([e.asdict() for e in order_item.events])

    for tbl in tbl_map:
        df = pd.DataFrame(tbl_map[tbl])
        if len(df) > 0:
            logging.info(f"{if_exists} records, table - {tbl}, # records - {len(df)}")
            insert_to_db(
                df=df,
                tbl_name=tbl,
                schema_name=schema_name,
                conn=conn,
                if_exists=if_exists,
            )
        else:
            logging.info(
                f"skip records as no user event, table - {tbl}, # records - {len(df)}"
            )


def write_static_data(
    conn: Connection, schema_name: str = "ecommerce", if_exists: str = "replace"
):
    """Loads the static product and distribution centre entities from CSV files."""
    tbl_map = {
        "products": generate_from_csv("products.csv"),
        "dist_centers": generate_from_csv("distribution_centers.csv"),
    }
    for tbl in tbl_map:
        df = pd.DataFrame(tbl_map[tbl])
        if len(df) > 0:
            logging.info(f"{if_exists} records, table - {tbl}, # records - {len(df)}")
            insert_to_db(
                df=df,
                tbl_name=tbl,
                schema_name=schema_name,
                conn=conn,
                if_exists=if_exists,
            )
        else:
            logging.info(f"skip writing, table - {tbl}, # records - {len(df)}")


def main(wait_for: float, max_iter: int, if_exists: str):
    """Writes the static data once, then generates dynamic records in a loop."""
    conn = create_connection()
    write_static_data(conn=conn, if_exists="replace")
    curr_iter = 0
    while True:
        write_dynamic_data(conn=conn, if_exists=if_exists)
        time.sleep(wait_for)
        curr_iter += 1
        if max_iter > 0 and curr_iter >= max_iter:
            logging.info(f"stop generating records after {curr_iter} iterations")
            break


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    logging.info("Generate theLook eCommerce data...")

    parser = argparse.ArgumentParser(description="Generate theLook eCommerce data")
    parser.add_argument(
        "--if_exists",
        "-i",
        type=str,
        default="append",
        choices=["fail", "replace", "append"],
        help="How to handle existing tables - fail, replace or append",
    )
    parser.add_argument(
        "--wait_for",
        "-w",
        type=float,
        default=1,
        help="The time to wait before generating new user records",
    )
    parser.add_argument(
        "--max_iter",
        "-m",
        type=int,
        default=-1,
        help="The maximum number of iterations to generate user records",
    )
    args = parser.parse_args()
    logging.info(args)
    main(args.wait_for, args.max_iter, if_exists=args.if_exists)
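The insert_to_db helper comes from src/utils.py, which is not shown in this post. Based on how it is called above, it is essentially a thin wrapper around pandas' DataFrame.to_sql; a minimal sketch, assuming Connection is a SQLAlchemy connection, might look like the following (the actual implementation in the repository may differ).

# a minimal sketch of insert_to_db in src/utils.py (actual implementation may differ)
import pandas as pd
from sqlalchemy.engine import Connection


def insert_to_db(
    df: pd.DataFrame,
    tbl_name: str,
    schema_name: str,
    conn: Connection,
    if_exists: str = "append",
):
    # pandas creates the table on "replace" and adds new rows on "append"
    df.to_sql(
        name=tbl_name, schema=schema_name, con=conn, if_exists=if_exists, index=False
    )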
In the following example, data is generated every two seconds (-w 2).
$ python generator.py -w 2
INFO:root:Generate theLook eCommerce data...
INFO:root:Namespace(if_exists='append', wait_for=2.0, max_iter=-1)
INFO:root:replace records, table - products, # records - 29120
INFO:root:replace records, table - dist_centers, # records - 10
INFO:root:start to create user events - user id: 2a444cd4-aa70-4247-b1c1-9cf9c8cc1924
INFO:root:append records, table - users, # records - 1
INFO:root:append records, table - orders, # records - 1
INFO:root:append records, table - order_items, # records - 2
INFO:root:append records, table - inventory_items, # records - 5
INFO:root:append records, table - events, # records - 14
INFO:root:start to create user events - user id: 7d40f7f8-c022-4104-a1a0-9228da07fbe4
INFO:root:append records, table - users, # records - 1
INFO:root:skip records as no user event, table - orders, # records - 0
INFO:root:skip records as no user event, table - order_items, # records - 0
INFO:root:skip records as no user event, table - inventory_items, # records - 0
INFO:root:skip records as no user event, table - events, # records - 0
INFO:root:start to create user events - user id: 45f8469c-3e79-40ee-9639-1cb17cd98132
INFO:root:append records, table - users, # records - 1
INFO:root:skip records as no user event, table - orders, # records - 0
INFO:root:skip records as no user event, table - order_items, # records - 0
INFO:root:skip records as no user event, table - inventory_items, # records - 0
INFO:root:skip records as no user event, table - events, # records - 0
INFO:root:start to create user events - user id: 839e353f-07ee-4d77-b1de-2f1af9b12501
INFO:root:append records, table - users, # records - 1
INFO:root:append records, table - orders, # records - 2
INFO:root:append records, table - order_items, # records - 3
INFO:root:append records, table - inventory_items, # records - 9
INFO:root:append records, table - events, # records - 19
Once the data is ingested into the database, the following tables are created in the ecommerce schema.
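The same can be confirmed programmatically with SQLAlchemy's inspector; the snippet below is a quick check that assumes the database is reachable on localhost with the credentials from the compose file.

# quick check of the tables in the ecommerce schema (not part of the repository)
from sqlalchemy import create_engine, inspect

engine = create_engine("postgresql+psycopg2://develop:password@localhost:5432/develop")
print(inspect(engine).get_table_names(schema="ecommerce"))
# expected: users, orders, order_items, inventory_items, events, products, dist_centers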
WebSocket Server
The WebSocket server runs a FastAPI-based API using uvicorn. It builds from the local Dockerfile, exposes port 8000, and connects to the PostgreSQL database with credentials and configuration passed as environment variables. The service queries data with a 5-minute lookback window and refreshes every 5 seconds. The working directory is mounted for access to the code, and the service starts only after PostgreSQL is healthy.
# producer/docker-compose.yml
services:
...
  producer:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: producer
    ports:
      - "8000:8000"
    environment:
      DB_USER: develop
      DB_PASS: password
      DB_HOST: postgres
      DB_NAME: develop
      LOOKBACK_MINUTES: "5"
      REFRESH_SECONDS: "5"
    command:
      - uvicorn
      - api:app
      - --host
      - "0.0.0.0"
      - --port
      - "8000"
    volumes:
      - .:/home/app
    depends_on:
      postgres:
        condition: service_healthy
...
WebSocket Server Source
This FastAPI WebSocket server streams real-time data from the PostgreSQL database. It connects using SQLAlchemy, fetches order-related data with a configurable lookback window (LOOKBACK_MINUTES), and sends updates at an interval defined by REFRESH_SECONDS. A WebSocket manager handles multiple connections, converting database results into JSON before streaming them. The app continuously queries the database and sends fresh data to connected clients until they disconnect. Logging provides visibility into connections, queries, and errors.
# producer/api.py
import os
import logging
import asyncio

from sqlalchemy import create_engine, Engine, Connection
import pandas as pd
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

logging.basicConfig(level=logging.INFO)

try:
    LOOKBACK_MINUTES = int(os.getenv("LOOKBACK_MINUTES", "5"))
    REFRESH_SECONDS = int(os.getenv("REFRESH_SECONDS", "5"))
except ValueError:
    LOOKBACK_MINUTES = 5
    REFRESH_SECONDS = 5


def get_db_engine() -> Engine:
    """Creates and returns a SQLAlchemy engine."""
    user = os.getenv("DB_USER", "develop")
    password = os.getenv("DB_PASS", "password")
    host = os.getenv("DB_HOST", "localhost")
    db_name = os.getenv("DB_NAME", "develop")

    try:
        return create_engine(
            f"postgresql+psycopg2://{user}:{password}@{host}/{db_name}", echo=True
        )
    except Exception as e:
        logging.error(f"Database connection error: {e}")
        raise


def fetch_data(conn: Connection, minutes: int = 0):
    """Fetches data from the database with an optional lookback filter."""
    sql = """
    SELECT
        u.id AS user_id
        , u.age
        , u.gender
        , u.country
        , u.traffic_source
        , o.order_id
        , o.id AS item_id
        , p.category
        , p.cost
        , o.status AS item_status
        , o.sale_price
        , o.created_at
    FROM users AS u
    JOIN order_items AS o ON u.id = o.user_id
    JOIN products AS p ON p.id = o.product_id
    """
    if minutes > 0:
        sql = f"{sql} WHERE o.created_at >= current_timestamp - interval '{minutes} minute'"
    else:
        sql = f"{sql} LIMIT 1"
    try:
        return pd.read_sql(sql=sql, con=conn)
    except Exception as e:
        logging.error(f"Error reading from database: {e}")
        return pd.DataFrame()


app = FastAPI()


class ConnectionManager:
    """Manages WebSocket connections."""

    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
        logging.info(f"New WebSocket connection: {websocket.client}")

    def disconnect(self, websocket: WebSocket):
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
            logging.info(f"WebSocket disconnected: {websocket.client}")

    async def send_data(self, df: pd.DataFrame, websocket: WebSocket):
        """Converts DataFrame to JSON and sends it via WebSocket."""
        if not df.empty:
            await websocket.send_json(df.to_json(orient="records"))


manager = ConnectionManager()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Handles WebSocket connections and continuously streams data."""
    await manager.connect(websocket)

    engine = get_db_engine()

    try:
        with engine.connect() as conn:
            while True:
                df = fetch_data(conn, LOOKBACK_MINUTES)
                logging.info(f"Fetched {df.shape[0]} records from database")
                await manager.send_data(df, websocket)
                await asyncio.sleep(REFRESH_SECONDS)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
    except Exception as e:
        logging.error(f"WebSocket error: {e}")
    finally:
        engine.dispose()
Deploy Services
The Docker Compose services can be deployed with docker-compose -f producer/docker-compose.yml up -d. Once started, the WebSocket server can be checked with a WebSocket client by executing ws listen ws://localhost:8000/ws, and its logs can be monitored by running docker logs -f producer.
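Alternatively, a small Python client can be used instead of the ws CLI. The sketch below assumes the websockets package is installed separately; note that, because the server calls send_json on the output of df.to_json, each message is a JSON-encoded string that must be decoded twice.

# ws_client.py - a minimal client sketch (assumes the websockets package is installed)
import asyncio
import json

import websockets


async def listen():
    async with websockets.connect("ws://localhost:8000/ws") as ws:
        while True:
            raw = await ws.recv()
            # the payload is a JSON string containing the records JSON, so decode twice
            records = json.loads(json.loads(raw))
            print(f"received {len(records)} records")


asyncio.run(listen())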