Data Flow

This document explains how data moves through the DataSuite ETL pipeline, from source systems to analytics-ready dimensional models.

End-to-End Data Flow

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   MySQL         │    │   LogStash       │    │   ClickHouse    │    │      DBT        │
│ (Adventure      │────│  (Data Ingestion)│────│  (Data Warehouse│────│ (Transformations│
│  Works Source)  │    │                  │    │   Bronze Layer) │    │   Gold Layer)   │
└─────────────────┘    └──────────────────┘    └─────────────────┘    └─────────────────┘
       │                         │                         │                         │
       │ Transactional           │ Real-time              │ Batch                   │ Scheduled
       │ Updates                 │ Streaming              │ Processing              │ Execution
       └─────────────────────────┼─────────────────────────┼─────────────────────────┘
                                 │                         │
                        ┌─────────────────┐       ┌─────────────────┐
                        │   Data Quality  │       │   Analytics     │
                        │   Monitoring    │       │   & Reporting   │
                        └─────────────────┘       └─────────────────┘

Phase 1: Source Data Generation

Operational Activity

What Happens: Business transactions occur in the AdventureWorks MySQL database

  • Customer places orders through web/mobile applications

  • Inventory systems update product availability

  • Sales teams create customer records

  • Financial systems process payments

Data Characteristics:

  • Volume: 1,000-10,000 transactions per day

  • Velocity: Real-time as business events occur

  • Variety: Structured relational data across multiple schemas

  • Update Pattern: INSERT and UPDATE operations with timestamp tracking

Key Tables Updated:

-- Primary transaction tables
Sales.SalesOrderHeader     -- New orders created
Sales.SalesOrderDetail     -- Order line items added
Sales.Customer             -- Customer profile updates
Production.Product         -- Product catalog changes
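
To make the update pattern concrete, here is a hypothetical transaction against one of these tables. The customer ID and territory value are illustrative; the important part is that the application also advances ModifiedDate, which drives the change detection in Phase 2.

-- Hypothetical source update (illustrative IDs)
UPDATE Sales.Customer
SET TerritoryID = 4,
    ModifiedDate = NOW()
WHERE CustomerID = 11000;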

Phase 2: Data Ingestion (LogStash)

Incremental Data Capture

What Happens: LogStash continuously monitors MySQL for changes

Process Flow:

  1. Connection Establishment: JDBC connection to MySQL every 30 seconds

  2. Change Detection: Query for records where ModifiedDate > last_processed_timestamp

  3. Data Extraction: Pull changed records using optimized SQL queries

  4. Format Transformation: Convert MySQL result set to JSON format

  5. Data Transmission: HTTP POST to ClickHouse bronze layer

Sample LogStash Query:

SELECT 
    salesorderid,
    customerid, 
    territoryid,
    orderdate,
    subtotal,
    taxamt, 
    totaldue,
    modifieddate
FROM Sales.SalesOrderHeader 
WHERE ModifiedDate > :sql_last_value 
ORDER BY ModifiedDate

Performance Characteristics:

  • Polling Frequency: Every 30 seconds

  • Batch Size: 1,000 records per query

  • Throughput: ~1,000 records/second

  • Latency: Average 45 seconds from source to bronze layer

Data Quality at Ingestion

Validation Rules:

  • Primary key uniqueness checks

  • Data type validation and conversion

  • NULL value handling for required fields

  • Timestamp consistency verification
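
These rules run inside the ingestion pipeline itself; the first of them can also be spot-checked after the fact with a query against the bronze table. A minimal sketch, assuming bronze stores one row per source change (so the key plus ModifiedDate pair should be unique):

-- Duplicate spot-check on the bronze layer (sketch)
SELECT
    salesorderid,
    modifieddate,
    count() AS copies
FROM bronze_layer.sales_orders
GROUP BY salesorderid, modifieddate
HAVING copies > 1;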

Phase 3: Bronze Layer Storage (ClickHouse)

Raw Data Persistence

What Happens: LogStash delivers JSON payloads to ClickHouse HTTP interface

Storage Strategy:

  • Table Structure: Mirror source schema with minimal modifications

  • Partitioning: Date-based partitioning for query performance

  • Ordering: Clustered by date and primary key for optimal compression

  • Retention: Configurable retention policies (default: 2 years)

Example Bronze Table:

CREATE TABLE bronze_layer.sales_orders (
    salesorderid Int32,
    customerid Int32,
    territoryid Nullable(Int32),
    orderdate DateTime,
    subtotal Decimal64(4),
    taxamt Decimal64(4), 
    totaldue Decimal64(4),
    modifieddate DateTime,
    
    -- Metadata for lineage tracking
    _ingested_at DateTime DEFAULT now(),
    _logstash_timestamp DateTime,
    _source_file String DEFAULT ''
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(orderdate)
ORDER BY (orderdate, salesorderid);
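
The retention bullet above maps to a ClickHouse TTL clause. A minimal sketch of the default two-year policy (actual retention is configured per deployment):

-- Illustrative retention policy matching the 2-year default
ALTER TABLE bronze_layer.sales_orders
    MODIFY TTL orderdate + INTERVAL 2 YEAR;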

Data Characteristics in Bronze:

  • Fidelity: Exact copy of source data with no business logic applied

  • Completeness: All source columns preserved, including system fields

  • Auditability: Complete history of all changes with ingestion timestamps

  • Reprocessability: Ability to rebuild gold layer from bronze at any time

Phase 4: Data Transformation (DBT)

Staging Layer Processing

What Happens: DBT creates cleaned, standardized views of bronze data

Staging Transformations:

-- models/staging/stg_sales_orders.sql
{{ config(materialized='view') }}

SELECT
    salesorderid,
    customerid,
    territoryid,
    orderdate,
    duedate,
    shipdate,
    CASE 
        WHEN status = 1 THEN 'In Process'
        WHEN status = 2 THEN 'Approved' 
        WHEN status = 3 THEN 'Backordered'
        WHEN status = 4 THEN 'Rejected'
        WHEN status = 5 THEN 'Shipped'
        WHEN status = 6 THEN 'Cancelled'
        ELSE 'Unknown'
    END as status_description,
    subtotal,
    taxamt,
    freight,
    totaldue,
    modifieddate
FROM {{ source('bronze_layer', 'sales_orders') }}
WHERE salesorderid IS NOT NULL
  AND orderdate IS NOT NULL

Quality Checks:

  • Business rule validation

  • Data type standardization

  • Null handling and default values

  • Referential integrity verification
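
The referential integrity check can be expressed as a dbt singular test: a SQL file under tests/ that returns violating rows, so the test fails if any come back. A sketch, assuming a stg_customers staging model exists alongside stg_sales_orders:

-- tests/assert_orders_reference_known_customers.sql (hypothetical file)
-- Any row returned is an order whose customer is missing from staging
SELECT
    o.salesorderid,
    o.customerid
FROM {{ ref('stg_sales_orders') }} o
LEFT JOIN {{ ref('stg_customers') }} c
    ON o.customerid = c.customerid
WHERE c.customerid IS NULL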

Dimensional Model Creation

What Happens: DBT builds star schema optimized for analytics

Fact Table Creation:

-- models/marts/core/fact_sale.sql
{{ config(
    materialized='incremental',
    unique_key='sale_key',
    on_schema_change='fail'
) }}

WITH sales_with_keys AS (
    SELECT 
        {{ dbt_utils.surrogate_key(['soh.salesorderid', 'sod.salesorderdetailid']) }} as sale_key,
        {{ get_date_key('soh.orderdate') }} as date_key,
        {{ get_territory_key('soh.territoryid') }} as territory_key,
        {{ get_product_key('sod.productid') }} as product_key,
        
        -- Measures
        sod.orderqty,
        sod.unitprice, 
        sod.linetotal,
        soh.subtotal,
        soh.taxamt,
        soh.totaldue,
        
        soh.salesorderid,
        soh.orderdate
    FROM {{ ref('stg_sales_order_header') }} soh
    INNER JOIN {{ ref('stg_sales_order_detail') }} sod
        ON soh.salesorderid = sod.salesorderid
)

SELECT * FROM sales_with_keys
{% if is_incremental() %}
    WHERE orderdate > (SELECT MAX(orderdate) FROM {{ this }})
{% endif %}
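
The get_date_key, get_territory_key, and get_product_key calls above are project macros that resolve a natural key to its dimension surrogate key; they are not shown in this document. A minimal sketch of the date variant, assuming the date key is a YYYYMMDD integer:

-- macros/get_date_key.sql (illustrative; the project's macro may differ)
{% macro get_date_key(date_column) %}
    toYYYYMMDD({{ date_column }})
{% endmacro %}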

Dimension Table Creation:

-- models/marts/core/dim_product.sql
{{ config(materialized='table') }}

SELECT 
    {{ dbt_utils.surrogate_key(['productid']) }} as product_key,
    productid,
    productname,
    productnumber,
    pc.name as product_category,
    psc.name as product_subcategory,
    standardcost,
    listprice,
    productline,
    size,
    weight,
    color,
    TRUE as is_current,
    now() as effective_from,
    CAST(NULL AS Nullable(DateTime)) as effective_to
FROM {{ ref('stg_products') }} p
LEFT JOIN {{ ref('stg_product_categories') }} pc 
    ON p.productcategoryid = pc.productcategoryid
LEFT JOIN {{ ref('stg_product_subcategories') }} psc
    ON p.productsubcategoryid = psc.productsubcategoryid

Phase 5: Gold Layer Materialization

Star Schema Implementation

What Happens: DBT executes SQL to create final dimensional models in ClickHouse

Materialization Strategy:

  • Fact Tables: Incremental materialization for performance

  • Dimension Tables: Full refresh for SCD Type 1, incremental for SCD Type 2

  • Partitioning: Date-based partitioning aligned with query patterns

  • Indexing: Optimized for star schema join patterns

Final Schema Structure:

-- Fact table with measures and foreign keys
fact_sale (
    sale_key,           -- Surrogate primary key
    date_key,           -- Foreign key to dim_date
    territory_key,      -- Foreign key to dim_territory  
    product_key,        -- Foreign key to dim_product
    order_qty,          -- Additive measure
    unit_price,         -- Non-additive measure
    line_total,         -- Additive measure
    sub_total,          -- Additive measure
    tax_amt,            -- Additive measure
    total_due           -- Additive measure
)

-- Supporting dimension tables
dim_date (date_key, date, year, quarter, month, ...)
dim_territory (territory_key, territory_name, region, ...)
dim_product (product_key, product_name, category, ...)
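
A typical analytical query joins the fact table to its dimensions on the surrogate keys. An illustrative example, assuming the dimensions live in the same gold_layer schema as fact_sale:

-- Monthly revenue by territory (illustrative)
SELECT
    d.year,
    d.month,
    t.territory_name,
    sum(f.line_total) AS revenue
FROM gold_layer.fact_sale f
INNER JOIN gold_layer.dim_date d ON f.date_key = d.date_key
INNER JOIN gold_layer.dim_territory t ON f.territory_key = t.territory_key
GROUP BY d.year, d.month, t.territory_name
ORDER BY d.year, d.month, revenue DESC;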

Data Quality & Monitoring

Quality Gates Throughout Pipeline

Source Quality (MySQL):

  • Primary key constraints prevent duplicates

  • Foreign key relationships ensure referential integrity

  • NOT NULL constraints on required fields

  • Check constraints validate business rules

Ingestion Quality (LogStash):

  • Row count reconciliation between source and bronze

  • Data type validation during JSON conversion

  • Error logging and dead letter queuing for failed records

  • Throughput monitoring and alerting
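
Row count reconciliation compares a count taken on the MySQL side against the same window in bronze. A sketch of the bronze half, assuming yesterday's changes are being reconciled:

-- Bronze-side counts for reconciliation (sketch); run the matching
-- count against Sales.SalesOrderHeader in MySQL and compare
SELECT
    toDate(modifieddate) AS change_date,
    count() AS bronze_rows,
    uniqExact(salesorderid) AS distinct_orders
FROM bronze_layer.sales_orders
WHERE modifieddate >= today() - 1
GROUP BY change_date;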

Transformation Quality (DBT):

-- Example DBT test
version: 2

models:
  - name: fact_sale
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - date_key
            - territory_key
            - product_key
            - salesorderid
    columns:
      - name: sale_key
        tests:
          - unique
          - not_null
      - name: total_due
        tests:
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000

End-to-End Monitoring

Freshness Monitoring:

-- Data freshness check
SELECT 
    'bronze_layer' as layer,
    MAX(_ingested_at) as last_update,
    dateDiff('minute', MAX(_ingested_at), now()) as lag_minutes
FROM bronze_layer.sales_orders

UNION ALL

SELECT 
    'gold_layer' as layer, 
    MAX(created_at) as last_update,
    dateDiff('minute', MAX(created_at), now()) as lag_minutes
FROM gold_layer.fact_sale;

Volume Monitoring:

  • Row count tracking at each layer

  • Growth rate analysis and anomaly detection

  • Data distribution analysis for skew detection
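
A simple daily count over the bronze ingestion timestamp is enough to chart volume and feed the anomaly checks above (a sketch):

-- Daily ingestion volume for the last 30 days (sketch)
SELECT
    toDate(_ingested_at) AS ingest_date,
    count() AS rows_ingested
FROM bronze_layer.sales_orders
GROUP BY ingest_date
ORDER BY ingest_date DESC
LIMIT 30;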

Performance Characteristics

End-to-End Latency

  • Source to Bronze: 30-60 seconds (LogStash polling interval)

  • Bronze to Gold: 5-15 minutes (DBT execution time)

  • Total Latency: ~15-20 minutes from a source transaction to analytics-ready data, including the wait for the next scheduled DBT run

Throughput Capacity

  • LogStash Ingestion: 1,000 records/second per pipeline

  • ClickHouse Storage: 100,000+ inserts/second

  • DBT Processing: 1M+ records/minute for transformations

  • Query Performance: Sub-second response for most analytical queries

Scalability Patterns

  • Horizontal Scaling: Multiple LogStash instances for increased throughput

  • Vertical Scaling: Larger ClickHouse nodes for complex queries

  • Partitioning: Date-based partitioning for query pruning

  • Parallel Processing: DBT model parallelization for faster builds

Next Steps

Now that you understand the data flow, you can:

  1. Set up your environment to see this flow in action

  2. Configure LogStash for data ingestion

  3. Build DBT models for transformations

The data flow provides the foundation for understanding how to troubleshoot issues and optimize performance in your own implementation.
