Data Flow
This document explains how data moves through the DataSuite ETL pipeline, from source systems to analytics-ready dimensional models.
End-to-End Data Flow
┌──────────────────┐    ┌──────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│      MySQL       │    │     LogStash     │    │    ClickHouse    │    │       DBT        │
│    (Adventure    │────│ (Data Ingestion) │────│ (Data Warehouse, │────│ (Transformations,│
│  Works Source)   │    │                  │    │  Bronze Layer)   │    │   Gold Layer)    │
└──────────────────┘    └──────────────────┘    └──────────────────┘    └──────────────────┘
         │                       │                       │                       │
   Transactional             Real-time                 Batch                 Scheduled
      Updates                Streaming              Processing               Execution
         │                       │                       │                       │
         └───────────────────────┼───────────────────────┼───────────────────────┘
                                 │                       │
                        ┌──────────────────┐    ┌──────────────────┐
                        │   Data Quality   │    │    Analytics     │
                        │    Monitoring    │    │   & Reporting    │
                        └──────────────────┘    └──────────────────┘
Phase 1: Source Data Generation
Operational Activity
What Happens: Business transactions occur in the AdventureWorks MySQL database
Customers place orders through web/mobile applications
Inventory systems update product availability
Sales teams create customer records
Financial systems process payments
Data Characteristics:
Volume: 1,000-10,000 transactions per day
Velocity: Real-time as business events occur
Variety: Structured relational data across multiple schemas
Update Pattern: INSERT and UPDATE operations with timestamp tracking
Key Tables Updated:
-- Primary transaction tables
Sales.SalesOrderHeader -- New orders created
Sales.SalesOrderDetail -- Order line items added
Sales.Customer -- Customer profile updates
Production.Product -- Product catalog changes
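For illustration, a typical tracked change might look like the following sketch (the order ID and values are made up; column names follow the AdventureWorks schema). The explicit ModifiedDate update is what makes incremental capture possible in the next phase.
-- Hypothetical example of a tracked source update (illustrative values only)
UPDATE Sales.SalesOrderHeader
SET Status = 5, -- Shipped
ShipDate = NOW(),
ModifiedDate = NOW()
WHERE SalesOrderID = 43659;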
Phase 2: Data Ingestion (LogStash)
Incremental Data Capture
What Happens: LogStash continuously monitors MySQL for changes
Process Flow:
Connection Establishment: JDBC connection to MySQL every 30 seconds
Change Detection: Query for records where ModifiedDate > last_processed_timestamp
Data Extraction: Pull changed records using optimized SQL queries
Format Transformation: Convert MySQL result set to JSON format
Data Transmission: HTTP POST to ClickHouse bronze layer
Sample LogStash Query:
SELECT
salesorderid,
customerid,
territoryid,
orderdate,
duedate,
shipdate,
status,
subtotal,
taxamt,
freight,
totaldue,
modifieddate
FROM Sales.SalesOrderHeader
WHERE ModifiedDate > :sql_last_value
ORDER BY ModifiedDate
Performance Characteristics:
Polling Frequency: Every 30 seconds
Batch Size: 1,000 records per query
Throughput: ~1,000 records/second
Latency: Average 45 seconds from source to bronze layer
Data Quality at Ingestion
Validation Rules:
Primary key uniqueness checks
Data type validation and conversion
NULL value handling for required fields
Timestamp consistency verification
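These rules can also be spot-checked after the fact with simple queries against the bronze layer. A sketch of a uniqueness check, assuming the bronze table defined in the next phase (the bronze layer keeps change history, so uniqueness is evaluated per business key and source timestamp):
-- Sketch: detect rows that were ingested more than once
SELECT salesorderid, modifieddate, count() AS copies
FROM bronze_layer.sales_orders
GROUP BY salesorderid, modifieddate
HAVING copies > 1;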
Phase 3: Bronze Layer Storage (ClickHouse)
Raw Data Persistence
What Happens: LogStash delivers JSON payloads to ClickHouse HTTP interface
Storage Strategy:
Table Structure: Mirror source schema with minimal modifications
Partitioning: Date-based partitioning for query performance
Ordering: Clustered by date and primary key for optimal compression
Retention: Configurable retention policies (default: 2 years)
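In ClickHouse, a retention policy like this can be expressed as a table TTL. A minimal sketch, assuming the bronze table defined below and the default two-year window:
-- Illustrative retention policy: expire bronze rows two years after the order date
ALTER TABLE bronze_layer.sales_orders
MODIFY TTL orderdate + INTERVAL 2 YEAR;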
Example Bronze Table:
CREATE TABLE bronze_layer.sales_orders (
salesorderid Int32,
customerid Int32,
territoryid Nullable(Int32),
orderdate DateTime,
duedate DateTime,
shipdate Nullable(DateTime),
status UInt8,
subtotal Decimal64(4),
taxamt Decimal64(4),
freight Decimal64(4),
totaldue Decimal64(4),
modifieddate DateTime,
-- Metadata for lineage tracking
_ingested_at DateTime DEFAULT now(),
_logstash_timestamp DateTime,
_source_file String DEFAULT ''
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(orderdate)
ORDER BY (orderdate, salesorderid);
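For context, each JSON payload that LogStash posts to the HTTP interface is equivalent to an insert like the one below (all values are made up for illustration):
-- Equivalent ClickHouse insert for a single LogStash JSON payload (illustrative values)
INSERT INTO bronze_layer.sales_orders
(salesorderid, customerid, territoryid, orderdate, subtotal, taxamt, freight, totaldue, modifieddate, _logstash_timestamp)
FORMAT JSONEachRow
{"salesorderid": 43659, "customerid": 29825, "territoryid": 5, "orderdate": "2024-06-01 08:14:55", "subtotal": 20565.62, "taxamt": 1971.51, "freight": 616.10, "totaldue": 23153.23, "modifieddate": "2024-06-01 08:14:55", "_logstash_timestamp": "2024-06-01 08:15:30"}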
Data Characteristics in Bronze:
Fidelity: Exact copy of source data with no business logic applied
Completeness: All source columns preserved, including system fields
Auditability: Complete history of all changes with ingestion timestamps
Reprocessability: Ability to rebuild gold layer from bronze at any time
Phase 4: Data Transformation (DBT)
Staging Layer Processing
What Happens: DBT creates cleaned, standardized views of bronze data
Staging Transformations:
-- models/staging/stg_sales_orders.sql
{{ config(materialized='view') }}
SELECT
salesorderid,
customerid,
territoryid,
orderdate,
duedate,
shipdate,
CASE
WHEN status = 1 THEN 'In Process'
WHEN status = 2 THEN 'Approved'
WHEN status = 3 THEN 'Backordered'
WHEN status = 4 THEN 'Rejected'
WHEN status = 5 THEN 'Shipped'
WHEN status = 6 THEN 'Cancelled'
ELSE 'Unknown'
END as status_description,
subtotal,
taxamt,
freight,
totaldue,
modifieddate
FROM {{ source('bronze_layer', 'sales_orders') }}
WHERE salesorderid IS NOT NULL
AND orderdate IS NOT NULL
Quality Checks:
Business rule validation
Data type standardization
Null handling and default values
Referential integrity verification
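Checks that do not fit the generic column tests can be written as DBT singular tests. A minimal sketch of a referential integrity test (the file name is illustrative; the staging models are the same ones referenced by the fact model below):
-- tests/assert_order_details_have_headers.sql (illustrative singular test)
-- Fails if any order detail row has no matching order header
SELECT sod.salesorderid
FROM {{ ref('stg_sales_order_detail') }} sod
LEFT JOIN {{ ref('stg_sales_order_header') }} soh
ON sod.salesorderid = soh.salesorderid
WHERE soh.salesorderid IS NULL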
Dimensional Model Creation
What Happens: DBT builds star schema optimized for analytics
Fact Table Creation:
-- models/marts/core/fact_sale.sql
{{ config(
materialized='incremental',
unique_key='sale_key',
on_schema_change='fail'
) }}
WITH sales_with_keys AS (
SELECT
{{ dbt_utils.surrogate_key(['soh.salesorderid', 'sod.salesorderdetailid']) }} as sale_key,
{{ get_date_key('soh.orderdate') }} as date_key,
{{ get_territory_key('soh.territoryid') }} as territory_key,
{{ get_product_key('sod.productid') }} as product_key,
-- Measures
sod.orderqty,
sod.unitprice,
sod.linetotal,
soh.subtotal,
soh.taxamt,
soh.totaldue,
soh.salesorderid,
soh.orderdate
FROM {{ ref('stg_sales_order_header') }} soh
INNER JOIN {{ ref('stg_sales_order_detail') }} sod
ON soh.salesorderid = sod.salesorderid
)
SELECT * FROM sales_with_keys
{% if is_incremental() %}
WHERE orderdate > (SELECT MAX(orderdate) FROM {{ this }})
{% endif %}
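The get_date_key, get_territory_key, and get_product_key calls are project-specific macros that resolve natural keys to dimension keys; they are not shown in this document. As an illustrative sketch, get_date_key could be as simple as:
-- macros/get_date_key.sql (illustrative sketch, not the project's actual macro)
{% macro get_date_key(date_column) %}
toYYYYMMDD({{ date_column }})
{% endmacro %}
With a convention like this, date_key values take the form 20240601 and join directly to a dim_date table keyed the same way.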
Dimension Table Creation:
-- models/marts/core/dim_product.sql
{{ config(materialized='table') }}
SELECT
{{ dbt_utils.surrogate_key(['productid']) }} as product_key,
productid,
productname,
productnumber,
pc.name as product_category,
psc.name as product_subcategory,
standardcost,
listprice,
productline,
size,
weight,
color,
TRUE as is_current,
now() as effective_from,
CAST(NULL AS Nullable(DateTime)) as effective_to
FROM {{ ref('stg_products') }} p
LEFT JOIN {{ ref('stg_product_categories') }} pc
ON p.productcategoryid = pc.productcategoryid
LEFT JOIN {{ ref('stg_product_subcategories') }} psc
ON p.productsubcategoryid = psc.productsubcategoryid
Phase 5: Gold Layer Materialization
Star Schema Implementation
What Happens: DBT executes SQL to create final dimensional models in ClickHouse
Materialization Strategy:
Fact Tables: Incremental materialization for performance
Dimension Tables: Full refresh for SCD Type 1, incremental for SCD Type 2
Partitioning: Date-based partitioning aligned with query patterns
Indexing: Optimized for star schema join patterns
Final Schema Structure:
-- Fact table with measures and foreign keys
fact_sale (
sale_key, -- Surrogate primary key
date_key, -- Foreign key to dim_date
territory_key, -- Foreign key to dim_territory
product_key, -- Foreign key to dim_product
order_qty, -- Additive measure
unit_price, -- Non-additive measure (per-unit price)
line_total, -- Additive measure
sub_total, -- Additive measure
tax_amt, -- Additive measure
total_due -- Additive measure
)
-- Supporting dimension tables
dim_date (date_key, date, year, quarter, month, ...)
dim_territory (territory_key, territory_name, region, ...)
dim_product (product_key, product_name, category, ...)
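A typical analytical query joins the fact table to its dimensions and aggregates the additive measures. For example (a sketch; table and column names follow the structure above, and the gold_layer database name matches the monitoring queries below):
-- Example: monthly revenue and units sold by product category
SELECT
d.year,
d.month,
p.category,
sum(f.line_total) AS revenue,
sum(f.order_qty) AS units_sold
FROM gold_layer.fact_sale f
INNER JOIN gold_layer.dim_date d ON f.date_key = d.date_key
INNER JOIN gold_layer.dim_product p ON f.product_key = p.product_key
GROUP BY d.year, d.month, p.category
ORDER BY d.year, d.month, revenue DESC;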
Data Quality & Monitoring
Quality Gates Throughout Pipeline
Source Quality (MySQL):
Primary key constraints prevent duplicates
Foreign key relationships ensure referential integrity
NOT NULL constraints on required fields
Check constraints validate business rules
Ingestion Quality (LogStash):
Row count reconciliation between source and bronze
Data type validation during JSON conversion
Error logging and dead letter queuing for failed records
Throughput monitoring and alerting
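The bronze-layer half of the row count reconciliation listed above can be as simple as a daily count that is compared against the same count run in the source MySQL database (a sketch; the seven-day window is arbitrary):
-- Bronze-side daily row counts, compared against the equivalent counts in MySQL
SELECT toDate(modifieddate) AS change_day, count() AS bronze_rows
FROM bronze_layer.sales_orders
WHERE modifieddate >= today() - 7
GROUP BY change_day
ORDER BY change_day;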
Transformation Quality (DBT):
# Example DBT test
version: 2

models:
  - name: fact_sale
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - date_key
            - territory_key
            - product_key
            - salesorderid
    columns:
      - name: sale_key
        tests:
          - unique
          - not_null
      - name: total_due
        tests:
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000
End-to-End Monitoring
Freshness Monitoring:
-- Data freshness check
SELECT
'bronze_layer' as layer,
MAX(_ingested_at) as last_update,
dateDiff('minute', MAX(_ingested_at), now()) as lag_minutes
FROM bronze_layer.sales_orders
UNION ALL
SELECT
'gold_layer' as layer,
MAX(created_at) as last_update,
dateDiff('minute', MAX(created_at), now()) as lag_minutes
FROM gold_layer.fact_sale;
Volume Monitoring:
Row count tracking at each layer
Growth rate analysis and anomaly detection
Data distribution analysis for skew detection
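A minimal cross-layer row count snapshot, using the table names from the examples above, might look like:
-- Row count snapshot per layer, logged periodically for growth and anomaly tracking
SELECT 'bronze_layer.sales_orders' AS table_name, count() AS row_count
FROM bronze_layer.sales_orders
UNION ALL
SELECT 'gold_layer.fact_sale' AS table_name, count() AS row_count
FROM gold_layer.fact_sale;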
Performance Characteristics
End-to-End Latency
Source to Bronze: 30-60 seconds (LogStash polling interval)
Bronze to Gold: 5-15 minutes (DBT execution time)
Total Latency: ~15-20 minutes from a source transaction to analytics-ready gold tables
Throughput Capacity
LogStash Ingestion: 1,000 records/second per pipeline
ClickHouse Storage: 100,000+ inserts/second
DBT Processing: 1M+ records/minute for transformations
Query Performance: Sub-second response for most analytical queries
Scalability Patterns
Horizontal Scaling: Multiple LogStash instances for increased throughput
Vertical Scaling: Larger ClickHouse nodes for complex queries
Partitioning: Date-based partitioning for query pruning
Parallel Processing: DBT model parallelization for faster builds
Next Steps
Now that you understand the data flow, you can:
Set up your environment to see this flow in action
Configure LogStash for data ingestion
Build DBT models for transformations
The data flow provides the foundation for understanding how to troubleshoot issues and optimize performance in your own implementation.