Documentations

  • User guide for developer to getting started with begin using Data Engineer flow on Data suite

Document outlines

  • The overrall architect of the sysmte

  • Getting started

    • Installation using docker

      • Explain the architecture of the local stack

      • Step by step installation using individual containers

      • Installtation using Docker-compose

  • Write ETL code using DBT

    • The overral data architeecture

    • Step by step write DBT code

    • Execute the code and output the data

Architect

Overview

@startuml
database DataSource
component LogStash
component Airflow {
    component EtlJob
}
database Clickhouse {
    component BronzeLayer
    component GoldLayer
}
DataSource <- LogStash: read and query data from data source
LogStash -> BronzeLayer: push data to clickhouse
EtlJob --> BronzeLayer: read raw data from clickhouse
EtlJob --> GoldLayer: persist data aftere ETL to clickhouse 


@enduml

Verification Steps

  1. Check MySQL: docker exec mysql mysql -uroot -ppassword -e "SHOW DATABASES;"

  2. Check ClickHouse: curl -u admin:clickhouse123 http://localhost:8123/ping

  3. Check LogStash: curl http://localhost:5044

  4. Check Airflow: Visit http://localhost:8080 (admin/admin)

Components

ID
Description

Data Source

External data sources that provide raw data for processing

LogStash

Data collection engine that reads and queries data from various sources

Airflow

Workflow orchestration platform containing ETL jobs

EtlJob

Data transformation jobs that process raw data

Clickhouse

Columnar database for analytical workloads

BronzeLayer

Raw data storage layer in ClickHouse

GoldLayer

Processed and refined data storage layer in ClickHouse

Getting started

Installation for local development using docker

Prerequisites

  • Docker and Docker Compose installed

  • Git for cloning repositories

  • 8GB+ RAM recommended for full stack

Option 1: Individual Container Setup

  1. Install MySQL database

# Create network for container communication
$ docker network create datasuite-network

# Run MySQL with persistent storage
$ docker run --name mysql \
  --network datasuite-network \
  -e MYSQL_ROOT_PASSWORD=password \
  -e MYSQL_DATABASE=adventureworks \
  -p 3306:3306 \
  -v mysql-data:/var/lib/mysql \
  -d mysql:8.0
  1. Seed Adventure Works data into MySQL

# Load Adventure Works 2019 data from local file
$ docker exec -i mysql mysql -uroot -ppassword adventureworks < docs/data/AdventureWorks2019.sql

# Verify data was loaded successfully
$ docker exec mysql mysql -uroot -ppassword adventureworks -e "SHOW TABLES;"
$ docker exec mysql mysql -uroot -ppassword adventureworks -e "SELECT COUNT(*) as table_count FROM information_schema.tables WHERE table_schema='adventureworks';"

Adventure Works Data Structure:

  • Sales schema: Customer orders, products, sales data

class Sales_SalesOrderDetail
class Sales_SaleOrderHeader
class Sales_Customer
class Sales_SalesTerritory

Sales_SaleOrderHeader --> Sales_SalesOrderDetail
Sales_SaleOrderHeader -> Sales_Customer
Sales_SaleOrderHeader -> Sales_SalesTerritory
  • Production schema: Product catalog, inventory

  • Person schema: Customer and employee information

  • Sample query: SELECT COUNT(*) FROM Sales.SalesOrderHeader;

  1. Install ClickHouse data warehouse

$ docker run --name clickhouse \
  --network datasuite-network \
  -p 8123:8123 \
  -p 9000:9000 \
  -e CLICKHOUSE_USER=admin \
  -e CLICKHOUSE_PASSWORD=clickhouse123 \
  -e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 \
  -v clickhouse-data:/var/lib/clickhouse \
  -d clickhouse/clickhouse-server:latest

# Verify ClickHouse is running with authentication  
$ curl -u admin:clickhouse123 http://localhost:8123/ping

# Test connection and create database
$ curl -u admin:clickhouse123 http://localhost:8123/ -d "CREATE DATABASE IF NOT EXISTS bronze_layer"
$ curl -u admin:clickhouse123 http://localhost:8123/ -d "CREATE DATABASE IF NOT EXISTS gold_layer"
  1. Install LogStash with MySQL connector

# Create directory for MySQL connector
$ mkdir -p logstash-drivers

# Download MySQL JDBC driver using Docker container
$ docker run --rm -v $(pwd)/logstash-drivers:/drivers alpine:latest \
  sh -c "apk add --no-cache wget && \
         wget -O /drivers/mysql-connector-java.jar \
         https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.9-rc/mysql-connector-java-8.0.9-rc.jar"

# Run LogStash with driver mounted
$ docker run --name logstash \
  --network datasuite-network \
  -p 5044:5044 \
  -v $(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
  -v $(pwd)/logstash-drivers:/usr/share/logstash/drivers \
  -d docker.elastic.co/logstash/logstash:8.11.0

Sample LogStash Configuration (logstash.conf):

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks"
    jdbc_user => "root"
    jdbc_password => "password"
    statement => "SELECT * FROM Sales.SalesOrderHeader WHERE ModifiedDate > :sql_last_value ORDER BY ModifiedDate"
    use_column_value => true
    tracking_column => "ModifiedDate"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
    clean_run => false
    schedule => "*/30 * * * * *"
  }
}

filter {
  # Convert to proper JSON format for ClickHouse
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}

output {
  http {
    url => "http://clickhouse:8123/"
    http_method => "post"
    format => "message"
    message => "INSERT INTO bronze_layer.sales_orders FORMAT JSONEachRow %{message}"
    headers => {
      "Authorization" => "Basic YWRtaW46Y2xpY2tob3VzZTEyMw=="  # admin:clickhouse123 base64
      "Content-Type" => "application/json"
    }
  }
  
  # Debug output to console
  stdout { 
    codec => rubydebug 
  }
}

Option 2: Docker Compose Setup

Create docker-compose.yml:

version: '3.8'
services:
  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: password
      MYSQL_DATABASE: adventureworks
    ports:
      - "3306:3306"
    volumes:
      - mysql-data:/var/lib/mysql

  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - "8123:8123"
      - "9000:9000"
    environment:
      CLICKHOUSE_USER: admin
      CLICKHOUSE_PASSWORD: clickhouse123
      CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
    volumes:
      - clickhouse-data:/var/lib/clickhouse

  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    ports:
      - "5044:5044"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - ./logstash-drivers:/usr/share/logstash/drivers
    depends_on:
      - mysql
      - clickhouse

  airflow:
    image: apache/airflow:2.7.0
    ports:
      - "8080:8080"
    volumes:
      - ./dags:/opt/airflow/dags
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
    depends_on:
      - clickhouse

volumes:
  mysql-data:
  clickhouse-data:

Download MySQL JDBC driver and start all services:

# Create directory and download MySQL connector using Docker
$ mkdir -p logstash-drivers
$ docker run --rm -v $(pwd)/logstash-drivers:/drivers alpine:latest \
  sh -c "apk add --no-cache wget && \
         wget -O /drivers/mysql-connector-java.jar \
         https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.9-rc/mysql-connector-java-8.0.9-rc.jar"

# Start all services
$ docker-compose up -d
  1. Expose ClickHouse securely via tunneling

Method 1: SSH Tunnel

# Create SSH tunnel to remote server
$ ssh -L 8123:localhost:8123 user@remote-server

Method 2: ngrok (for development/testing)

# Install ngrok
$ npm install -g ngrok

# Create secure tunnel for ClickHouse HTTP interface
$ ngrok tcp 8123

# Access via provided ngrok URL (e.g., 0.tcp.ngrok.io:12345)

Method 3: ngrok with authentication

# Create tunnel with basic auth
$ ngrok http 8123 --auth="username:password"

Security Best Practices:

  • Use IP whitelisting in ngrok dashboard

  • Enable ClickHouse authentication: users.xml configuration

  • Use HTTPS/TLS connections for production

  • Rotate access credentials regularly

  • Monitor access logs for suspicious activity

Write DBT code to ETL code from data warehouse bronze layer to gold layer

Step 2: Dimensional Model for Sale Processes

Overview

Fact Table
Dim Date
Dim Territory
Dim Product

fact_sale

x

x

x

Original table ERD Structure

Table Explanations

Fact Table

Table Name
Purpose
Description
Key Metrics
Grain

fact_sale

Central fact table for sales transactions

Contains measurable sales events with foreign keys to dimension tables. Each row represents a sales order line item with associated quantities, amounts, and dimensional context.

• OrderQty (quantity sold) • UnitPrice (price per unit) • LineTotal (extended price) • SubTotal (order subtotal) • TaxAmt (tax amount) • TotalDue (final amount)

One row per sales order detail line item

Dimension Tables

Table Name
Purpose
Description
Key Attributes
Type

dim_date

Time dimension

Provides temporal context for sales analysis with pre-computed date attributes for easy filtering and grouping by various time periods.

• Date (primary date) • Year, Quarter, Month, Day • DayOfWeek, WeekOfYear • IsWeekend, IsHoliday • FiscalYear, FiscalQuarter

Type 1 SCD (Static)

dim_territory

Geographic dimension

Contains sales territory information for regional analysis and geographic reporting. Supports hierarchical geographic analysis from region to country.

• TerritoryID (business key) • TerritoryName • CountryRegionCode • TerritoryGroup • RegionName

Type 1 SCD

dim_product

Product dimension

Product catalog information enabling product-based analysis. Contains current and historical product attributes for trend analysis.

• ProductID (business key) • ProductName • ProductNumber • ProductCategory • ProductSubcategory • StandardCost, ListPrice • ProductLine, ProductModel

Type 2 SCD (Historical)

Star Schema Relationships

@startuml
!theme plain

entity fact_sale {
  * sale_key : bigint <<PK>>
  --
  * date_key : int <<FK>>
  * territory_key : int <<FK>>
  * product_key : int <<FK>>
  --
  order_qty : decimal
  unit_price : decimal
  line_total : decimal
  sub_total : decimal
  tax_amt : decimal
  total_due : decimal
  sales_order_id : int
}

entity dim_date {
  * date_key : int <<PK>>
  --
  date : date
  year : int
  quarter : int
  month : int
  day : int
  day_of_week : int
  week_of_year : int
  is_weekend : boolean
  fiscal_year : int
  fiscal_quarter : int
}

entity dim_territory {
  * territory_key : int <<PK>>
  --
  territory_id : int
  territory_name : varchar(50)
  country_region_code : varchar(3)
  territory_group : varchar(50)
  region_name : varchar(50)
}

entity dim_product {
  * product_key : int <<PK>>
  --
  product_id : int
  product_name : varchar(50)
  product_number : varchar(25)
  product_category : varchar(50)
  product_subcategory : varchar(50)
  standard_cost : decimal
  list_price : decimal
  is_current : boolean
}

dim_date ||--o{ fact_sale
dim_territory ||--o{ fact_sale
dim_product ||--o{ fact_sale

note right of fact_sale : Grain: One row per sales order line item\nVolume: ~500K rows/month
note top of dim_date : Type 1 SCD - Pre-built calendar
note left of dim_territory : Type 1 SCD - Geographic data
note bottom of dim_product : Type 2 SCD - Historical tracking

@enduml

Data Sources and Lineage

Dimension/Fact
Source Tables
Transformation Notes

fact_sale

Sales_SalesOrderHeader Sales_SalesOrderDetail

Joined on SalesOrderID, aggregated by line item

dim_date

Date functions/calendar table

Generated programmatically with business calendar rules

dim_territory

Sales_SalesTerritory

Direct mapping with cleansing and standardization

dim_product

Production_Product Production_ProductCategory Production_ProductSubcategory

Denormalized product hierarchy for performance

DBT Project Setup Steps

Step 1: Environment Setup

# Install DBT Core
pip install dbt-core

# Install ClickHouse adapter (for our data warehouse)
pip install dbt-clickhouse

# Verify installation
dbt --version

Step 2: Create New DBT Project

# Create new DBT project
dbt init adventureworks_analytics

# Navigate to project directory
cd adventureworks_analytics

Step 3: Configure Database Connection

Update profiles.yml file (usually located in ~/.dbt/profiles.yml):

adventureworks_analytics:
  target: dev
  outputs:
    dev:
      type: clickhouse
      host: localhost
      port: 8123
      user: admin
      password: clickhouse123
      database: analytics
      schema: gold_layer
      threads: 4
      keepalives_idle: 0
      search_path: bronze_layer,gold_layer
    prod:
      type: clickhouse
      host: "{{ env_var('CLICKHOUSE_HOST') }}"
      port: 8123
      user: "{{ env_var('CLICKHOUSE_USER') }}"
      password: "{{ env_var('CLICKHOUSE_PASSWORD') }}"
      database: analytics
      schema: gold_layer
      threads: 8

Step 4: Install Custom Data Suite Adapter (Optional)

# Add to packages.yml in your DBT project root
echo "packages:
  - git: https://gitlab.fci.vn/datasuite/data-suite-data-engineer/adapter_oracle.git
    revision: 1.0.8" > packages.yml

# Install packages
dbt deps

Step 5: Configure Project Structure

Update dbt_project.yml:

name: 'adventureworks_analytics'
version: '1.0.0'
config-version: 2

profile: 'adventureworks_analytics'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  adventureworks_analytics:
    # Bronze layer (raw data staging)
    staging:
      +materialized: view
      +schema: bronze_layer
    
    # Gold layer (dimensional model)
    marts:
      +materialized: table
      +schema: gold_layer
      
    # Data quality tests
    tests:
      +severity: error

Step 6: Create Layer Structure

# Create model directories
mkdir -p models/staging
mkdir -p models/marts/core
mkdir -p models/marts/finance
mkdir -p macros
mkdir -p tests

# Create source definitions
touch models/staging/sources.yml
touch models/staging/staging.yml

Step 7: Define Data Sources

Create models/staging/sources.yml:

version: 2

sources:
  - name: bronze_layer
    description: "Raw data from LogStash ETL pipeline"
    tables:
      - name: sales_orders
        description: "Sales order header information"
        columns:
          - name: salesorderid
            description: "Primary key for sales order"
            tests:
              - unique
              - not_null
              
      - name: sales_order_details
        description: "Sales order line item details"
        
      - name: customers
        description: "Customer master data"
        
      - name: products
        description: "Product catalog information"

Step 8: Create Staging Models

Create staging models to clean and standardize bronze layer data:

-- models/staging/stg_sales_orders.sql
{{ config(materialized='view') }}

SELECT
    salesorderid,
    customerid,
    territoryid,
    orderdate,
    duedate,
    shipdate,
    status,
    subtotal,
    taxamt,
    freight,
    totaldue,
    modifieddate
FROM {{ source('bronze_layer', 'sales_orders') }}
WHERE salesorderid IS NOT NULL

Step 9: Build Dimensional Models

Create dimensional model files:

  • models/marts/core/dim_date.sql

  • models/marts/core/dim_territory.sql

  • models/marts/core/dim_product.sql

  • models/marts/core/fact_sale.sql

Step 10: Run DBT Pipeline

# Test connections
dbt debug

# Run staging models
dbt run --models staging

# Run dimensional models
dbt run --models marts

# Run all tests
dbt test

# Generate documentation
dbt docs generate
dbt docs serve

Step 11: Schedule with Airflow (Optional)

Create DAG for automated runs:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

dag = DAG(
    'adventureworks_dbt_pipeline',
    default_args={'retries': 1},
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2025, 1, 1)
)

dbt_run = BashOperator(
    task_id='dbt_run',
    bash_command='cd /path/to/adventureworks_analytics && dbt run',
    dag=dag
)

dbt_test = BashOperator(
    task_id='dbt_test', 
    bash_command='cd /path/to/adventureworks_analytics && dbt test',
    dag=dag
)

dbt_run >> dbt_test

Step 12: Data Quality & Monitoring

  • Set up data quality tests in tests/ directory

  • Configure alerting for test failures

  • Monitor pipeline performance and data freshness

  • Set up incremental model strategies for large tables

Create and deploy code ETL to Airflow (production)

Overview

This section covers deploying the complete AdventureWorks ETL pipeline to Apache Airflow for production use, including LogStash data ingestion, DBT transformations, and monitoring.

Production Architecture

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   MySQL         │    │   LogStash       │    │   ClickHouse    │    │      DBT        │
│ (Adventure      │────│  (Data Ingestion)│────│  (Data Warehouse│────│ (Transformations│
│  Works Source)  │    │                  │    │   Bronze Layer) │    │   Gold Layer)   │
└─────────────────┘    └──────────────────┘    └─────────────────┘    └─────────────────┘
                                 │                         │                         │
                                 └─────────────────────────┼─────────────────────────┘

                                                  ┌─────────────────┐
                                                  │   Apache        │
                                                  │   Airflow       │
                                                  │ (Orchestration) │
                                                  └─────────────────┘

Prerequisites

  • Docker and Docker Compose installed

  • Apache Airflow 2.7+ running

  • ClickHouse database accessible

  • DBT project configured (from previous section)

  • LogStash configuration files ready

Step 1: Airflow Environment Setup

1.1 Docker Compose Configuration

Create docker-compose.yml for Airflow production setup:

version: '3.8'

x-airflow-common:
  &airflow-common
  image: apache/airflow:2.7.0
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    # Environment variables for our ETL pipeline
    MYSQL_HOST: mysql
    MYSQL_USER: root
    MYSQL_PASSWORD: password
    CLICKHOUSE_HOST: clickhouse
    CLICKHOUSE_USER: admin
    CLICKHOUSE_PASSWORD: clickhouse123
    DBT_PROJECT_DIR: /opt/airflow/dbt/adventureworks_analytics
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/dbt:/opt/airflow/dbt
    - ./docs/logstash.conf:/opt/airflow/config/logstash.conf
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version

volumes:
  postgres-db-volume:

1.2 Install Required Packages

Create requirements.txt for additional Python packages:

dbt-core==1.6.0
dbt-clickhouse==1.5.0
psycopg2-binary==2.9.7
clickhouse-driver==0.2.6
apache-airflow-providers-docker==3.7.2
apache-airflow-providers-ssh==3.7.0

Build custom Airflow image with requirements:

FROM apache/airflow:2.7.0
COPY requirements.txt /requirements.txt
RUN pip install --no-cache-dir -r /requirements.txt

Step 2: Create Production DAGs

2.1 Main ETL Pipeline DAG

Create dags/adventureworks_etl_pipeline.py:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.dates import days_ago

# Default arguments for all tasks
default_args = {
    'owner': 'data-engineering-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'catchup': False
}

# Define the DAG
dag = DAG(
    'adventureworks_etl_pipeline',
    default_args=default_args,
    description='Complete AdventureWorks ETL Pipeline',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    max_active_runs=1,
    tags=['etl', 'adventureworks', 'production']
)

# Task 1: Health Check - Verify all services are running
def check_services_health():
    import requests
    import time
    
    services = {
        'ClickHouse': 'http://clickhouse:8123/ping',
        'MySQL': 'mysql:3306'  # Will be checked differently
    }
    
    for service, endpoint in services.items():
        if service == 'ClickHouse':
            try:
                response = requests.get(endpoint, timeout=10)
                if response.status_code != 200:
                    raise Exception(f"{service} health check failed")
                print(f"✅ {service} is healthy")
            except Exception as e:
                raise Exception(f"❌ {service} health check failed: {str(e)}")
        
    return "All services healthy"

health_check = PythonOperator(
    task_id='health_check',
    python_callable=check_services_health,
    dag=dag
)

# Task 2: Start LogStash Data Ingestion
start_logstash = DockerOperator(
    task_id='start_logstash_ingestion',
    image='docker.elastic.co/logstash/logstash:8.11.0',
    container_name='logstash-etl-{{ ds }}',
    api_version='auto',
    auto_remove=True,
    environment={
        'MYSQL_HOST': '{{ var.value.mysql_host }}',
        'MYSQL_PASSWORD': '{{ var.value.mysql_password }}',
        'CLICKHOUSE_HOST': '{{ var.value.clickhouse_host }}',
        'CLICKHOUSE_PASSWORD': '{{ var.value.clickhouse_password }}'
    },
    volumes=[
        '/opt/airflow/config/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro',
        '/opt/airflow/drivers:/usr/share/logstash/drivers:ro',
        '/opt/airflow/sql-queries:/usr/share/logstash/sql-queries:ro'
    ],
    network_mode='bridge',
    dag=dag
)

# Task 3: Monitor LogStash Completion
def monitor_logstash_completion():
    import time
    import clickhouse_driver
    
    client = clickhouse_driver.Client(
        host='clickhouse',
        port=9000,
        user='admin',
        password='clickhouse123'
    )
    
    # Check if data was loaded in the last hour
    query = """
    SELECT COUNT(*) as record_count
    FROM bronze_layer.sales_orders 
    WHERE pipeline_timestamp >= NOW() - INTERVAL 1 HOUR
    """
    
    result = client.execute(query)
    record_count = result[0][0]
    
    if record_count == 0:
        raise Exception("No new records found in bronze layer")
    
    print(f"✅ LogStash loaded {record_count} records successfully")
    return record_count

monitor_logstash = PythonOperator(
    task_id='monitor_logstash_completion',
    python_callable=monitor_logstash_completion,
    dag=dag
)

# Task 4: Run DBT Staging Models
dbt_staging = BashOperator(
    task_id='dbt_run_staging',
    bash_command="""
    cd {{ var.value.dbt_project_dir }} &&
    dbt run --models staging --target prod
    """,
    dag=dag
)

# Task 5: Run DBT Tests on Staging
dbt_test_staging = BashOperator(
    task_id='dbt_test_staging',
    bash_command="""
    cd {{ var.value.dbt_project_dir }} &&
    dbt test --models staging --target prod
    """,
    dag=dag
)

# Task 6: Run DBT Dimensional Models
dbt_marts = BashOperator(
    task_id='dbt_run_marts',
    bash_command="""
    cd {{ var.value.dbt_project_dir }} &&
    dbt run --models marts --target prod
    """,
    dag=dag
)

# Task 7: Run DBT Tests on Marts
dbt_test_marts = BashOperator(
    task_id='dbt_test_marts',
    bash_command="""
    cd {{ var.value.dbt_project_dir }} &&
    dbt test --models marts --target prod
    """,
    dag=dag
)

# Task 8: Data Quality Validation
def validate_data_quality():
    import clickhouse_driver
    
    client = clickhouse_driver.Client(
        host='clickhouse',
        port=9000,
        user='admin',
        password='clickhouse123'
    )
    
    # Quality checks
    checks = [
        {
            'name': 'Fact table record count',
            'query': 'SELECT COUNT(*) FROM gold_layer.fact_sale',
            'min_expected': 1000
        },
        {
            'name': 'Date dimension completeness',
            'query': 'SELECT COUNT(*) FROM gold_layer.dim_date',
            'min_expected': 365
        },
        {
            'name': 'Product dimension completeness',
            'query': 'SELECT COUNT(*) FROM gold_layer.dim_product WHERE is_current = 1',
            'min_expected': 100
        }
    ]
    
    results = {}
    for check in checks:
        result = client.execute(check['query'])
        count = result[0][0]
        
        if count < check['min_expected']:
            raise Exception(f"Data quality check failed: {check['name']} - Expected: {check['min_expected']}, Got: {count}")
        
        results[check['name']] = count
        print(f"✅ {check['name']}: {count} records")
    
    return results

data_quality_check = PythonOperator(
    task_id='data_quality_validation',
    python_callable=validate_data_quality,
    dag=dag
)

# Task 9: Generate DBT Documentation
dbt_docs = BashOperator(
    task_id='generate_dbt_docs',
    bash_command="""
    cd {{ var.value.dbt_project_dir }} &&
    dbt docs generate --target prod &&
    echo "DBT documentation generated successfully"
    """,
    dag=dag
)

# Task 10: Pipeline Success Notification
def send_success_notification(**context):
    from airflow.providers.email.operators.email import EmailOperator
    
    execution_date = context['execution_date']
    task_instance = context['task_instance']
    
    # Get data quality results from previous task
    quality_results = task_instance.xcom_pull(task_ids='data_quality_validation')
    
    message = f"""
    AdventureWorks ETL Pipeline completed successfully!
    
    Execution Date: {execution_date}
    
    Data Quality Results:
    {quality_results}
    
    Dashboard: http://your-dashboard-url.com
    DBT Docs: http://your-dbt-docs-url.com
    """
    
    print("✅ Pipeline completed successfully!")
    print(message)
    
    return message

success_notification = PythonOperator(
    task_id='success_notification',
    python_callable=send_success_notification,
    dag=dag
)

# Define task dependencies
health_check >> start_logstash >> monitor_logstash >> dbt_staging >> dbt_test_staging
dbt_test_staging >> dbt_marts >> dbt_test_marts >> data_quality_check
data_quality_check >> dbt_docs >> success_notification

2.2 Incremental Loading DAG

Create dags/adventureworks_incremental_etl.py:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'data-engineering-team',
    'depends_on_past': True,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'adventureworks_incremental_etl',
    default_args=default_args,
    description='Incremental AdventureWorks ETL for real-time updates',
    schedule_interval=timedelta(hours=4),  # Every 4 hours
    max_active_runs=1,
    tags=['etl', 'incremental', 'adventureworks']
)

# Incremental data loading with change data capture
incremental_load = BashOperator(
    task_id='incremental_data_load',
    bash_command="""
    cd {{ var.value.dbt_project_dir }} &&
    dbt run --models +fact_sale --target prod --vars '{"is_incremental": true}'
    """,
    dag=dag
)

# Quick data quality check for incremental loads
def quick_quality_check():
    import clickhouse_driver
    
    client = clickhouse_driver.Client(host='clickhouse', port=9000, user='admin', password='clickhouse123')
    
    # Check for recent data
    query = """
    SELECT COUNT(*) FROM gold_layer.fact_sale 
    WHERE created_at >= NOW() - INTERVAL 4 HOUR
    """
    
    result = client.execute(query)
    count = result[0][0]
    
    print(f"Incremental load: {count} new records processed")
    return count

quality_check = PythonOperator(
    task_id='quick_quality_check',
    python_callable=quick_quality_check,
    dag=dag
)

incremental_load >> quality_check

Step 3: Configuration Management

3.1 Airflow Variables

Set up Airflow variables via Web UI or CLI:

# Database connections
airflow variables set mysql_host "mysql"
airflow variables set mysql_password "password"
airflow variables set clickhouse_host "clickhouse"
airflow variables set clickhouse_password "clickhouse123"

# Project paths
airflow variables set dbt_project_dir "/opt/airflow/dbt/adventureworks_analytics"

# Email notifications
airflow variables set alert_email "[email protected]"

3.2 Connections

Create Airflow connections for database access:

# ClickHouse connection
airflow connections add 'clickhouse_default' \
    --conn-type 'HTTP' \
    --conn-host 'clickhouse' \
    --conn-port 8123 \
    --conn-login 'admin' \
    --conn-password 'clickhouse123'

# MySQL connection  
airflow connections add 'mysql_default' \
    --conn-type 'mysql' \
    --conn-host 'mysql' \
    --conn-port 3306 \
    --conn-login 'root' \
    --conn-password 'password' \
    --conn-schema 'adventureworks'

Step 4: Deployment Process

4.1 Pre-deployment Checklist

# Create deployment script: deploy.sh
#!/bin/bash

echo "🚀 Starting AdventureWorks ETL Deployment..."

# 1. Validate DAG syntax
echo "📋 Validating DAG syntax..."
python -m py_compile dags/adventureworks_etl_pipeline.py
python -m py_compile dags/adventureworks_incremental_etl.py

# 2. Test DBT models
echo "🧪 Testing DBT models..."
cd dbt/adventureworks_analytics
dbt debug --target prod
dbt compile --target prod

# 3. Validate LogStash configuration
echo "⚙️ Validating LogStash configuration..."
docker run --rm -v $(pwd)/docs/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
  docker.elastic.co/logstash/logstash:8.11.0 \
  logstash --config.test_and_exit

# 4. Check service connectivity
echo "🔗 Testing service connectivity..."
docker-compose exec airflow-webserver python -c "
import clickhouse_driver
client = clickhouse_driver.Client(host='clickhouse', port=9000)
print('ClickHouse connection: OK')
"

echo "✅ Pre-deployment validation complete!"

4.2 Production Deployment

# Make deploy script executable
chmod +x deploy.sh

# Run pre-deployment validation
./deploy.sh

# Deploy to production
docker-compose up -d

# Verify deployment
docker-compose ps
docker-compose logs airflow-scheduler

# Enable DAGs
airflow dags unpause adventureworks_etl_pipeline
airflow dags unpause adventureworks_incremental_etl

Step 5: Monitoring and Alerting

5.1 Set up Airflow Monitoring

# Create monitoring DAG: dags/pipeline_monitoring.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

def check_pipeline_health():
    from airflow.models import DagRun
    from datetime import datetime, timedelta
    
    # Check if main pipeline ran successfully in last 24 hours
    recent_runs = DagRun.find(
        dag_id='adventureworks_etl_pipeline',
        execution_start_date=datetime.now() - timedelta(days=1)
    )
    
    successful_runs = [run for run in recent_runs if run.state == 'success']
    
    if not successful_runs:
        raise Exception("No successful pipeline runs in the last 24 hours!")
    
    return f"✅ Pipeline health OK. Last successful run: {successful_runs[-1].execution_date}"

monitoring_dag = DAG(
    'pipeline_health_monitoring',
    default_args={'start_date': datetime(2025, 1, 1)},
    schedule_interval=timedelta(hours=6),
    catchup=False
)

health_check = PythonOperator(
    task_id='check_pipeline_health',
    python_callable=check_pipeline_health,
    dag=monitoring_dag
)

5.2 Alerting Configuration

Add to airflow.cfg:

[email]
email_backend = airflow.providers.sendgrid.utils.emailer.send_email
email_conn_id = sendgrid_default

[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = [email protected]
smtp_password = your-app-password
smtp_port = 587
smtp_mail_from = [email protected]

Step 6: Performance Optimization

6.1 Resource Allocation

# Update docker-compose.yml with resource limits
services:
  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    deploy:
      resources:
        limits:
          memory: 2G
          cpus: '2.0'
        reservations:
          memory: 1G
          cpus: '1.0'

6.2 Parallel Execution

# Update DAG for parallel execution
dag = DAG(
    'adventureworks_etl_pipeline',
    default_args=default_args,
    max_active_runs=1,
    max_active_tasks=10,  # Allow parallel task execution
    concurrency=16
)

Step 7: Backup and Recovery

7.1 Automated Backups

# Create backup script: backup.sh
#!/bin/bash

DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_DIR="/opt/airflow/backups"

# Backup Airflow metadata
docker-compose exec postgres pg_dump -U airflow airflow > "$BACKUP_DIR/airflow_metadata_$DATE.sql"

# Backup DBT project
tar -czf "$BACKUP_DIR/dbt_project_$DATE.tar.gz" dbt/

# Backup DAGs
tar -czf "$BACKUP_DIR/dags_$DATE.tar.gz" dags/

echo "✅ Backup completed: $DATE"

7.2 Disaster Recovery Plan

  1. Data Recovery: Restore from ClickHouse backups

  2. Pipeline Recovery: Redeploy from version control

  3. State Recovery: Restore Airflow metadata database

  4. Validation: Run data quality checks post-recovery

Step 8: Security Best Practices

  1. Secrets Management: Use Airflow's secret backend

  2. Access Control: Implement RBAC for Airflow users

  3. Network Security: Use VPN/private networks

  4. Encryption: Enable SSL/TLS for all connections

  5. Audit Logging: Monitor all pipeline activities

Step 9: Documentation and Handover

  1. Runbook: Document troubleshooting procedures

  2. Architecture Diagrams: Maintain up-to-date system diagrams

  3. Change Management: Document all configuration changes

  4. Training: Provide team training on pipeline operations

Last updated

Was this helpful?