Documentation
A user guide for developers getting started with the Data Engineer flow on Data Suite.
Document outline
The overall architecture of the system
Getting started
Installation using Docker
Explanation of the local stack architecture
Step-by-step installation using individual containers
Installation using Docker Compose
Write ETL code using DBT
The overall data architecture
Step-by-step DBT development
Execute the code and output the data
Architecture
Overview
@startuml
database DataSource
component LogStash
component Airflow {
component EtlJob
}
database Clickhouse {
component BronzeLayer
component GoldLayer
}
DataSource <- LogStash: read and query data from the data source
LogStash -> BronzeLayer: push raw data to ClickHouse
EtlJob --> BronzeLayer: read raw data from ClickHouse
EtlJob --> GoldLayer: persist data after ETL to ClickHouse
@enduml
Verification Steps
Check MySQL:
docker exec mysql mysql -uroot -ppassword -e "SHOW DATABASES;"
Check ClickHouse:
curl -u admin:clickhouse123 http://localhost:8123/ping
Check LogStash:
curl http://localhost:5044
Check Airflow: visit http://localhost:8080 (admin/admin)
Components
Data Source
External data sources that provide raw data for processing
LogStash
Data collection engine that reads and queries data from various sources
Airflow
Workflow orchestration platform containing ETL jobs
EtlJob
Data transformation jobs that process raw data
Clickhouse
Columnar database for analytical workloads
BronzeLayer
Raw data storage layer in ClickHouse
GoldLayer
Processed and refined data storage layer in ClickHouse
Getting started
Installation for local development using Docker
Prerequisites
Docker and Docker Compose installed
Git for cloning repositories
8GB+ RAM recommended for full stack
Option 1: Individual Container Setup
Install MySQL database
# Create network for container communication
$ docker network create datasuite-network
# Run MySQL with persistent storage
$ docker run --name mysql \
--network datasuite-network \
-e MYSQL_ROOT_PASSWORD=password \
-e MYSQL_DATABASE=adventureworks \
-p 3306:3306 \
-v mysql-data:/var/lib/mysql \
-d mysql:8.0
Seed Adventure Works data into MySQL
# Load Adventure Works 2019 data from local file
$ docker exec -i mysql mysql -uroot -ppassword adventureworks < docs/data/AdventureWorks2019.sql
# Verify data was loaded successfully
$ docker exec mysql mysql -uroot -ppassword adventureworks -e "SHOW TABLES;"
$ docker exec mysql mysql -uroot -ppassword adventureworks -e "SELECT COUNT(*) as table_count FROM information_schema.tables WHERE table_schema='adventureworks';"
Adventure Works Data Structure:
Sales schema: customer orders, products, and sales data
@startuml
class Sales_SalesOrderDetail
class Sales_SalesOrderHeader
class Sales_Customer
class Sales_SalesTerritory
Sales_SalesOrderHeader --> Sales_SalesOrderDetail
Sales_SalesOrderHeader -> Sales_Customer
Sales_SalesOrderHeader -> Sales_SalesTerritory
@enduml
Production schema: product catalog and inventory
Person schema: customer and employee information
Sample query:
SELECT COUNT(*) FROM Sales.SalesOrderHeader;
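A further illustrative check, assuming the same schema-qualified naming as the sample query above, gives a quick feel for how orders are distributed across territories:
-- Orders per territory (illustrative)
SELECT h.TerritoryID, COUNT(*) AS order_count
FROM Sales.SalesOrderHeader AS h
GROUP BY h.TerritoryID
ORDER BY order_count DESC;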
Install ClickHouse data warehouse
$ docker run --name clickhouse \
--network datasuite-network \
-p 8123:8123 \
-p 9000:9000 \
-e CLICKHOUSE_USER=admin \
-e CLICKHOUSE_PASSWORD=clickhouse123 \
-e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 \
-v clickhouse-data:/var/lib/clickhouse \
-d clickhouse/clickhouse-server:latest
# Verify ClickHouse is running with authentication
$ curl -u admin:clickhouse123 http://localhost:8123/ping
# Test connection and create database
$ curl -u admin:clickhouse123 http://localhost:8123/ -d "CREATE DATABASE IF NOT EXISTS bronze_layer"
$ curl -u admin:clickhouse123 http://localhost:8123/ -d "CREATE DATABASE IF NOT EXISTS gold_layer"
Install LogStash with MySQL connector
# Create directory for MySQL connector
$ mkdir -p logstash-drivers
# Download MySQL JDBC driver using Docker container
$ docker run --rm -v $(pwd)/logstash-drivers:/drivers alpine:latest \
sh -c "apk add --no-cache wget && \
wget -O /drivers/mysql-connector-java.jar \
https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.9-rc/mysql-connector-java-8.0.9-rc.jar"
# Run LogStash with driver mounted
$ docker run --name logstash \
--network datasuite-network \
-p 5044:5044 \
-v $(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
-v $(pwd)/logstash-drivers:/usr/share/logstash/drivers \
-d docker.elastic.co/logstash/logstash:8.11.0
Sample LogStash Configuration (logstash.conf):
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks"
jdbc_user => "root"
jdbc_password => "password"
statement => "SELECT * FROM Sales.SalesOrderHeader WHERE ModifiedDate > :sql_last_value ORDER BY ModifiedDate"
use_column_value => true
tracking_column => "ModifiedDate"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
clean_run => false
schedule => "*/30 * * * * *"
}
}
filter {
# Convert to proper JSON format for ClickHouse
mutate {
remove_field => ["@version", "@timestamp"]
}
}
output {
http {
url => "http://clickhouse:8123/"
http_method => "post"
format => "message"
message => "INSERT INTO bronze_layer.sales_orders FORMAT JSONEachRow %{message}"
headers => {
"Authorization" => "Basic YWRtaW46Y2xpY2tob3VzZTEyMw==" # admin:clickhouse123 base64
"Content-Type" => "application/json"
}
}
# Debug output to console
stdout {
codec => rubydebug
}
}
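The http output above writes to bronze_layer.sales_orders, which must already exist in ClickHouse. A minimal DDL sketch is shown below; the column list and types are assumptions inferred from the JDBC statement (the Logstash JDBC input lowercases column names by default). It can be submitted with the same curl pattern used earlier to create the databases.
-- Sketch: landing table for the LogStash pipeline (columns/types are assumptions)
CREATE TABLE IF NOT EXISTS bronze_layer.sales_orders
(
    salesorderid   UInt32,
    customerid     UInt32,
    territoryid    Nullable(UInt32),
    orderdate      DateTime,
    duedate        DateTime,
    shipdate       Nullable(DateTime),
    status         UInt8,
    subtotal       Decimal(19, 4),
    taxamt         Decimal(19, 4),
    freight        Decimal(19, 4),
    totaldue       Decimal(19, 4),
    modifieddate   DateTime
)
ENGINE = MergeTree
ORDER BY (salesorderid, modifieddate);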
Option 2: Docker Compose Setup
Create docker-compose.yml:
version: '3.8'
services:
mysql:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: password
MYSQL_DATABASE: adventureworks
ports:
- "3306:3306"
volumes:
- mysql-data:/var/lib/mysql
clickhouse:
image: clickhouse/clickhouse-server:latest
ports:
- "8123:8123"
- "9000:9000"
environment:
CLICKHOUSE_USER: admin
CLICKHOUSE_PASSWORD: clickhouse123
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
volumes:
- clickhouse-data:/var/lib/clickhouse
logstash:
image: docker.elastic.co/logstash/logstash:8.11.0
ports:
- "5044:5044"
volumes:
- ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
- ./logstash-drivers:/usr/share/logstash/drivers
depends_on:
- mysql
- clickhouse
airflow:
image: apache/airflow:2.7.0
ports:
- "8080:8080"
volumes:
- ./dags:/opt/airflow/dags
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
depends_on:
- clickhouse
volumes:
mysql-data:
clickhouse-data:
Download MySQL JDBC driver and start all services:
# Create directory and download MySQL connector using Docker
$ mkdir -p logstash-drivers
$ docker run --rm -v $(pwd)/logstash-drivers:/drivers alpine:latest \
sh -c "apk add --no-cache wget && \
wget -O /drivers/mysql-connector-java.jar \
https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.9-rc/mysql-connector-java-8.0.9-rc.jar"
# Start all services
$ docker-compose up -d
Expose ClickHouse securely via tunneling
Method 1: SSH Tunnel
# Create SSH tunnel to remote server
$ ssh -L 8123:localhost:8123 user@remote-server
Method 2: ngrok (for development/testing)
# Install ngrok
$ npm install -g ngrok
# Create secure tunnel for ClickHouse HTTP interface
$ ngrok tcp 8123
# Access via provided ngrok URL (e.g., 0.tcp.ngrok.io:12345)
Method 3: ngrok with authentication
# Create tunnel with basic auth
$ ngrok http 8123 --auth="username:password"
Security Best Practices:
Use IP whitelisting in ngrok dashboard
Enable ClickHouse authentication via users.xml configuration or SQL-driven access control (see the sketch below)
Use HTTPS/TLS connections for production
Rotate access credentials regularly
Monitor access logs for suspicious activity
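Because the container is started with CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1, authentication can also be managed with SQL instead of users.xml. A minimal sketch creating a least-privilege account for tunneled access follows; the user name and password are placeholders.
-- Run as the admin user, e.g. via the HTTP interface or clickhouse-client
CREATE USER IF NOT EXISTS readonly_analyst
    IDENTIFIED WITH sha256_password BY 'change-me';

-- Read-only access to the curated layer only
GRANT SELECT ON gold_layer.* TO readonly_analyst;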
Write DBT code to transform data from the bronze layer to the gold layer of the data warehouse
Step 2: Dimensional Model for Sales Processes
Overview
The gold layer is modeled as a star schema centered on the fact_sale fact table, which joins to the dim_date, dim_territory, and dim_product dimensions described below.
Original Table ERD Structure
Table Explanations
Fact Table
fact_sale
Central fact table for sales transactions. Contains measurable sales events with foreign keys to dimension tables. Each row represents a sales order line item with associated quantities, amounts, and dimensional context.
Key measures: OrderQty (quantity sold), UnitPrice (price per unit), LineTotal (extended price), SubTotal (order subtotal), TaxAmt (tax amount), TotalDue (final amount)
Grain: one row per sales order detail line item
Dimension Tables
dim_date
Time dimension. Provides temporal context for sales analysis with pre-computed date attributes for easy filtering and grouping by various time periods.
Key attributes: Date (primary date), Year, Quarter, Month, Day, DayOfWeek, WeekOfYear, IsWeekend, IsHoliday, FiscalYear, FiscalQuarter
SCD type: Type 1 (static)
dim_territory
Geographic dimension. Contains sales territory information for regional analysis and geographic reporting. Supports hierarchical geographic analysis from region to country.
Key attributes: TerritoryID (business key), TerritoryName, CountryRegionCode, TerritoryGroup, RegionName
SCD type: Type 1
dim_product
Product dimension. Product catalog information enabling product-based analysis. Contains current and historical product attributes for trend analysis.
Key attributes: ProductID (business key), ProductName, ProductNumber, ProductCategory, ProductSubcategory, StandardCost, ListPrice, ProductLine, ProductModel
SCD type: Type 2 (historical); see the snapshot sketch below
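One way to implement the Type 2 behavior for dim_product is a DBT snapshot. The following is a minimal sketch only: it assumes the bronze_layer.products source exposes productid and modifieddate columns and that your dbt-clickhouse version supports timestamp-strategy snapshots.
-- snapshots/products_snapshot.sql (illustrative; column names are assumptions)
{% snapshot products_snapshot %}

{{
    config(
      target_schema='gold_layer',
      unique_key='productid',
      strategy='timestamp',
      updated_at='modifieddate'
    )
}}

-- Capture the full product record; dbt adds dbt_valid_from / dbt_valid_to for history
select * from {{ source('bronze_layer', 'products') }}

{% endsnapshot %}
The dim_product model can then select from this snapshot and expose an is_current flag for rows where dbt_valid_to is null.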
Star Schema Relationships
@startuml
!theme plain
entity fact_sale {
* sale_key : bigint <<PK>>
--
* date_key : int <<FK>>
* territory_key : int <<FK>>
* product_key : int <<FK>>
--
order_qty : decimal
unit_price : decimal
line_total : decimal
sub_total : decimal
tax_amt : decimal
total_due : decimal
sales_order_id : int
}
entity dim_date {
* date_key : int <<PK>>
--
date : date
year : int
quarter : int
month : int
day : int
day_of_week : int
week_of_year : int
is_weekend : boolean
fiscal_year : int
fiscal_quarter : int
}
entity dim_territory {
* territory_key : int <<PK>>
--
territory_id : int
territory_name : varchar(50)
country_region_code : varchar(3)
territory_group : varchar(50)
region_name : varchar(50)
}
entity dim_product {
* product_key : int <<PK>>
--
product_id : int
product_name : varchar(50)
product_number : varchar(25)
product_category : varchar(50)
product_subcategory : varchar(50)
standard_cost : decimal
list_price : decimal
is_current : boolean
}
dim_date ||--o{ fact_sale
dim_territory ||--o{ fact_sale
dim_product ||--o{ fact_sale
note right of fact_sale : Grain: One row per sales order line item\nVolume: ~500K rows/month
note top of dim_date : Type 1 SCD - Pre-built calendar
note left of dim_territory : Type 1 SCD - Geographic data
note bottom of dim_product : Type 2 SCD - Historical tracking
@enduml
Data Sources and Lineage
fact_sale: sourced from Sales_SalesOrderHeader and Sales_SalesOrderDetail, joined on SalesOrderID and loaded at line-item grain.
dim_date: generated programmatically from date functions/a calendar table with business calendar rules (see the sketch below).
dim_territory: direct mapping from Sales_SalesTerritory with cleansing and standardization.
dim_product: denormalized from Production_Product, Production_ProductCategory, and Production_ProductSubcategory for query performance.
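Since dim_date has no upstream source table, it can be generated inside DBT. Below is a minimal sketch using ClickHouse date functions; the start date, row count, and the fiscal columns (which here simply mirror the calendar year) are assumptions to adjust to your business calendar.
-- models/marts/core/dim_date.sql (sketch)
{{ config(materialized='table') }}

SELECT
    toYYYYMMDD(calendar_date)            AS date_key,
    calendar_date                        AS date,
    toYear(calendar_date)                AS year,
    toQuarter(calendar_date)             AS quarter,
    toMonth(calendar_date)               AS month,
    toDayOfMonth(calendar_date)          AS day,
    toDayOfWeek(calendar_date)           AS day_of_week,
    toISOWeek(calendar_date)             AS week_of_year,
    toDayOfWeek(calendar_date) IN (6, 7) AS is_weekend,
    0                                    AS is_holiday,      -- placeholder: populate from a holiday calendar
    toYear(calendar_date)                AS fiscal_year,     -- placeholder: calendar year
    toQuarter(calendar_date)             AS fiscal_quarter   -- placeholder: calendar quarter
FROM
(
    -- One row per day starting 2011-01-01 (~15 years of dates)
    SELECT toDate('2011-01-01') + number AS calendar_date
    FROM numbers(5500)
)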
DBT Project Setup Steps
Step 1: Environment Setup
# Install DBT Core
pip install dbt-core
# Install ClickHouse adapter (for our data warehouse)
pip install dbt-clickhouse
# Verify installation
dbt --version
Step 2: Create New DBT Project
# Create new DBT project
dbt init adventureworks_analytics
# Navigate to project directory
cd adventureworks_analytics
Step 3: Configure Database Connection
Update the profiles.yml file (usually located at ~/.dbt/profiles.yml):
adventureworks_analytics:
target: dev
outputs:
dev:
type: clickhouse
host: localhost
port: 8123
user: admin
password: clickhouse123
database: analytics
schema: gold_layer
threads: 4
keepalives_idle: 0
search_path: bronze_layer,gold_layer
prod:
type: clickhouse
host: "{{ env_var('CLICKHOUSE_HOST') }}"
port: 8123
user: "{{ env_var('CLICKHOUSE_USER') }}"
password: "{{ env_var('CLICKHOUSE_PASSWORD') }}"
database: analytics
schema: gold_layer
threads: 8
Step 4: Install Custom Data Suite Adapter (Optional)
# Add to packages.yml in your DBT project root
echo "packages:
- git: https://gitlab.fci.vn/datasuite/data-suite-data-engineer/adapter_oracle.git
revision: 1.0.8" > packages.yml
# Install packages
dbt deps
Step 5: Configure Project Structure
Update dbt_project.yml:
name: 'adventureworks_analytics'
version: '1.0.0'
config-version: 2
profile: 'adventureworks_analytics'
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target"
clean-targets:
- "target"
- "dbt_packages"
models:
adventureworks_analytics:
# Bronze layer (raw data staging)
staging:
+materialized: view
+schema: bronze_layer
# Gold layer (dimensional model)
marts:
+materialized: table
+schema: gold_layer
# Data quality tests
tests:
+severity: error
Step 6: Create Layer Structure
# Create model directories
mkdir -p models/staging
mkdir -p models/marts/core
mkdir -p models/marts/finance
mkdir -p macros
mkdir -p tests
# Create source definitions
touch models/staging/sources.yml
touch models/staging/staging.yml
Step 7: Define Data Sources
Create models/staging/sources.yml:
version: 2
sources:
- name: bronze_layer
description: "Raw data from LogStash ETL pipeline"
tables:
- name: sales_orders
description: "Sales order header information"
columns:
- name: salesorderid
description: "Primary key for sales order"
tests:
- unique
- not_null
- name: sales_order_details
description: "Sales order line item details"
- name: customers
description: "Customer master data"
- name: products
description: "Product catalog information"
Step 8: Create Staging Models
Create staging models to clean and standardize bronze layer data:
-- models/staging/stg_sales_orders.sql
{{ config(materialized='view') }}
SELECT
salesorderid,
customerid,
territoryid,
orderdate,
duedate,
shipdate,
status,
subtotal,
taxamt,
freight,
totaldue,
modifieddate
FROM {{ source('bronze_layer', 'sales_orders') }}
WHERE salesorderid IS NOT NULL
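The fact table also needs line-item data, so a companion staging model for the sales_order_details source is useful. The sketch below is illustrative; the column names are assumptions based on the AdventureWorks SalesOrderDetail table.
-- models/staging/stg_sales_order_details.sql (sketch)
{{ config(materialized='view') }}

SELECT
    salesorderid,
    salesorderdetailid,
    productid,
    orderqty,
    unitprice,
    unitpricediscount,
    linetotal,
    modifieddate
FROM {{ source('bronze_layer', 'sales_order_details') }}
WHERE salesorderid IS NOT NULL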
Step 9: Build Dimensional Models
Create dimensional model files:
models/marts/core/dim_date.sql
models/marts/core/dim_territory.sql
models/marts/core/dim_product.sql
models/marts/core/fact_sale.sql
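These models are not reproduced in full here; as an orientation, a minimal sketch of fact_sale follows. For brevity it reuses the business keys (order date as YYYYMMDD, TerritoryID, ProductID) as dimension keys, and the detail-level column names are assumptions; adjust both to match your dimension models.
-- models/marts/core/fact_sale.sql (sketch)
{{ config(materialized='table') }}

WITH order_headers AS (
    SELECT * FROM {{ ref('stg_sales_orders') }}
),

order_details AS (
    SELECT * FROM {{ source('bronze_layer', 'sales_order_details') }}
)

SELECT
    cityHash64(d.salesorderid, d.salesorderdetailid) AS sale_key,      -- surrogate key for the line item
    toYYYYMMDD(h.orderdate)                          AS date_key,
    h.territoryid                                    AS territory_key,
    d.productid                                      AS product_key,
    d.orderqty                                       AS order_qty,
    d.unitprice                                      AS unit_price,
    d.linetotal                                      AS line_total,
    h.subtotal                                       AS sub_total,
    h.taxamt                                         AS tax_amt,
    h.totaldue                                       AS total_due,
    h.salesorderid                                   AS sales_order_id
FROM order_details AS d
INNER JOIN order_headers AS h
    ON d.salesorderid = h.salesorderid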
Step 10: Run DBT Pipeline
# Test connections
dbt debug
# Run staging models
dbt run --models staging
# Run dimensional models
dbt run --models marts
# Run all tests
dbt test
# Generate documentation
dbt docs generate
dbt docs serve
Step 11: Schedule with Airflow (Optional)
Create DAG for automated runs:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
dag = DAG(
'adventureworks_dbt_pipeline',
default_args={'retries': 1},
schedule_interval='0 2 * * *', # Daily at 2 AM
start_date=datetime(2025, 1, 1)
)
dbt_run = BashOperator(
task_id='dbt_run',
bash_command='cd /path/to/adventureworks_analytics && dbt run',
dag=dag
)
dbt_test = BashOperator(
task_id='dbt_test',
bash_command='cd /path/to/adventureworks_analytics && dbt test',
dag=dag
)
dbt_run >> dbt_test
Step 12: Data Quality & Monitoring
Set up data quality tests in the tests/ directory
Configure alerting for test failures
Monitor pipeline performance and data freshness
Set up incremental model strategies for large tables (see the sketch below)
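For the largest tables, DBT's incremental materialization limits each run to new or changed rows. A minimal sketch, assuming the bronze table carries a modifieddate watermark column:
-- Sketch of an incremental model (e.g. a slimmed-down fact load)
{{ config(
    materialized='incremental',
    unique_key='salesorderid'
) }}

SELECT
    salesorderid,
    totaldue,
    modifieddate
FROM {{ source('bronze_layer', 'sales_orders') }}

{% if is_incremental() %}
  -- Only rows changed since the last successful run of this model
  WHERE modifieddate > (SELECT max(modifieddate) FROM {{ this }})
{% endif %}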
Create and deploy ETL code to Airflow (production)
Overview
This section covers deploying the complete AdventureWorks ETL pipeline to Apache Airflow for production use, including LogStash data ingestion, DBT transformations, and monitoring.
Production Architecture
┌──────────────────┐    ┌──────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│      MySQL       │    │     LogStash     │    │    ClickHouse    │    │       DBT        │
│ (AdventureWorks  │────│ (Data Ingestion) │────│ (Data Warehouse, │────│ (Transformations,│
│     Source)      │    │                  │    │  Bronze Layer)   │    │   Gold Layer)    │
└──────────────────┘    └──────────────────┘    └──────────────────┘    └──────────────────┘
         │                       │                       │                       │
         └───────────────────────┴───────────┬───────────┴───────────────────────┘
                                             │
                                   ┌──────────────────┐
                                   │      Apache      │
                                   │     Airflow      │
                                   │ (Orchestration)  │
                                   └──────────────────┘
Prerequisites
Docker and Docker Compose installed
Apache Airflow 2.7+ running
ClickHouse database accessible
DBT project configured (from previous section)
LogStash configuration files ready
Step 1: Airflow Environment Setup
1.1 Docker Compose Configuration
Create docker-compose.yml for the Airflow production setup:
version: '3.8'
x-airflow-common:
&airflow-common
image: apache/airflow:2.7.0
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
# Environment variables for our ETL pipeline
MYSQL_HOST: mysql
MYSQL_USER: root
MYSQL_PASSWORD: password
CLICKHOUSE_HOST: clickhouse
CLICKHOUSE_USER: admin
CLICKHOUSE_PASSWORD: clickhouse123
DBT_PROJECT_DIR: /opt/airflow/dbt/adventureworks_analytics
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
- ${AIRFLOW_PROJ_DIR:-.}/dbt:/opt/airflow/dbt
- ./docs/logstash.conf:/opt/airflow/config/logstash.conf
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 10s
timeout: 5s
retries: 5
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
command:
- -c
- |
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
volumes:
postgres-db-volume:
1.2 Install Required Packages
Create requirements.txt for additional Python packages:
dbt-core==1.6.0
dbt-clickhouse==1.5.0
psycopg2-binary==2.9.7
clickhouse-driver==0.2.6
apache-airflow-providers-docker==3.7.2
apache-airflow-providers-ssh==3.7.0
Build custom Airflow image with requirements:
FROM apache/airflow:2.7.0
COPY requirements.txt /requirements.txt
RUN pip install --no-cache-dir -r /requirements.txt
Step 2: Create Production DAGs
2.1 Main ETL Pipeline DAG
Create dags/adventureworks_etl_pipeline.py:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.dates import days_ago
# Default arguments for all tasks
default_args = {
'owner': 'data-engineering-team',
'depends_on_past': False,
'start_date': datetime(2025, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
# Define the DAG
dag = DAG(
'adventureworks_etl_pipeline',
default_args=default_args,
description='Complete AdventureWorks ETL Pipeline',
schedule_interval='0 2 * * *', # Daily at 2 AM
max_active_runs=1,
catchup=False,  # catchup belongs on the DAG itself, not in default_args
tags=['etl', 'adventureworks', 'production']
)
# Task 1: Health Check - Verify all services are running
def check_services_health():
import requests
import time
services = {
'ClickHouse': 'http://clickhouse:8123/ping',
'MySQL': 'mysql:3306' # Will be checked differently
}
for service, endpoint in services.items():
if service == 'ClickHouse':
try:
response = requests.get(endpoint, timeout=10)
if response.status_code != 200:
raise Exception(f"{service} health check failed")
print(f"✅ {service} is healthy")
except Exception as e:
raise Exception(f"❌ {service} health check failed: {str(e)}")
return "All services healthy"
health_check = PythonOperator(
task_id='health_check',
python_callable=check_services_health,
dag=dag
)
# Task 2: Start LogStash Data Ingestion
start_logstash = DockerOperator(
task_id='start_logstash_ingestion',
image='docker.elastic.co/logstash/logstash:8.11.0',
container_name='logstash-etl-{{ ds }}',
api_version='auto',
auto_remove=True,
environment={
'MYSQL_HOST': '{{ var.value.mysql_host }}',
'MYSQL_PASSWORD': '{{ var.value.mysql_password }}',
'CLICKHOUSE_HOST': '{{ var.value.clickhouse_host }}',
'CLICKHOUSE_PASSWORD': '{{ var.value.clickhouse_password }}'
},
volumes=[
'/opt/airflow/config/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro',
'/opt/airflow/drivers:/usr/share/logstash/drivers:ro',
'/opt/airflow/sql-queries:/usr/share/logstash/sql-queries:ro'
],
network_mode='bridge',
dag=dag
)
# Task 3: Monitor LogStash Completion
def monitor_logstash_completion():
import time
import clickhouse_driver
client = clickhouse_driver.Client(
host='clickhouse',
port=9000,
user='admin',
password='clickhouse123'
)
# Check if data was loaded in the last hour
query = """
SELECT COUNT(*) as record_count
FROM bronze_layer.sales_orders
WHERE pipeline_timestamp >= NOW() - INTERVAL 1 HOUR
"""
result = client.execute(query)
record_count = result[0][0]
if record_count == 0:
raise Exception("No new records found in bronze layer")
print(f"✅ LogStash loaded {record_count} records successfully")
return record_count
monitor_logstash = PythonOperator(
task_id='monitor_logstash_completion',
python_callable=monitor_logstash_completion,
dag=dag
)
# Task 4: Run DBT Staging Models
dbt_staging = BashOperator(
task_id='dbt_run_staging',
bash_command="""
cd {{ var.value.dbt_project_dir }} &&
dbt run --models staging --target prod
""",
dag=dag
)
# Task 5: Run DBT Tests on Staging
dbt_test_staging = BashOperator(
task_id='dbt_test_staging',
bash_command="""
cd {{ var.value.dbt_project_dir }} &&
dbt test --models staging --target prod
""",
dag=dag
)
# Task 6: Run DBT Dimensional Models
dbt_marts = BashOperator(
task_id='dbt_run_marts',
bash_command="""
cd {{ var.value.dbt_project_dir }} &&
dbt run --models marts --target prod
""",
dag=dag
)
# Task 7: Run DBT Tests on Marts
dbt_test_marts = BashOperator(
task_id='dbt_test_marts',
bash_command="""
cd {{ var.value.dbt_project_dir }} &&
dbt test --models marts --target prod
""",
dag=dag
)
# Task 8: Data Quality Validation
def validate_data_quality():
import clickhouse_driver
client = clickhouse_driver.Client(
host='clickhouse',
port=9000,
user='admin',
password='clickhouse123'
)
# Quality checks
checks = [
{
'name': 'Fact table record count',
'query': 'SELECT COUNT(*) FROM gold_layer.fact_sale',
'min_expected': 1000
},
{
'name': 'Date dimension completeness',
'query': 'SELECT COUNT(*) FROM gold_layer.dim_date',
'min_expected': 365
},
{
'name': 'Product dimension completeness',
'query': 'SELECT COUNT(*) FROM gold_layer.dim_product WHERE is_current = 1',
'min_expected': 100
}
]
results = {}
for check in checks:
result = client.execute(check['query'])
count = result[0][0]
if count < check['min_expected']:
raise Exception(f"Data quality check failed: {check['name']} - Expected: {check['min_expected']}, Got: {count}")
results[check['name']] = count
print(f"✅ {check['name']}: {count} records")
return results
data_quality_check = PythonOperator(
task_id='data_quality_validation',
python_callable=validate_data_quality,
dag=dag
)
# Task 9: Generate DBT Documentation
dbt_docs = BashOperator(
task_id='generate_dbt_docs',
bash_command="""
cd {{ var.value.dbt_project_dir }} &&
dbt docs generate --target prod &&
echo "DBT documentation generated successfully"
""",
dag=dag
)
# Task 10: Pipeline Success Notification
def send_success_notification(**context):
execution_date = context['execution_date']
task_instance = context['task_instance']
# Get data quality results from previous task
quality_results = task_instance.xcom_pull(task_ids='data_quality_validation')
message = f"""
AdventureWorks ETL Pipeline completed successfully!
Execution Date: {execution_date}
Data Quality Results:
{quality_results}
Dashboard: http://your-dashboard-url.com
DBT Docs: http://your-dbt-docs-url.com
"""
print("✅ Pipeline completed successfully!")
print(message)
return message
success_notification = PythonOperator(
task_id='success_notification',
python_callable=send_success_notification,
dag=dag
)
# Define task dependencies
health_check >> start_logstash >> monitor_logstash >> dbt_staging >> dbt_test_staging
dbt_test_staging >> dbt_marts >> dbt_test_marts >> data_quality_check
data_quality_check >> dbt_docs >> success_notification
2.2 Incremental Loading DAG
Create dags/adventureworks_incremental_etl.py:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'data-engineering-team',
'depends_on_past': True,
'start_date': datetime(2025, 1, 1),
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'adventureworks_incremental_etl',
default_args=default_args,
description='Incremental AdventureWorks ETL for real-time updates',
schedule_interval=timedelta(hours=4), # Every 4 hours
max_active_runs=1,
tags=['etl', 'incremental', 'adventureworks']
)
# Incremental data loading with change data capture
incremental_load = BashOperator(
task_id='incremental_data_load',
bash_command="""
cd {{ var.value.dbt_project_dir }} &&
dbt run --models +fact_sale --target prod --vars '{"is_incremental": true}'
""",
dag=dag
)
# Quick data quality check for incremental loads
def quick_quality_check():
import clickhouse_driver
client = clickhouse_driver.Client(host='clickhouse', port=9000, user='admin', password='clickhouse123')
# Check for recent data
query = """
SELECT COUNT(*) FROM gold_layer.fact_sale
WHERE created_at >= NOW() - INTERVAL 4 HOUR
"""
result = client.execute(query)
count = result[0][0]
print(f"Incremental load: {count} new records processed")
return count
quality_check = PythonOperator(
task_id='quick_quality_check',
python_callable=quick_quality_check,
dag=dag
)
incremental_load >> quality_check
Step 3: Configuration Management
3.1 Airflow Variables
Set up Airflow variables via Web UI or CLI:
# Database connections
airflow variables set mysql_host "mysql"
airflow variables set mysql_password "password"
airflow variables set clickhouse_host "clickhouse"
airflow variables set clickhouse_password "clickhouse123"
# Project paths
airflow variables set dbt_project_dir "/opt/airflow/dbt/adventureworks_analytics"
# Email notifications
airflow variables set alert_email "[email protected]"
3.2 Connections
Create Airflow connections for database access:
# ClickHouse connection
airflow connections add 'clickhouse_default' \
--conn-type 'HTTP' \
--conn-host 'clickhouse' \
--conn-port 8123 \
--conn-login 'admin' \
--conn-password 'clickhouse123'
# MySQL connection
airflow connections add 'mysql_default' \
--conn-type 'mysql' \
--conn-host 'mysql' \
--conn-port 3306 \
--conn-login 'root' \
--conn-password 'password' \
--conn-schema 'adventureworks'
Step 4: Deployment Process
4.1 Pre-deployment Checklist
# Create deployment script: deploy.sh
#!/bin/bash
echo "🚀 Starting AdventureWorks ETL Deployment..."
# 1. Validate DAG syntax
echo "📋 Validating DAG syntax..."
python -m py_compile dags/adventureworks_etl_pipeline.py
python -m py_compile dags/adventureworks_incremental_etl.py
# 2. Test DBT models
echo "🧪 Testing DBT models..."
cd dbt/adventureworks_analytics
dbt debug --target prod
dbt compile --target prod
# 3. Validate LogStash configuration
echo "⚙️ Validating LogStash configuration..."
docker run --rm -v $(pwd)/docs/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
docker.elastic.co/logstash/logstash:8.11.0 \
logstash --config.test_and_exit
# 4. Check service connectivity
echo "🔗 Testing service connectivity..."
docker-compose exec airflow-webserver python -c "
import clickhouse_driver
client = clickhouse_driver.Client(host='clickhouse', port=9000, user='admin', password='clickhouse123')
client.execute('SELECT 1')  # round-trip a query instead of only constructing the client
print('ClickHouse connection: OK')
"
echo "✅ Pre-deployment validation complete!"
4.2 Production Deployment
# Make deploy script executable
chmod +x deploy.sh
# Run pre-deployment validation
./deploy.sh
# Deploy to production
docker-compose up -d
# Verify deployment
docker-compose ps
docker-compose logs airflow-scheduler
# Enable DAGs
airflow dags unpause adventureworks_etl_pipeline
airflow dags unpause adventureworks_incremental_etl
Step 5: Monitoring and Alerting
5.1 Set up Airflow Monitoring
# Create monitoring DAG: dags/pipeline_monitoring.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def check_pipeline_health():
from airflow.models import DagRun
from datetime import datetime, timedelta
# Check if main pipeline ran successfully in last 24 hours
recent_runs = DagRun.find(
dag_id='adventureworks_etl_pipeline',
execution_start_date=datetime.now() - timedelta(days=1)
)
successful_runs = [run for run in recent_runs if run.state == 'success']
if not successful_runs:
raise Exception("No successful pipeline runs in the last 24 hours!")
return f"✅ Pipeline health OK. Last successful run: {successful_runs[-1].execution_date}"
monitoring_dag = DAG(
'pipeline_health_monitoring',
default_args={'start_date': datetime(2025, 1, 1)},
schedule_interval=timedelta(hours=6),
catchup=False
)
health_check = PythonOperator(
task_id='check_pipeline_health',
python_callable=check_pipeline_health,
dag=monitoring_dag
)
5.2 Alerting Configuration
Add to airflow.cfg:
[email]
email_backend = airflow.providers.sendgrid.utils.emailer.send_email
email_conn_id = sendgrid_default
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_user = [email protected]
smtp_password = your-app-password
smtp_port = 587
smtp_mail_from = [email protected]
Step 6: Performance Optimization
6.1 Resource Allocation
# Update docker-compose.yml with resource limits
services:
airflow-scheduler:
<<: *airflow-common
command: scheduler
deploy:
resources:
limits:
memory: 2G
cpus: '2.0'
reservations:
memory: 1G
cpus: '1.0'
6.2 Parallel Execution
# Update DAG for parallel execution
dag = DAG(
'adventureworks_etl_pipeline',
default_args=default_args,
max_active_runs=1,
max_active_tasks=10  # Allow parallel task execution (replaces the deprecated `concurrency` argument)
)
Step 7: Backup and Recovery
7.1 Automated Backups
# Create backup script: backup.sh
#!/bin/bash
DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_DIR="/opt/airflow/backups"
# Backup Airflow metadata
docker-compose exec -T postgres pg_dump -U airflow airflow > "$BACKUP_DIR/airflow_metadata_$DATE.sql"
# Backup DBT project
tar -czf "$BACKUP_DIR/dbt_project_$DATE.tar.gz" dbt/
# Backup DAGs
tar -czf "$BACKUP_DIR/dags_$DATE.tar.gz" dags/
echo "✅ Backup completed: $DATE"
7.2 Disaster Recovery Plan
Data Recovery: Restore from ClickHouse backups (see the sketch after this list)
Pipeline Recovery: Redeploy from version control
State Recovery: Restore Airflow metadata database
Validation: Run data quality checks post-recovery
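For the data recovery point, recent ClickHouse versions support SQL-driven backup and restore. The sketch below is hedged: it assumes a backup destination disk named backups has been configured in the server's storage configuration, and the archive name is a placeholder.
-- Back up the curated layer to the configured backup disk
BACKUP DATABASE gold_layer TO Disk('backups', 'gold_layer_20250101.zip');

-- Restore it after an incident
RESTORE DATABASE gold_layer FROM Disk('backups', 'gold_layer_20250101.zip');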
Step 8: Security Best Practices
Secrets Management: Use Airflow's secret backend
Access Control: Implement RBAC for Airflow users
Network Security: Use VPN/private networks
Encryption: Enable SSL/TLS for all connections
Audit Logging: Monitor all pipeline activities
Step 9: Documentation and Handover
Runbook: Document troubleshooting procedures
Architecture Diagrams: Maintain up-to-date system diagrams
Change Management: Document all configuration changes
Training: Provide team training on pipeline operations