DataSuite ETL on Kubernetes - Production Infrastructure Guide
Overview
This guide covers deploying the complete AdventureWorks ETL pipeline on Kubernetes: a MySQL source database, LogStash data ingestion, a ClickHouse data warehouse, DBT transformations, and Apache Airflow orchestration.
Kubernetes Architecture
@startuml
!theme plain
package "Kubernetes Cluster" {
package "Data Sources Namespace" {
[MySQL Pod] as mysql
[MySQL Service] as mysql_svc
[MySQL PVC] as mysql_pvc
}
package "Data Ingestion Namespace" {
[LogStash Pod] as logstash
[LogStash Service] as logstash_svc
[LogStash ConfigMap] as logstash_cm
}
package "Data Warehouse Namespace" {
[ClickHouse Pod] as clickhouse
[ClickHouse Service] as clickhouse_svc
[ClickHouse PVC] as clickhouse_pvc
}
package "Orchestration Namespace" {
[Airflow Webserver] as airflow_web
[Airflow Scheduler] as airflow_scheduler
[Airflow Worker] as airflow_worker
[PostgreSQL] as airflow_db
[Redis] as airflow_redis
}
package "Monitoring Namespace" {
[Prometheus] as prometheus
[Grafana] as grafana
[AlertManager] as alertmanager
}
}
mysql --> logstash : "Data Extract"
logstash --> clickhouse : "Data Load"
airflow_scheduler --> clickhouse : "DBT Transform"
prometheus --> grafana : "Metrics"
prometheus --> alertmanager : "Alerts"
@enduml
Prerequisites
Kubernetes cluster (v1.25+) with at least 16GB RAM and 8 CPU cores
kubectl configured with cluster access
Helm 3.x installed
Persistent Volume support (StorageClass configured)
LoadBalancer or Ingress controller for external access
Container registry access (Docker Hub or private registry)
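Before creating anything, it is worth a quick pre-flight check that the cluster satisfies this list. A minimal sketch, assuming kubectl and Helm are on your PATH and metrics-server is installed for the capacity check:
# Client and cluster versions (server should report v1.25+)
kubectl version --client
kubectl get nodes -o wide
# Helm 3.x
helm version --short
# A StorageClass must exist for the PVCs created below
kubectl get storageclass
# Rough capacity check (requires metrics-server; skip if unavailable)
kubectl top nodes || echo "metrics-server not installed"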
Namespace Strategy
# Create namespaces for logical separation
kubectl create namespace data-sources
kubectl create namespace data-ingestion
kubectl create namespace data-warehouse
kubectl create namespace orchestration
kubectl create namespace monitoring
Step 1: MySQL Source Database Deployment
1.1 MySQL ConfigMap
Create mysql/mysql-configmap.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
name: mysql-config
namespace: data-sources
data:
my.cnf: |
[mysqld]
default-storage-engine=innodb
skip-host-cache
skip-name-resolve
datadir=/var/lib/mysql
socket=/var/run/mysqld/mysqld.sock
secure-file-priv=/var/lib/mysql-files
user=mysql
# Performance settings
innodb_buffer_pool_size=1G
innodb_log_file_size=256M
max_connections=200
# Logging
general_log=1
general_log_file=/var/lib/mysql/general.log
slow_query_log=1
slow_query_log_file=/var/lib/mysql/slow.log
long_query_time=2
1.2 MySQL Secret
Create mysql/mysql-secret.yaml:
apiVersion: v1
kind: Secret
metadata:
name: mysql-secret
namespace: data-sources
type: Opaque
data:
MYSQL_ROOT_PASSWORD: cGFzc3dvcmQ= # base64 encoded 'password'
MYSQL_DATABASE: YWR2ZW50dXJld29ya3M= # base64 encoded 'adventureworks'
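These values are weak placeholders; generate your own base64 strings (or create the Secret directly) before a real deployment:
# Encode replacement values for the Secret
echo -n 'password' | base64          # cGFzc3dvcmQ=
echo -n 'adventureworks' | base64    # YWR2ZW50dXJld29ya3M=
# Or skip manual encoding entirely:
kubectl create secret generic mysql-secret -n data-sources \
  --from-literal=MYSQL_ROOT_PASSWORD='password' \
  --from-literal=MYSQL_DATABASE='adventureworks' \
  --dry-run=client -o yaml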
1.3 MySQL Persistent Volume
Create mysql/mysql-pvc.yaml:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: mysql-pvc
namespace: data-sources
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 20Gi
storageClassName: standard
1.4 MySQL Deployment
Create mysql/mysql-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: mysql
namespace: data-sources
labels:
app: mysql
tier: database
spec:
replicas: 1
selector:
matchLabels:
app: mysql
template:
metadata:
labels:
app: mysql
spec:
containers:
- name: mysql
image: mysql:8.0
env:
- name: MYSQL_ROOT_PASSWORD
valueFrom:
secretKeyRef:
name: mysql-secret
key: MYSQL_ROOT_PASSWORD
- name: MYSQL_DATABASE
valueFrom:
secretKeyRef:
name: mysql-secret
key: MYSQL_DATABASE
ports:
- containerPort: 3306
name: mysql
volumeMounts:
- name: mysql-storage
mountPath: /var/lib/mysql
- name: mysql-config
mountPath: /etc/mysql/conf.d
- name: mysql-initdb
mountPath: /docker-entrypoint-initdb.d
resources:
requests:
memory: "1Gi"
cpu: "0.5"
limits:
memory: "2Gi"
cpu: "1"
livenessProbe:
exec:
command:
- mysqladmin
- ping
- -h
- localhost
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
exec:
command:
- mysql
- -h
- localhost
- -e
- SELECT 1
initialDelaySeconds: 5
periodSeconds: 2
volumes:
- name: mysql-storage
persistentVolumeClaim:
claimName: mysql-pvc
- name: mysql-config
configMap:
name: mysql-config
- name: mysql-initdb
configMap:
name: mysql-initdb
1.5 MySQL Service
Create mysql/mysql-service.yaml:
apiVersion: v1
kind: Service
metadata:
name: mysql-service
namespace: data-sources
labels:
app: mysql
spec:
ports:
- port: 3306
targetPort: 3306
protocol: TCP
selector:
app: mysql
type: ClusterIP
1.6 MySQL Data Initialization
Create mysql/mysql-initdb-configmap.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
name: mysql-initdb
namespace: data-sources
data:
01-create-schema.sql: |
CREATE DATABASE IF NOT EXISTS adventureworks;
USE adventureworks;
-- Grant permissions for LogStash access
CREATE USER 'logstash'@'%' IDENTIFIED BY 'logstash123';
GRANT SELECT ON adventureworks.* TO 'logstash'@'%';
FLUSH PRIVILEGES;
02-load-adventureworks.sql: |
-- This file should contain the AdventureWorks sample data
-- For production, mount the actual AdventureWorks SQL file
USE adventureworks;
-- Sample table creation (replace with actual AdventureWorks schema)
CREATE TABLE IF NOT EXISTS Sales_SalesOrderHeader (
SalesOrderID INT PRIMARY KEY,
CustomerID INT,
TerritoryID INT,
OrderDate DATETIME,
DueDate DATETIME,
ShipDate DATETIME,
Status TINYINT,
SubTotal DECIMAL(19,4),
TaxAmt DECIMAL(19,4),
Freight DECIMAL(19,4),
TotalDue DECIMAL(19,4),
ModifiedDate DATETIME
);
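After applying the manifests in this step, a quick smoke test confirms the init scripts ran and the logstash user can read the schema (credentials are the defaults from the Secret and init SQL above):
kubectl wait --for=condition=ready pod -l app=mysql -n data-sources --timeout=300s
# Root sees the schema created by 01-create-schema.sql
kubectl exec -n data-sources deploy/mysql -- \
  mysql -uroot -ppassword -e "SHOW TABLES IN adventureworks;"
# The logstash user has SELECT on the tables LogStash will poll
kubectl exec -n data-sources deploy/mysql -- \
  mysql -ulogstash -plogstash123 -e "SELECT COUNT(*) FROM adventureworks.Sales_SalesOrderHeader;"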
Step 2: ClickHouse Data Warehouse Deployment
2.1 ClickHouse ConfigMap
Create clickhouse/clickhouse-config.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
name: clickhouse-config
namespace: data-warehouse
data:
config.xml: |
<?xml version="1.0"?>
<yandex>
<logger>
<level>information</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<http_port>8123</http_port>
<tcp_port>9000</tcp_port>
<mysql_port>9004</mysql_port>
<postgresql_port>9005</postgresql_port>
<listen_host>0.0.0.0</listen_host>
<max_connections>200</max_connections>
<keep_alive_timeout>3</keep_alive_timeout>
<max_concurrent_queries>100</max_concurrent_queries>
<path>/var/lib/clickhouse/</path>
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
<user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
<users_config>users.xml</users_config>
<default_profile>default</default_profile>
<default_database>default</default_database>
<timezone>UTC</timezone>
<mlock_executable>false</mlock_executable>
<!-- Expose Prometheus metrics for the monitoring stack in Step 5 -->
<prometheus>
<endpoint>/metrics</endpoint>
<port>9363</port>
<metrics>true</metrics>
<events>true</events>
<asynchronous_metrics>true</asynchronous_metrics>
</prometheus>
<remote_servers>
<cluster_1>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
</shard>
</cluster_1>
</remote_servers>
</yandex>
users.xml: |
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<max_memory_usage>10000000000</max_memory_usage>
<use_uncompressed_cache>0</use_uncompressed_cache>
<load_balancing>random</load_balancing>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
<admin>
<password>clickhouse123</password>
<networks>
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
<access_management>1</access_management>
</admin>
</users>
<quotas>
<default>
<interval>
<duration>3600</duration>
<queries>0</queries>
<errors>0</errors>
<result_rows>0</result_rows>
<read_rows>0</read_rows>
<execution_time>0</execution_time>
</interval>
</default>
</quotas>
</yandex>
init-db.sql: |
-- Create databases and schemas
CREATE DATABASE IF NOT EXISTS bronze_layer;
CREATE DATABASE IF NOT EXISTS gold_layer;
-- Bronze layer tables
CREATE TABLE IF NOT EXISTS bronze_layer.sales_orders (
salesorderid Int32,
customerid Int32,
territoryid Int32,
orderdate DateTime,
duedate DateTime,
shipdate DateTime,
status UInt8,
subtotal Decimal(19,4),
taxamt Decimal(19,4),
freight Decimal(19,4),
totaldue Decimal(19,4),
modifieddate DateTime,
pipeline_timestamp DateTime DEFAULT now(),
source_system String DEFAULT 'adventureworks_mysql'
) ENGINE = MergeTree()
ORDER BY (salesorderid, orderdate)
PARTITION BY toYYYYMM(orderdate);
2.2 ClickHouse Deployment
Create clickhouse/clickhouse-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: clickhouse
namespace: data-warehouse
labels:
app: clickhouse
tier: analytics
spec:
replicas: 1
selector:
matchLabels:
app: clickhouse
template:
metadata:
labels:
app: clickhouse
spec:
containers:
- name: clickhouse
image: clickhouse/clickhouse-server:23.8-alpine
ports:
- containerPort: 8123
name: http
- containerPort: 9000
name: native
- containerPort: 9004
name: mysql
- containerPort: 9005
name: postgres
- containerPort: 9363
name: metrics
volumeMounts:
- name: clickhouse-storage
mountPath: /var/lib/clickhouse
- name: clickhouse-config
mountPath: /etc/clickhouse-server/config.xml
subPath: config.xml
- name: clickhouse-config
mountPath: /etc/clickhouse-server/users.xml
subPath: users.xml
- name: clickhouse-config
mountPath: /docker-entrypoint-initdb.d/init-db.sql
subPath: init-db.sql
resources:
requests:
memory: "2Gi"
cpu: "1"
limits:
memory: "4Gi"
cpu: "2"
livenessProbe:
httpGet:
path: /ping
port: 8123
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ping
port: 8123
initialDelaySeconds: 5
periodSeconds: 5
volumes:
- name: clickhouse-storage
persistentVolumeClaim:
claimName: clickhouse-pvc
- name: clickhouse-config
configMap:
name: clickhouse-config
2.3 ClickHouse PVC and Service
Create clickhouse/clickhouse-pvc.yaml:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: clickhouse-pvc
namespace: data-warehouse
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 50Gi
storageClassName: standard
Create clickhouse/clickhouse-service.yaml:
apiVersion: v1
kind: Service
metadata:
name: clickhouse-service
namespace: data-warehouse
labels:
app: clickhouse
spec:
ports:
- port: 8123
targetPort: 8123
protocol: TCP
name: http
- port: 9000
targetPort: 9000
protocol: TCP
name: native
- port: 9363
targetPort: 9363
protocol: TCP
name: metrics
selector:
app: clickhouse
type: ClusterIP
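With the ConfigMap, deployment, PVC, and service applied, verify that ClickHouse answers queries and that the bronze schema from init-db.sql exists. A sketch using kubectl exec plus a port-forward for the HTTP interface (the same endpoint LogStash writes to):
kubectl wait --for=condition=ready pod -l app=clickhouse -n data-warehouse --timeout=300s
kubectl exec -n data-warehouse deploy/clickhouse -- \
  clickhouse-client --query "SHOW TABLES FROM bronze_layer"
kubectl port-forward -n data-warehouse svc/clickhouse-service 8123:8123 &
sleep 2
curl -s 'http://localhost:8123/ping'
curl -s 'http://localhost:8123/?query=SELECT%20version()'
kill %1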
Step 3: LogStash Data Ingestion Deployment
3.1 LogStash ConfigMap
Create logstash/logstash-config.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
name: logstash-config
namespace: data-ingestion
data:
logstash.conf: |
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://mysql-service.data-sources.svc.cluster.local:3306/adventureworks?useSSL=false&allowPublicKeyRetrieval=true"
jdbc_user => "logstash"
jdbc_password => "logstash123"
statement => "SELECT * FROM Sales_SalesOrderHeader WHERE ModifiedDate > :sql_last_value ORDER BY ModifiedDate"
use_column_value => true
tracking_column => "ModifiedDate"
tracking_column_type => "timestamp"
schedule => "*/30 * * * * *"
jdbc_page_size => 25000
jdbc_paging_enabled => true
jdbc_validate_connection => true
type => "sales_orders"
tags => ["sales", "orders", "kubernetes"]
}
}
filter {
uuid {
target => "correlation_id"
}
mutate {
add_field => {
"pipeline_timestamp" => "%{@timestamp}"
"source_system" => "adventureworks_mysql"
"logstash_host" => "%{host}"
"kubernetes_namespace" => "data-ingestion"
}
remove_field => ["@version"]
}
if [ModifiedDate] {
date {
match => [ "ModifiedDate", "yyyy-MM-dd HH:mm:ss" ]
target => "processed_modified_date"
add_tag => ["date_parsed"]
tag_on_failure => ["date_parse_failure"]
}
}
}
output {
if [type] == "sales_orders" and "date_parse_failure" not in [tags] {
http {
url => "http://clickhouse-service.data-warehouse.svc.cluster.local:8123/?query=INSERT%20INTO%20bronze_layer.sales_orders%20FORMAT%20JSONEachRow"
http_method => "post"
format => "json"
headers => {
"Content-Type" => "application/json"
}
}
}
# Error logging
if "date_parse_failure" in [tags] {
stdout {
codec => json
id => "error_logging"
}
}
# Operational logging
stdout {
codec => json
id => "operational_logging"
}
}
pipelines.yml: |
- pipeline.id: main
path.config: "/usr/share/logstash/pipeline/logstash.conf"
pipeline.workers: 2
pipeline.batch.size: 1000
pipeline.batch.delay: 50
3.2 LogStash Deployment
Create logstash/logstash-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: logstash
namespace: data-ingestion
labels:
app: logstash
tier: ingestion
spec:
replicas: 2
selector:
matchLabels:
app: logstash
template:
metadata:
labels:
app: logstash
spec:
containers:
- name: logstash
image: docker.elastic.co/logstash/logstash:8.11.0
ports:
- containerPort: 5044
name: beats
- containerPort: 9600
name: monitoring
volumeMounts:
- name: logstash-config
mountPath: /usr/share/logstash/pipeline/logstash.conf
subPath: logstash.conf
- name: logstash-config
mountPath: /usr/share/logstash/config/pipelines.yml
subPath: pipelines.yml
resources:
requests:
memory: "1Gi"
cpu: "0.5"
limits:
memory: "2Gi"
cpu: "1"
env:
- name: LS_JAVA_OPTS
value: "-Xmx1g -Xms1g"
livenessProbe:
httpGet:
path: /
port: 9600
initialDelaySeconds: 60
periodSeconds: 10
readinessProbe:
httpGet:
path: /
port: 9600
initialDelaySeconds: 30
periodSeconds: 5
volumes:
- name: logstash-config
configMap:
name: logstash-config
3.3 LogStash Service
Create logstash/logstash-service.yaml:
apiVersion: v1
kind: Service
metadata:
name: logstash-service
namespace: data-ingestion
labels:
app: logstash
spec:
ports:
- port: 5044
targetPort: 5044
protocol: TCP
name: beats
- port: 9600
targetPort: 9600
protocol: TCP
name: monitoring
selector:
app: logstash
type: ClusterIP
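Once LogStash is running, its monitoring API on port 9600 reports per-pipeline event counts, which is the fastest way to confirm the JDBC input is actually polling MySQL:
kubectl wait --for=condition=ready pod -l app=logstash -n data-ingestion --timeout=300s
kubectl port-forward -n data-ingestion svc/logstash-service 9600:9600 &
sleep 2
# Look for events.in / events.out on the "main" pipeline
curl -s 'http://localhost:9600/_node/stats/pipelines?pretty'
kill %1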
Step 4: Apache Airflow Orchestration Deployment
4.1 Airflow Namespace and RBAC
Create airflow/namespace-rbac.yaml:
apiVersion: v1
kind: Namespace
metadata:
name: orchestration
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: airflow
namespace: orchestration
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: airflow-cluster-role
rules:
- apiGroups: [""]
resources: ["pods", "pods/log", "pods/exec"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: airflow-cluster-role-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: airflow-cluster-role
subjects:
- kind: ServiceAccount
name: airflow
namespace: orchestration
4.2 Airflow PostgreSQL Database
Create airflow/postgres-deployment.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
name: postgres-config
namespace: orchestration
data:
POSTGRES_DB: airflow
POSTGRES_USER: airflow
# For production, move POSTGRES_PASSWORD into a Secret instead of a ConfigMap
POSTGRES_PASSWORD: airflow
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: postgres-pvc
namespace: orchestration
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: postgres
namespace: orchestration
spec:
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: postgres:13
ports:
- containerPort: 5432
envFrom:
- configMapRef:
name: postgres-config
volumeMounts:
- name: postgres-storage
mountPath: /var/lib/postgresql/data
resources:
requests:
memory: "512Mi"
cpu: "0.25"
limits:
memory: "1Gi"
cpu: "0.5"
volumes:
- name: postgres-storage
persistentVolumeClaim:
claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
name: postgres-service
namespace: orchestration
spec:
selector:
app: postgres
ports:
- port: 5432
targetPort: 5432
4.3 Airflow Redis
Create airflow/redis-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: redis
namespace: orchestration
spec:
replicas: 1
selector:
matchLabels:
app: redis
template:
metadata:
labels:
app: redis
spec:
containers:
- name: redis
image: redis:7-alpine
ports:
- containerPort: 6379
resources:
requests:
memory: "256Mi"
cpu: "0.1"
limits:
memory: "512Mi"
cpu: "0.25"
---
apiVersion: v1
kind: Service
metadata:
name: redis-service
namespace: orchestration
spec:
selector:
app: redis
ports:
- port: 6379
targetPort: 6379
4.4 Airflow Configuration
Create airflow/airflow-config.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-config
namespace: orchestration
data:
airflow.cfg: |
[core]
dags_folder = /opt/airflow/dags
base_log_folder = /opt/airflow/logs
logging_level = INFO
executor = KubernetesExecutor
parallelism = 32
max_active_tasks_per_dag = 16
max_active_runs_per_dag = 16
[database]
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres-service:5432/airflow
[celery]
# Only used if the executor is switched to CeleryExecutor; the
# KubernetesExecutor configured above ignores this section
broker_url = redis://redis-service:6379/0
result_backend = db+postgresql://airflow:airflow@postgres-service:5432/airflow
[kubernetes]
namespace = orchestration
airflow_configmap = airflow-config
worker_container_repository = apache/airflow
worker_container_tag = 2.7.0
worker_service_account_name = airflow
delete_worker_pods = True
[webserver]
base_url = http://localhost:8080
web_server_port = 8080
[scheduler]
catchup_by_default = False
[email]
email_backend = airflow.utils.email.send_email_smtp
---
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
namespace: orchestration
type: Opaque
data:
# Base64 encoded values
mysql-password: bG9nc3Rhc2gxMjM= # logstash123
clickhouse-password: Y2xpY2tob3VzZTEyMw== # clickhouse123
postgres-password: YWlyZmxvdw== # airflow
4.5 Airflow Webserver
Create airflow/airflow-webserver.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-webserver
namespace: orchestration
labels:
app: airflow-webserver
spec:
replicas: 1
selector:
matchLabels:
app: airflow-webserver
template:
metadata:
labels:
app: airflow-webserver
spec:
serviceAccountName: airflow
initContainers:
- name: init-db
image: apache/airflow:2.7.0
command: ["airflow", "db", "init"]
env:
- name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
value: "postgresql+psycopg2://airflow:airflow@postgres-service:5432/airflow"
volumeMounts:
- name: airflow-config
mountPath: /opt/airflow/airflow.cfg
subPath: airflow.cfg
containers:
- name: airflow-webserver
image: apache/airflow:2.7.0
command: ["airflow", "webserver"]
ports:
- containerPort: 8080
env:
- name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
value: "postgresql+psycopg2://airflow:airflow@postgres-service:5432/airflow"
- name: AIRFLOW__CORE__EXECUTOR
value: "KubernetesExecutor"
volumeMounts:
- name: airflow-config
mountPath: /opt/airflow/airflow.cfg
subPath: airflow.cfg
- name: airflow-dags
mountPath: /opt/airflow/dags
resources:
requests:
memory: "1Gi"
cpu: "0.5"
limits:
memory: "2Gi"
cpu: "1"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 120
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10
volumes:
- name: airflow-config
configMap:
name: airflow-config
- name: airflow-dags
configMap:
name: airflow-dags
---
apiVersion: v1
kind: Service
metadata:
name: airflow-webserver-service
namespace: orchestration
spec:
selector:
app: airflow-webserver
ports:
- port: 8080
targetPort: 8080
type: LoadBalancer
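On clusters without a LoadBalancer (minikube, kind), port-forwarding reaches the UI just as well:
kubectl port-forward -n orchestration svc/airflow-webserver-service 8080:8080
# Open http://localhost:8080 and log in as admin/admin (created by the init container above)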
4.6 Airflow Scheduler
Create airflow/airflow-scheduler.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflow-scheduler
namespace: orchestration
labels:
app: airflow-scheduler
spec:
replicas: 1
selector:
matchLabels:
app: airflow-scheduler
template:
metadata:
labels:
app: airflow-scheduler
spec:
serviceAccountName: airflow
containers:
- name: airflow-scheduler
image: apache/airflow:2.7.0
command: ["airflow", "scheduler"]
env:
- name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
value: "postgresql+psycopg2://airflow:airflow@postgres-service:5432/airflow"
- name: AIRFLOW__CORE__EXECUTOR
value: "KubernetesExecutor"
volumeMounts:
- name: airflow-config
mountPath: /opt/airflow/airflow.cfg
subPath: airflow.cfg
- name: airflow-dags
mountPath: /opt/airflow/dags
resources:
requests:
memory: "1Gi"
cpu: "0.5"
limits:
memory: "2Gi"
cpu: "1"
livenessProbe:
exec:
command:
- sh
- -c
- "airflow jobs check --job-type SchedulerJob --hostname $(hostname)"
initialDelaySeconds: 120
periodSeconds: 60
volumes:
- name: airflow-config
configMap:
name: airflow-config
- name: airflow-dags
configMap:
name: airflow-dags
4.7 Airflow DAGs ConfigMap
Create airflow/airflow-dags.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-dags
namespace: orchestration
data:
adventureworks_etl_k8s.py: |
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s
default_args = {
'owner': 'data-engineering-team',
'depends_on_past': False,
'start_date': datetime(2025, 1, 1),
'email_on_failure': True,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'adventureworks_etl_k8s_pipeline',
default_args=default_args,
description='AdventureWorks ETL Pipeline on Kubernetes',
schedule_interval='0 2 * * *',
max_active_runs=1,
catchup=False,
tags=['etl', 'adventureworks', 'kubernetes']
)
# Health check task
def check_services_health():
import requests
import time
services = {
'ClickHouse': 'http://clickhouse-service.data-warehouse.svc.cluster.local:8123/ping',
'LogStash': 'http://logstash-service.data-ingestion.svc.cluster.local:9600/'
}
for service, endpoint in services.items():
try:
response = requests.get(endpoint, timeout=10)
if response.status_code not in [200, 404]: # 404 is OK for LogStash monitoring
raise Exception(f"{service} health check failed")
print(f"✅ {service} is healthy")
except Exception as e:
raise Exception(f"❌ {service} health check failed: {str(e)}")
return "All services healthy"
health_check = PythonOperator(
task_id='health_check',
python_callable=check_services_health,
dag=dag
)
# LogStash data ingestion task
logstash_ingestion = KubernetesPodOperator(
task_id='logstash_data_ingestion',
name='logstash-ingestion-pod',
namespace='data-ingestion',
image='docker.elastic.co/logstash/logstash:8.11.0',
cmds=['logstash'],
arguments=['-f', '/usr/share/logstash/pipeline/logstash.conf', '--config.reload.automatic'],
volumes=[
k8s.V1Volume(
name='logstash-config',
config_map=k8s.V1ConfigMapVolumeSource(name='logstash-config')
)
],
volume_mounts=[
k8s.V1VolumeMount(
name='logstash-config',
mount_path='/usr/share/logstash/pipeline/logstash.conf',
sub_path='logstash.conf'
)
],
env_vars={
'LS_JAVA_OPTS': '-Xmx1g -Xms1g'
},
container_resources=k8s.V1ResourceRequirements(
requests={'memory': '1Gi', 'cpu': '0.5'},
limits={'memory': '2Gi', 'cpu': '1'}
),
is_delete_operator_pod=True,
dag=dag
)
# DBT transformation task
dbt_transformation = KubernetesPodOperator(
task_id='dbt_transformation',
name='dbt-transformation-pod',
namespace='orchestration',
# Assumes an image bundling dbt-core and the dbt-clickhouse adapter; build and push your own if this tag is unavailable
image='ghcr.io/dbt-labs/dbt-clickhouse:1.6.0',
cmds=['sh', '-c'],
arguments=[
'''
cd /dbt &&
dbt debug --target prod &&
dbt run --models staging --target prod &&
dbt test --models staging --target prod &&
dbt run --models marts --target prod &&
dbt test --models marts --target prod
'''
],
volumes=[
k8s.V1Volume(
name='dbt-project',
config_map=k8s.V1ConfigMapVolumeSource(name='dbt-project')
)
],
volume_mounts=[
k8s.V1VolumeMount(
name='dbt-project',
mount_path='/dbt'
)
],
env_vars={
'DBT_PROFILES_DIR': '/dbt/profiles'
},
container_resources=k8s.V1ResourceRequirements(
requests={'memory': '1Gi', 'cpu': '0.5'},
limits={'memory': '2Gi', 'cpu': '1'}
),
is_delete_operator_pod=True,
dag=dag
)
# Data quality validation
def validate_data_quality():
import requests
import json
clickhouse_url = "http://clickhouse-service.data-warehouse.svc.cluster.local:8123/"
checks = [
{
'name': 'Bronze layer record count',
'query': 'SELECT COUNT(*) FROM bronze_layer.sales_orders',
'min_expected': 100
}
]
results = {}
for check in checks:
response = requests.post(
clickhouse_url,
data=check['query'],
headers={'Content-Type': 'text/plain'}
)
if response.status_code != 200:
raise Exception(f"ClickHouse query failed: {response.text}")
count = int(response.text.strip())
if count < check['min_expected']:
raise Exception(f"Data quality check failed: {check['name']} - Expected: {check['min_expected']}, Got: {count}")
results[check['name']] = count
print(f"✅ {check['name']}: {count} records")
return results
data_quality_check = PythonOperator(
task_id='data_quality_validation',
python_callable=validate_data_quality,
dag=dag
)
# Task dependencies
health_check >> logstash_ingestion >> dbt_transformation >> data_quality_check
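Once the scheduler has parsed the ConfigMap-mounted DAG, exercise the pipeline from the CLI before relying on the nightly schedule. A sketch, assuming the deployment names used in this guide:
# Confirm the DAG was parsed
kubectl exec -n orchestration deploy/airflow-scheduler -- airflow dags list
# Unpause and trigger a manual run
kubectl exec -n orchestration deploy/airflow-scheduler -- \
  airflow dags unpause adventureworks_etl_k8s_pipeline
kubectl exec -n orchestration deploy/airflow-scheduler -- \
  airflow dags trigger adventureworks_etl_k8s_pipeline
# Watch run state until it reports success
kubectl exec -n orchestration deploy/airflow-scheduler -- \
  airflow dags list-runs -d adventureworks_etl_k8s_pipeline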
Step 5: Monitoring with Prometheus and Grafana
5.1 Prometheus Deployment
Create monitoring/prometheus.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-config
namespace: monitoring
data:
prometheus.yml: |
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "alert_rules.yml"
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
scrape_configs:
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
# ClickHouse serves Prometheus metrics on the dedicated port enabled in config.xml
- job_name: 'clickhouse'
static_configs:
- targets: ['clickhouse-service.data-warehouse.svc.cluster.local:9363']
metrics_path: '/metrics'
# LogStash's API returns JSON, not Prometheus format; in production place a
# logstash_exporter in front of it and scrape that instead
- job_name: 'logstash'
static_configs:
- targets: ['logstash-service.data-ingestion.svc.cluster.local:9600']
metrics_path: '/_node/stats'
# Airflow 2.x emits metrics via StatsD; run a statsd_exporter and point this
# job at it instead of the webserver
- job_name: 'airflow'
static_configs:
- targets: ['airflow-webserver-service.orchestration.svc.cluster.local:8080']
metrics_path: '/admin/metrics'
alert_rules.yml: |
groups:
- name: etl_alerts
rules:
- alert: ClickHouseDown
expr: up{job="clickhouse"} == 0
for: 2m
labels:
severity: critical
annotations:
summary: "ClickHouse is down"
description: "ClickHouse has been down for more than 2 minutes"
- alert: LogStashDown
expr: up{job="logstash"} == 0
for: 2m
labels:
severity: critical
annotations:
summary: "LogStash is down"
description: "LogStash has been down for more than 2 minutes"
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: prometheus
namespace: monitoring
spec:
replicas: 1
selector:
matchLabels:
app: prometheus
template:
metadata:
labels:
app: prometheus
spec:
containers:
- name: prometheus
image: prom/prometheus:v2.45.0
ports:
- containerPort: 9090
volumeMounts:
- name: prometheus-config
mountPath: /etc/prometheus/
- name: prometheus-storage
mountPath: /prometheus
command:
- '/bin/prometheus'
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
- '--storage.tsdb.retention.time=200h'
- '--web.enable-lifecycle'
resources:
requests:
memory: "1Gi"
cpu: "0.5"
limits:
memory: "2Gi"
cpu: "1"
volumes:
- name: prometheus-config
configMap:
name: prometheus-config
- name: prometheus-storage
emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
name: prometheus-service
namespace: monitoring
spec:
selector:
app: prometheus
ports:
- port: 9090
targetPort: 9090
type: LoadBalancer
5.2 Grafana Deployment
Create monitoring/grafana.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
name: grafana-datasources
namespace: monitoring
data:
datasources.yaml: |
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
url: http://prometheus-service:9090
isDefault: true
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: grafana
namespace: monitoring
spec:
replicas: 1
selector:
matchLabels:
app: grafana
template:
metadata:
labels:
app: grafana
spec:
containers:
- name: grafana
image: grafana/grafana:10.0.0
ports:
- containerPort: 3000
env:
- name: GF_SECURITY_ADMIN_PASSWORD
value: "admin123"
volumeMounts:
- name: grafana-datasources
mountPath: /etc/grafana/provisioning/datasources
- name: grafana-storage
mountPath: /var/lib/grafana
resources:
requests:
memory: "512Mi"
cpu: "0.25"
limits:
memory: "1Gi"
cpu: "0.5"
volumes:
- name: grafana-datasources
configMap:
name: grafana-datasources
- name: grafana-storage
emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
name: grafana-service
namespace: monitoring
spec:
selector:
app: grafana
ports:
- port: 3000
targetPort: 3000
type: LoadBalancer
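Without a LoadBalancer, port-forward both UIs; the Prometheus targets page is the quickest confirmation that the scrape jobs above are healthy:
kubectl port-forward -n monitoring svc/prometheus-service 9090:9090 &
kubectl port-forward -n monitoring svc/grafana-service 3000:3000 &
# Prometheus targets: http://localhost:9090/targets
# Grafana (admin/admin123): http://localhost:3000
curl -s 'http://localhost:9090/api/v1/targets' | head -c 500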
Step 6: Deployment Scripts and Automation
6.1 Main Deployment Script
Create deploy-k8s.sh:
#!/bin/bash
set -e
echo "🚀 Deploying AdventureWorks ETL Pipeline to Kubernetes..."
# Create namespaces
echo "📁 Creating namespaces..."
kubectl create namespace data-sources --dry-run=client -o yaml | kubectl apply -f -
kubectl create namespace data-ingestion --dry-run=client -o yaml | kubectl apply -f -
kubectl create namespace data-warehouse --dry-run=client -o yaml | kubectl apply -f -
kubectl create namespace orchestration --dry-run=client -o yaml | kubectl apply -f -
kubectl create namespace monitoring --dry-run=client -o yaml | kubectl apply -f -
# Deploy MySQL
echo "🗄️ Deploying MySQL..."
kubectl apply -f mysql/
kubectl wait --for=condition=ready pod -l app=mysql -n data-sources --timeout=300s
# Deploy ClickHouse
echo "📊 Deploying ClickHouse..."
kubectl apply -f clickhouse/
kubectl wait --for=condition=ready pod -l app=clickhouse -n data-warehouse --timeout=300s
# Deploy LogStash
echo "⚡ Deploying LogStash..."
kubectl apply -f logstash/
kubectl wait --for=condition=ready pod -l app=logstash -n data-ingestion --timeout=300s
# Deploy Airflow
echo "🌬️ Deploying Airflow..."
kubectl apply -f airflow/
kubectl wait --for=condition=ready pod -l app=postgres -n orchestration --timeout=300s
kubectl wait --for=condition=ready pod -l app=redis -n orchestration --timeout=300s
kubectl wait --for=condition=ready pod -l app=airflow-webserver -n orchestration --timeout=300s
kubectl wait --for=condition=ready pod -l app=airflow-scheduler -n orchestration --timeout=300s
# Deploy Monitoring
echo "📈 Deploying Monitoring..."
kubectl apply -f monitoring/
kubectl wait --for=condition=ready pod -l app=prometheus -n monitoring --timeout=300s
kubectl wait --for=condition=ready pod -l app=grafana -n monitoring --timeout=300s
echo "✅ Deployment completed successfully!"
# Display service endpoints
echo ""
echo "🔗 Service Endpoints:"
echo "Airflow Web UI: http://$(kubectl get svc airflow-webserver-service -n orchestration -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):8080"
echo "Grafana Dashboard: http://$(kubectl get svc grafana-service -n monitoring -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):3000"
echo "Prometheus: http://$(kubectl get svc prometheus-service -n monitoring -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):9090"
echo ""
echo "📋 Default Credentials:"
echo "Airflow: admin/admin"
echo "Grafana: admin/admin123"
echo "ClickHouse: admin/clickhouse123"
echo ""
echo "🎯 Next Steps:"
echo "1. Access Airflow UI and enable the ETL DAG"
echo "2. Configure Grafana dashboards for monitoring"
echo "3. Set up alerts in Prometheus AlertManager"
echo "4. Configure backup strategies for persistent volumes"
6.2 Cleanup Script
Create cleanup-k8s.sh:
#!/bin/bash
echo "🧹 Cleaning up AdventureWorks ETL Pipeline from Kubernetes..."
# Delete deployments
kubectl delete namespace data-sources --ignore-not-found=true
kubectl delete namespace data-ingestion --ignore-not-found=true
kubectl delete namespace data-warehouse --ignore-not-found=true
kubectl delete namespace orchestration --ignore-not-found=true
kubectl delete namespace monitoring --ignore-not-found=true
# Delete persistent volumes if needed
# WARNING: this removes every PV in the cluster, not just this pipeline's; skip on shared clusters
kubectl delete pv --all --ignore-not-found=true
echo "✅ Cleanup completed!"
6.3 Health Check Script
Create health-check.sh:
#!/bin/bash
echo "🏥 Checking ETL Pipeline Health..."
NAMESPACES=("data-sources" "data-ingestion" "data-warehouse" "orchestration" "monitoring")
for ns in "${NAMESPACES[@]}"; do
echo ""
echo "📋 Namespace: $ns"
kubectl get pods -n $ns -o wide
echo "Services:"
kubectl get svc -n $ns
done
echo ""
echo "📊 Resource Usage:"
kubectl top nodes
kubectl top pods --all-namespaces --sort-by=memory
echo ""
echo "🔍 Recent Events:"
kubectl get events --all-namespaces --sort-by='.metadata.creationTimestamp' | tail -10
Step 7: Production Best Practices
7.1 Resource Management
Create resource-quotas.yaml:
apiVersion: v1
kind: ResourceQuota
metadata:
name: etl-resource-quota
namespace: data-ingestion
spec:
hard:
requests.cpu: "4"
requests.memory: 8Gi
limits.cpu: "8"
limits.memory: 16Gi
persistentvolumeclaims: "10"
---
apiVersion: v1
kind: LimitRange
metadata:
name: etl-limit-range
namespace: data-ingestion
spec:
limits:
- default:
memory: "1Gi"
cpu: "0.5"
defaultRequest:
memory: "512Mi"
cpu: "0.25"
type: Container
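After applying, kubectl describe shows live usage against the hard limits, which helps when pods in the namespace start failing admission:
kubectl apply -f resource-quotas.yaml
kubectl describe resourcequota etl-resource-quota -n data-ingestion
kubectl describe limitrange etl-limit-range -n data-ingestion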
7.2 Network Policies
Create network-policies.yaml:
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: data-sources-netpol
namespace: data-sources
spec:
podSelector:
matchLabels:
app: mysql
policyTypes:
- Ingress
ingress:
- from:
- namespaceSelector:
matchLabels:
kubernetes.io/metadata.name: data-ingestion
ports:
- protocol: TCP
port: 3306
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: data-warehouse-netpol
namespace: data-warehouse
spec:
podSelector:
matchLabels:
app: clickhouse
policyTypes:
- Ingress
ingress:
- from:
- namespaceSelector:
matchLabels:
kubernetes.io/metadata.name: data-ingestion
- namespaceSelector:
matchLabels:
kubernetes.io/metadata.name: orchestration
ports:
- protocol: TCP
port: 8123
- protocol: TCP
port: 9000
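Network policies only take effect on a CNI that enforces them (Calico, Cilium, and similar), and they are easy to get subtly wrong, so test both directions with a throwaway pod. This assumes the busybox image's nc supports -z:
# Should connect: data-ingestion is allowed to reach MySQL
kubectl run netpol-ok --rm -it --restart=Never -n data-ingestion \
  --image=busybox:1.36 -- nc -zv -w 3 mysql-service.data-sources.svc.cluster.local 3306
# Should time out: monitoring is not in the MySQL ingress rules
kubectl run netpol-blocked --rm -it --restart=Never -n monitoring \
  --image=busybox:1.36 -- nc -zv -w 3 mysql-service.data-sources.svc.cluster.local 3306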
7.3 Backup Strategy
Create backup-cronjob.yaml:
apiVersion: batch/v1
kind: CronJob
metadata:
name: etl-backup
namespace: data-warehouse
spec:
schedule: "0 2 * * *" # Daily at 2 AM
jobTemplate:
spec:
template:
spec:
containers:
- name: backup
image: clickhouse/clickhouse-server:23.8-alpine
command:
- /bin/sh
- -c
- |
clickhouse-client --host clickhouse-service --query "BACKUP DATABASE bronze_layer TO S3('https://your-backup-bucket.s3.amazonaws.com/clickhouse/bronze_layer', 'access_key', 'secret_key');"
clickhouse-client --host clickhouse-service --query "BACKUP DATABASE gold_layer TO S3('https://your-backup-bucket.s3.amazonaws.com/clickhouse/gold_layer', 'access_key', 'secret_key');"
restartPolicy: OnFailure
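Rather than waiting for the 2 AM schedule, trigger a one-off job from the CronJob to validate the bucket path and credentials (the S3 endpoint and keys in the manifest are placeholders):
kubectl create job --from=cronjob/etl-backup etl-backup-manual -n data-warehouse
kubectl logs -f job/etl-backup-manual -n data-warehouse
kubectl delete job etl-backup-manual -n data-warehouse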
Step 8: Troubleshooting Guide
Common Issues and Solutions
Pod Stuck in Pending State
kubectl describe pod <pod-name> -n <namespace> # Check for resource constraints or PVC issues
Service Connection Issues
kubectl get endpoints -n <namespace>
kubectl exec -it <pod-name> -n <namespace> -- nslookup <service-name>
LogStash Connection Failures
kubectl logs -f <logstash-pod> -n data-ingestion
kubectl exec -it <logstash-pod> -n data-ingestion -- cat /etc/hosts
ClickHouse Performance Issues
kubectl exec -it <clickhouse-pod> -n data-warehouse -- clickhouse-client --query "SHOW PROCESSLIST"
Airflow DAG Issues
kubectl logs -f <airflow-scheduler-pod> -n orchestration
kubectl exec -it <airflow-webserver-pod> -n orchestration -- airflow dags list
Step 9: Scaling and Performance Tuning
Horizontal Pod Autoscaler
Create hpa.yaml:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: logstash-hpa
namespace: data-ingestion
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: logstash
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
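The HPA depends on metrics-server for CPU and memory readings; watch it react while the LogStash JDBC schedule generates load:
kubectl apply -f hpa.yaml
kubectl get hpa logstash-hpa -n data-ingestion -w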
Vertical Pod Autoscaler
Create vpa.yaml:
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
name: clickhouse-vpa
namespace: data-warehouse
spec:
targetRef:
apiVersion: apps/v1
kind: Deployment
name: clickhouse
updatePolicy:
updateMode: "Auto"
resourcePolicy:
containerPolicies:
- containerName: clickhouse
maxAllowed:
cpu: 4
memory: 8Gi
minAllowed:
cpu: 1
memory: 2Gi
Conclusion
This Kubernetes deployment provides a scalable, production-ready infrastructure for the AdventureWorks ETL pipeline with:
High Availability: Multi-replica deployments with health checks
Scalability: Horizontal and vertical autoscaling capabilities
Security: Network policies, RBAC, and secret management
Monitoring: Comprehensive observability with Prometheus and Grafana
Automation: GitOps-ready YAML manifests and deployment scripts
Reliability: Persistent storage, backup strategies, and disaster recovery
The infrastructure is designed to handle enterprise workloads while maintaining operational excellence and cost efficiency in Kubernetes environments.