DataSuite ETL on Kubernetes - Production Infrastructure Guide

Overview

This documentation provides a comprehensive guide to deploying the complete AdventureWorks ETL pipeline on Kubernetes, including a MySQL source database, LogStash data ingestion, a ClickHouse data warehouse, DBT transformations, and Apache Airflow orchestration.

Kubernetes Architecture

@startuml
!theme plain

package "Kubernetes Cluster" {
  package "Data Sources Namespace" {
    [MySQL Pod] as mysql
    [MySQL Service] as mysql_svc
    [MySQL PVC] as mysql_pvc
  }

  package "Data Ingestion Namespace" {
    [LogStash Pod] as logstash
    [LogStash Service] as logstash_svc
    [LogStash ConfigMap] as logstash_cm
  }

  package "Data Warehouse Namespace" {
    [ClickHouse Pod] as clickhouse
    [ClickHouse Service] as clickhouse_svc
    [ClickHouse PVC] as clickhouse_pvc
  }

  package "Orchestration Namespace" {
    [Airflow Webserver] as airflow_web
    [Airflow Scheduler] as airflow_scheduler
    [Airflow Worker] as airflow_worker
    [PostgreSQL] as airflow_db
    [Redis] as airflow_redis
  }

  package "Monitoring Namespace" {
    [Prometheus] as prometheus
    [Grafana] as grafana
    [AlertManager] as alertmanager
  }
}

mysql --> logstash : "Data Extract"
logstash --> clickhouse : "Data Load"
airflow_scheduler --> clickhouse : "DBT Transform"
prometheus --> grafana : "Metrics"
prometheus --> alertmanager : "Alerts"

@enduml

Prerequisites

  • Kubernetes cluster (v1.25+) with at least 16GB RAM and 8 CPU cores

  • kubectl configured with cluster access

  • Helm 3.x installed

  • Persistent Volume support (StorageClass configured)

  • LoadBalancer or Ingress controller for external access

  • Container registry access (Docker Hub or private registry)
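
Before moving on, a quick preflight check along these lines (assuming kubectl and Helm are already on your PATH) confirms the prerequisites are in place:

kubectl version
helm version --short
kubectl get storageclass
kubectl get nodes -o custom-columns=NAME:.metadata.name,CPU:.status.capacity.cpu,MEMORY:.status.capacity.memory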

Namespace Strategy

# Create namespaces for logical separation
kubectl create namespace data-sources
kubectl create namespace data-ingestion  
kubectl create namespace data-warehouse
kubectl create namespace orchestration
kubectl create namespace monitoring

Step 1: MySQL Source Database Deployment

1.1 MySQL ConfigMap

Create mysql/mysql-configmap.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: mysql-config
  namespace: data-sources
data:
  my.cnf: |
    [mysqld]
    default-storage-engine=innodb
    skip-host-cache
    skip-name-resolve
    datadir=/var/lib/mysql
    socket=/var/run/mysqld/mysqld.sock
    secure-file-priv=/var/lib/mysql-files
    user=mysql
    
    # Performance settings
    innodb_buffer_pool_size=1G
    innodb_log_file_size=256M
    max_connections=200
    
    # Logging
    general_log=1
    general_log_file=/var/lib/mysql/general.log
    slow_query_log=1
    slow_query_log_file=/var/lib/mysql/slow.log
    long_query_time=2

1.2 MySQL Secret

Create mysql/mysql-secret.yaml:

apiVersion: v1
kind: Secret
metadata:
  name: mysql-secret
  namespace: data-sources
type: Opaque
data:
  MYSQL_ROOT_PASSWORD: cGFzc3dvcmQ=  # base64 encoded 'password'
  MYSQL_DATABASE: YWR2ZW50dXJld29ya3M=  # base64 encoded 'adventureworks'
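
To generate these values, base64-encode each string without a trailing newline (the -n flag matters; echo without it encodes an extra newline character):

echo -n 'password' | base64        # cGFzc3dvcmQ=
echo -n 'adventureworks' | base64  # YWR2ZW50dXJld29ya3M=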

1.3 MySQL Persistent Volume Claim

Create mysql/mysql-pvc.yaml:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: mysql-pvc
  namespace: data-sources
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 20Gi
  storageClassName: standard

1.4 MySQL Deployment

Create mysql/mysql-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
  namespace: data-sources
  labels:
    app: mysql
    tier: database
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mysql
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - name: mysql
        image: mysql:8.0
        env:
        - name: MYSQL_ROOT_PASSWORD
          valueFrom:
            secretKeyRef:
              name: mysql-secret
              key: MYSQL_ROOT_PASSWORD
        - name: MYSQL_DATABASE
          valueFrom:
            secretKeyRef:
              name: mysql-secret
              key: MYSQL_DATABASE
        ports:
        - containerPort: 3306
          name: mysql
        volumeMounts:
        - name: mysql-storage
          mountPath: /var/lib/mysql
        - name: mysql-config
          mountPath: /etc/mysql/conf.d
        - name: mysql-initdb
          mountPath: /docker-entrypoint-initdb.d
        resources:
          requests:
            memory: "1Gi"
            cpu: "0.5"
          limits:
            memory: "2Gi"
            cpu: "1"
        livenessProbe:
          exec:
            command:
            - mysqladmin
            - ping
            - -h
            - localhost
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - mysql -h localhost -uroot -p"$MYSQL_ROOT_PASSWORD" -e "SELECT 1"
          initialDelaySeconds: 5
          periodSeconds: 5
      volumes:
      - name: mysql-storage
        persistentVolumeClaim:
          claimName: mysql-pvc
      - name: mysql-config
        configMap:
          name: mysql-config
      - name: mysql-initdb
        configMap:
          name: mysql-initdb

1.5 MySQL Service

Create mysql/mysql-service.yaml:

apiVersion: v1
kind: Service
metadata:
  name: mysql-service
  namespace: data-sources
  labels:
    app: mysql
spec:
  ports:
  - port: 3306
    targetPort: 3306
    protocol: TCP
  selector:
    app: mysql
  type: ClusterIP
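
Once the MySQL pod reports ready, you can verify connectivity and the seeded schema from inside the cluster with a throwaway client pod (the credentials match the demo secret above):

kubectl run mysql-client --rm -it --restart=Never --image=mysql:8.0 -n data-sources -- \
  mysql -h mysql-service -uroot -ppassword -e "SHOW TABLES IN adventureworks;"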

1.6 MySQL Data Initialization

Create mysql/mysql-initdb-configmap.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: mysql-initdb
  namespace: data-sources
data:
  01-create-schema.sql: |
    CREATE DATABASE IF NOT EXISTS adventureworks;
    USE adventureworks;
    
    -- Grant permissions for LogStash access
    CREATE USER 'logstash'@'%' IDENTIFIED BY 'logstash123';
    GRANT SELECT ON adventureworks.* TO 'logstash'@'%';
    FLUSH PRIVILEGES;
  
  02-load-adventureworks.sql: |
    -- This file should contain the AdventureWorks sample data
    -- For production, mount the actual AdventureWorks SQL file
    USE adventureworks;
    
    -- Sample table creation (replace with actual AdventureWorks schema)
    CREATE TABLE IF NOT EXISTS Sales_SalesOrderHeader (
      SalesOrderID INT PRIMARY KEY,
      CustomerID INT,
      TerritoryID INT,
      OrderDate DATETIME,
      DueDate DATETIME,
      ShipDate DATETIME,
      Status TINYINT,
      SubTotal DECIMAL(19,4),
      TaxAmt DECIMAL(19,4),
      Freight DECIMAL(19,4),
      TotalDue DECIMAL(19,4),
      ModifiedDate DATETIME
    );

Step 2: ClickHouse Data Warehouse Deployment

2.1 ClickHouse ConfigMap

Create clickhouse/clickhouse-config.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: clickhouse-config
  namespace: data-warehouse
data:
  config.xml: |
    <?xml version="1.0"?>
    <yandex>
        <logger>
            <level>information</level>
            <log>/var/log/clickhouse-server/clickhouse-server.log</log>
            <errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
            <size>1000M</size>
            <count>10</count>
        </logger>
        
        <http_port>8123</http_port>
        <tcp_port>9000</tcp_port>
        <mysql_port>9004</mysql_port>
        <postgresql_port>9005</postgresql_port>
        
        <listen_host>0.0.0.0</listen_host>
        
        <max_connections>200</max_connections>
        <keep_alive_timeout>3</keep_alive_timeout>
        <max_concurrent_queries>100</max_concurrent_queries>
        
        <path>/var/lib/clickhouse/</path>
        <tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
        <user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
        
        <users_config>users.xml</users_config>
        
        <default_profile>default</default_profile>
        <default_database>default</default_database>
        
        <timezone>UTC</timezone>
        
        <mlock_executable>false</mlock_executable>
        
        <remote_servers>
            <cluster_1>
                <shard>
                    <replica>
                        <host>localhost</host>
                        <port>9000</port>
                    </replica>
                </shard>
            </cluster_1>
        </remote_servers>
    </yandex>
    
  users.xml: |
    <?xml version="1.0"?>
    <yandex>
        <profiles>
            <default>
                <max_memory_usage>10000000000</max_memory_usage>
                <use_uncompressed_cache>0</use_uncompressed_cache>
                <load_balancing>random</load_balancing>
            </default>
        </profiles>
        
        <users>
            <default>
                <password></password>
                <networks incl="networks" replace="replace">
                    <ip>::/0</ip>
                </networks>
                <profile>default</profile>
                <quota>default</quota>
            </default>
            
            <admin>
                <password>clickhouse123</password>
                <networks>
                    <ip>::/0</ip>
                </networks>
                <profile>default</profile>
                <quota>default</quota>
                <access_management>1</access_management>
            </admin>
        </users>
        
        <quotas>
            <default>
                <interval>
                    <duration>3600</duration>
                    <queries>0</queries>
                    <errors>0</errors>
                    <result_rows>0</result_rows>
                    <read_rows>0</read_rows>
                    <execution_time>0</execution_time>
                </interval>
            </default>
        </quotas>
    </yandex>
    
  init-db.sql: |
    -- Create databases and schemas
    CREATE DATABASE IF NOT EXISTS bronze_layer;
    CREATE DATABASE IF NOT EXISTS gold_layer;
    
    -- Bronze layer tables
    CREATE TABLE IF NOT EXISTS bronze_layer.sales_orders (
      salesorderid Int32,
      customerid Int32,
      territoryid Int32,
      orderdate DateTime,
      duedate DateTime,
      shipdate DateTime,
      status UInt8,
      subtotal Decimal(19,4),
      taxamt Decimal(19,4),
      freight Decimal(19,4),
      totaldue Decimal(19,4),
      modifieddate DateTime,
      pipeline_timestamp DateTime DEFAULT now(),
      source_system String DEFAULT 'adventureworks_mysql'
    ) ENGINE = MergeTree()
    ORDER BY (salesorderid, orderdate)
    PARTITION BY toYYYYMM(orderdate);

2.2 ClickHouse Deployment

Create clickhouse/clickhouse-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: clickhouse
  namespace: data-warehouse
  labels:
    app: clickhouse
    tier: analytics
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clickhouse
  template:
    metadata:
      labels:
        app: clickhouse
    spec:
      containers:
      - name: clickhouse
        image: clickhouse/clickhouse-server:23.8-alpine
        ports:
        - containerPort: 8123
          name: http
        - containerPort: 9000
          name: native
        - containerPort: 9004
          name: mysql
        - containerPort: 9005
          name: postgres
        volumeMounts:
        - name: clickhouse-storage
          mountPath: /var/lib/clickhouse
        - name: clickhouse-config
          mountPath: /etc/clickhouse-server/config.xml
          subPath: config.xml
        - name: clickhouse-config
          mountPath: /etc/clickhouse-server/users.xml
          subPath: users.xml
        - name: clickhouse-config
          mountPath: /docker-entrypoint-initdb.d/init-db.sql
          subPath: init-db.sql
        resources:
          requests:
            memory: "2Gi"
            cpu: "1"
          limits:
            memory: "4Gi"
            cpu: "2"
        livenessProbe:
          httpGet:
            path: /ping
            port: 8123
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ping
            port: 8123
          initialDelaySeconds: 5
          periodSeconds: 5
      volumes:
      - name: clickhouse-storage
        persistentVolumeClaim:
          claimName: clickhouse-pvc
      - name: clickhouse-config
        configMap:
          name: clickhouse-config

2.3 ClickHouse PVC and Service

Create clickhouse/clickhouse-pvc.yaml:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: clickhouse-pvc
  namespace: data-warehouse
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 50Gi
  storageClassName: standard

Create clickhouse/clickhouse-service.yaml:

apiVersion: v1
kind: Service
metadata:
  name: clickhouse-service
  namespace: data-warehouse
  labels:
    app: clickhouse
spec:
  ports:
  - port: 8123
    targetPort: 8123
    protocol: TCP
    name: http
  - port: 9000
    targetPort: 9000
    protocol: TCP
    name: native
  selector:
    app: clickhouse
  type: ClusterIP
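
With the service in place, the HTTP interface offers a simple smoke test: /ping should return Ok. and the query endpoint should list the bronze_layer and gold_layer databases created by the init script:

kubectl run ch-ping --rm -it --restart=Never --image=curlimages/curl -n data-warehouse -- \
  curl -s http://clickhouse-service:8123/ping
kubectl run ch-query --rm -it --restart=Never --image=curlimages/curl -n data-warehouse -- \
  curl -s 'http://clickhouse-service:8123/?query=SHOW%20DATABASES'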

Step 3: LogStash Data Ingestion Deployment

3.1 LogStash ConfigMap

Create logstash/logstash-config.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-config
  namespace: data-ingestion
data:
  logstash.conf: |
    input {
      jdbc {
        jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://mysql-service.data-sources.svc.cluster.local:3306/adventureworks?useSSL=false&allowPublicKeyRetrieval=true"
        jdbc_user => "logstash"
        jdbc_password => "logstash123"
        statement => "SELECT * FROM Sales_SalesOrderHeader WHERE ModifiedDate > :sql_last_value ORDER BY ModifiedDate"
        use_column_value => true
        tracking_column => "ModifiedDate"
        tracking_column_type => "timestamp"
        schedule => "*/30 * * * * *"
        jdbc_page_size => 25000
        jdbc_paging_enabled => true
        jdbc_validate_connection => true
        type => "sales_orders"
        tags => ["sales", "orders", "kubernetes"]
      }
    }

    filter {
      uuid {
        target => "correlation_id"
      }

      mutate {
        add_field => { 
          "pipeline_timestamp" => "%{@timestamp}"
          "source_system" => "adventureworks_mysql"
          "logstash_host" => "%{host}"
          "kubernetes_namespace" => "data-ingestion"
        }
        remove_field => ["@version"]
      }

      if [ModifiedDate] {
        date {
          match => [ "ModifiedDate", "yyyy-MM-dd HH:mm:ss" ]
          target => "processed_modified_date"
          add_tag => ["date_parsed"]
          tag_on_failure => ["date_parse_failure"]
        }
      }
    }

    output {
      if [type] == "sales_orders" and "date_parse_failure" not in [tags] {
        http {
          url => "http://clickhouse-service.data-warehouse.svc.cluster.local:8123/?query=INSERT%20INTO%20bronze_layer.sales_orders%20FORMAT%20JSONEachRow"
          http_method => "post"
          format => "json"
          headers => {
            "Content-Type" => "application/json"
          }
        }
      }

      # Error logging
      if "date_parse_failure" in [tags] {
        stdout {
          codec => json
          id => "error_logging"
        }
      }

      # Operational logging
      stdout {
        codec => json
        id => "operational_logging"
      }
    }
    
  pipelines.yml: |
    - pipeline.id: main
      path.config: "/usr/share/logstash/pipeline/logstash.conf"
      pipeline.workers: 2
      pipeline.batch.size: 1000
      pipeline.batch.delay: 50
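
Note that the stock LogStash image does not bundle the MySQL JDBC driver that jdbc_driver_library points at, so bake it into a custom image (or fetch it in an init container). A minimal sketch; the connector version and registry name are placeholders to adjust:

curl -fLo mysql-connector-java.jar \
  https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.1.0/mysql-connector-j-8.1.0.jar
cat > Dockerfile <<'EOF'
FROM docker.elastic.co/logstash/logstash:8.11.0
COPY mysql-connector-java.jar /usr/share/logstash/logstash-core/lib/jars/mysql-connector-java.jar
EOF
docker build -t your-registry/logstash-mysql:8.11.0 .
docker push your-registry/logstash-mysql:8.11.0
# Then point the Deployment in 3.2 at your-registry/logstash-mysql:8.11.0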

3.2 LogStash Deployment

Create logstash/logstash-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash
  namespace: data-ingestion
  labels:
    app: logstash
    tier: ingestion
spec:
  replicas: 2
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
    spec:
      containers:
      - name: logstash
        image: docker.elastic.co/logstash/logstash:8.11.0
        ports:
        - containerPort: 5044
          name: beats
        - containerPort: 9600
          name: monitoring
        volumeMounts:
        - name: logstash-config
          mountPath: /usr/share/logstash/pipeline/logstash.conf
          subPath: logstash.conf
        - name: logstash-config
          mountPath: /usr/share/logstash/config/pipelines.yml
          subPath: pipelines.yml
        resources:
          requests:
            memory: "1Gi"
            cpu: "0.5"
          limits:
            memory: "2Gi"
            cpu: "1"
        env:
        - name: LS_JAVA_OPTS
          value: "-Xmx1g -Xms1g"
        livenessProbe:
          httpGet:
            path: /
            port: 9600
          initialDelaySeconds: 60
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /
            port: 9600
          initialDelaySeconds: 30
          periodSeconds: 5
      volumes:
      - name: logstash-config
        configMap:
          name: logstash-config

3.3 LogStash Service

Create logstash/logstash-service.yaml:

apiVersion: v1
kind: Service
metadata:
  name: logstash-service
  namespace: data-ingestion
  labels:
    app: logstash
spec:
  ports:
  - port: 5044
    targetPort: 5044
    protocol: TCP
    name: beats
  - port: 9600
    targetPort: 9600
    protocol: TCP
    name: monitoring
  selector:
    app: logstash
  type: ClusterIP

Step 4: Apache Airflow Orchestration Deployment

4.1 Airflow Namespace and RBAC

Create airflow/namespace-rbac.yaml:

apiVersion: v1
kind: Namespace
metadata:
  name: orchestration
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow
  namespace: orchestration
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: airflow-cluster-role
rules:
- apiGroups: [""]
  resources: ["pods", "pods/log", "pods/exec"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: ["apps"]
  resources: ["deployments"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: airflow-cluster-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: airflow-cluster-role
subjects:
- kind: ServiceAccount
  name: airflow
  namespace: orchestration

4.2 Airflow PostgreSQL Database

Create airflow/postgres-deployment.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: postgres-config
  namespace: orchestration
data:
  POSTGRES_DB: airflow
  POSTGRES_USER: airflow
  POSTGRES_PASSWORD: airflow
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: postgres-pvc
  namespace: orchestration
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres
  namespace: orchestration
spec:
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
      - name: postgres
        image: postgres:13
        ports:
        - containerPort: 5432
        envFrom:
        - configMapRef:
            name: postgres-config
        volumeMounts:
        - name: postgres-storage
          mountPath: /var/lib/postgresql/data
        resources:
          requests:
            memory: "512Mi"
            cpu: "0.25"
          limits:
            memory: "1Gi"
            cpu: "0.5"
      volumes:
      - name: postgres-storage
        persistentVolumeClaim:
          claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: postgres-service
  namespace: orchestration
spec:
  selector:
    app: postgres
  ports:
  - port: 5432
    targetPort: 5432

4.3 Airflow Redis

The Airflow configuration in 4.4 uses the KubernetesExecutor, which needs no message broker; Redis and the [celery] settings are included only so you can switch to the CeleryExecutor without additional infrastructure.

Create airflow/redis-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
  namespace: orchestration
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
      - name: redis
        image: redis:7-alpine
        ports:
        - containerPort: 6379
        resources:
          requests:
            memory: "256Mi"
            cpu: "0.1"
          limits:
            memory: "512Mi"
            cpu: "0.25"
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
  namespace: orchestration
spec:
  selector:
    app: redis
  ports:
  - port: 6379
    targetPort: 6379

4.4 Airflow Configuration

Create airflow/airflow-config.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
  namespace: orchestration
data:
  airflow.cfg: |
    [core]
    dags_folder = /opt/airflow/dags
    base_log_folder = /opt/airflow/logs
    logging_level = INFO
    executor = KubernetesExecutor
    parallelism = 32
    max_active_tasks_per_dag = 16
    max_active_runs_per_dag = 16
    
    [database]
    sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@postgres-service:5432/airflow
    
    [celery]
    broker_url = redis://redis-service:6379/0
    result_backend = db+postgresql://airflow:airflow@postgres-service:5432/airflow
    
    [kubernetes]
    namespace = orchestration
    airflow_configmap = airflow-config
    worker_container_repository = apache/airflow
    worker_container_tag = 2.7.0
    worker_service_account_name = airflow
    delete_worker_pods = True
    
    [webserver]
    base_url = http://localhost:8080
    web_server_port = 8080
    
    [scheduler]
    catchup_by_default = False
    
    [email]
    email_backend = airflow.utils.email.send_email_smtp
---
apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
  namespace: orchestration
type: Opaque
data:
  # Base64 encoded values
  mysql-password: bG9nc3Rhc2gxMjM=  # logstash123
  clickhouse-password: Y2xpY2tob3VzZTEyMw==  # clickhouse123
  postgres-password: YWlyZmxvdw==  # airflow

4.5 Airflow Webserver

Create airflow/airflow-webserver.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-webserver
  namespace: orchestration
  labels:
    app: airflow-webserver
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow-webserver
  template:
    metadata:
      labels:
        app: airflow-webserver
    spec:
      serviceAccountName: airflow
      initContainers:
      - name: init-db
        image: apache/airflow:2.7.0
        command: ["airflow", "db", "init"]
        env:
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          value: "postgresql+psycopg2://airflow:airflow@postgres-service:5432/airflow"
        volumeMounts:
        - name: airflow-config
          mountPath: /opt/airflow/airflow.cfg
          subPath: airflow.cfg
      containers:
      - name: airflow-webserver
        image: apache/airflow:2.7.0
        command: ["airflow", "webserver"]
        ports:
        - containerPort: 8080
        env:
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          value: "postgresql+psycopg2://airflow:airflow@postgres-service:5432/airflow"
        - name: AIRFLOW__CORE__EXECUTOR
          value: "KubernetesExecutor"
        volumeMounts:
        - name: airflow-config
          mountPath: /opt/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: airflow-dags
          mountPath: /opt/airflow/dags
        resources:
          requests:
            memory: "1Gi"
            cpu: "0.5"
          limits:
            memory: "2Gi"
            cpu: "1"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 120
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
      volumes:
      - name: airflow-config
        configMap:
          name: airflow-config
      - name: airflow-dags
        configMap:
          name: airflow-dags
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver-service
  namespace: orchestration
spec:
  selector:
    app: airflow-webserver
  ports:
  - port: 8080
    targetPort: 8080
  type: LoadBalancer
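
If the cluster has no LoadBalancer implementation, the external IP will sit in Pending; port-forwarding is a quick alternative for reaching the UI:

kubectl port-forward svc/airflow-webserver-service -n orchestration 8080:8080
# Then browse to http://localhost:8080 and log in as admin/admin (created by the init container)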

4.6 Airflow Scheduler

Create airflow/airflow-scheduler.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-scheduler
  namespace: orchestration
  labels:
    app: airflow-scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow-scheduler
  template:
    metadata:
      labels:
        app: airflow-scheduler
    spec:
      serviceAccountName: airflow
      containers:
      - name: airflow-scheduler
        image: apache/airflow:2.7.0
        command: ["airflow", "scheduler"]
        env:
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          value: "postgresql+psycopg2://airflow:airflow@postgres-service:5432/airflow"
        - name: AIRFLOW__CORE__EXECUTOR
          value: "KubernetesExecutor"
        volumeMounts:
        - name: airflow-config
          mountPath: /opt/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: airflow-dags
          mountPath: /opt/airflow/dags
        resources:
          requests:
            memory: "1Gi"
            cpu: "0.5"
          limits:
            memory: "2Gi"
            cpu: "1"
        livenessProbe:
          exec:
            command:
            - sh
            - -c
            - "airflow jobs check --job-type SchedulerJob --hostname $(hostname)"
          initialDelaySeconds: 120
          periodSeconds: 60
      volumes:
      - name: airflow-config
        configMap:
          name: airflow-config
      - name: airflow-dags
        configMap:
          name: airflow-dags

4.7 Airflow DAGs ConfigMap

Create airflow/airflow-dags.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-dags
  namespace: orchestration
data:
  adventureworks_etl_k8s.py: |
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
    from airflow.operators.python import PythonOperator
    from kubernetes.client import models as k8s
    
    default_args = {
        'owner': 'data-engineering-team',
        'depends_on_past': False,
        'start_date': datetime(2025, 1, 1),
        'email_on_failure': True,
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG(
        'adventureworks_etl_k8s_pipeline',
        default_args=default_args,
        description='AdventureWorks ETL Pipeline on Kubernetes',
        schedule_interval='0 2 * * *',
        max_active_runs=1,
        catchup=False,
        tags=['etl', 'adventureworks', 'kubernetes']
    )
    
    # Health check task
    def check_services_health():
        import requests
        import time
        
        services = {
            'ClickHouse': 'http://clickhouse-service.data-warehouse.svc.cluster.local:8123/ping',
            'LogStash': 'http://logstash-service.data-ingestion.svc.cluster.local:9600/'
        }
        
        for service, endpoint in services.items():
            try:
                response = requests.get(endpoint, timeout=10)
                if response.status_code not in [200, 404]:  # 404 is OK for LogStash monitoring
                    raise Exception(f"{service} health check failed")
                print(f"✅ {service} is healthy")
            except Exception as e:
                raise Exception(f"❌ {service} health check failed: {str(e)}")
        
        return "All services healthy"
    
    health_check = PythonOperator(
        task_id='health_check',
        python_callable=check_services_health,
        dag=dag
    )
    
    # LogStash data ingestion task
    logstash_ingestion = KubernetesPodOperator(
        task_id='logstash_data_ingestion',
        name='logstash-ingestion-pod',
        namespace='data-ingestion',
        image='docker.elastic.co/logstash/logstash:8.11.0',
        cmds=['logstash'],
        arguments=['-f', '/usr/share/logstash/pipeline/logstash.conf', '--config.reload.automatic'],
        volumes=[
            k8s.V1Volume(
                name='logstash-config',
                config_map=k8s.V1ConfigMapVolumeSource(name='logstash-config')
            )
        ],
        volume_mounts=[
            k8s.V1VolumeMount(
                name='logstash-config',
                mount_path='/usr/share/logstash/pipeline/logstash.conf',
                sub_path='logstash.conf'
            )
        ],
        env_vars={
            'LS_JAVA_OPTS': '-Xmx1g -Xms1g'
        },
        container_resources=k8s.V1ResourceRequirements(
            requests={'memory': '1Gi', 'cpu': '0.5'},
            limits={'memory': '2Gi', 'cpu': '1'}
        ),
        is_delete_operator_pod=True,
        dag=dag
    )
    
    # DBT transformation task
    dbt_transformation = KubernetesPodOperator(
        task_id='dbt_transformation',
        name='dbt-transformation-pod',
        namespace='orchestration',
        image='ghcr.io/dbt-labs/dbt-clickhouse:1.6.0',
        cmds=['sh', '-c'],
        arguments=[
            '''
            cd /dbt &&
            dbt debug --target prod &&
            dbt run --models staging --target prod &&
            dbt test --models staging --target prod &&
            dbt run --models marts --target prod &&
            dbt test --models marts --target prod
            '''
        ],
        volumes=[
            k8s.V1Volume(
                name='dbt-project',
                config_map=k8s.V1ConfigMapVolumeSource(name='dbt-project')
            )
        ],
        volume_mounts=[
            k8s.V1VolumeMount(
                name='dbt-project',
                mount_path='/dbt'
            )
        ],
        env_vars={
            'DBT_PROFILES_DIR': '/dbt/profiles'
        },
        container_resources=k8s.V1ResourceRequirements(
            requests={'memory': '1Gi', 'cpu': '0.5'},
            limits={'memory': '2Gi', 'cpu': '1'}
        ),
        is_delete_operator_pod=True,
        dag=dag
    )
    
    # Data quality validation
    def validate_data_quality():
        import requests
        import json
        
        clickhouse_url = "http://clickhouse-service.data-warehouse.svc.cluster.local:8123/"
        
        checks = [
            {
                'name': 'Bronze layer record count',
                'query': 'SELECT COUNT(*) FROM bronze_layer.sales_orders',
                'min_expected': 100
            }
        ]
        
        results = {}
        for check in checks:
            response = requests.post(
                clickhouse_url,
                data=check['query'],
                headers={'Content-Type': 'text/plain'}
            )
            
            if response.status_code != 200:
                raise Exception(f"ClickHouse query failed: {response.text}")
            
            count = int(response.text.strip())
            
            if count < check['min_expected']:
                raise Exception(f"Data quality check failed: {check['name']} - Expected: {check['min_expected']}, Got: {count}")
            
            results[check['name']] = count
            print(f"✅ {check['name']}: {count} records")
        
        return results
    
    data_quality_check = PythonOperator(
        task_id='data_quality_validation',
        python_callable=validate_data_quality,
        dag=dag
    )
    
    # Task dependencies
    health_check >> logstash_ingestion >> dbt_transformation >> data_quality_check
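
Shipping DAGs in a ConfigMap keeps the example self-contained. To iterate on DAG code, regenerate the ConfigMap from a local dags/ directory (path illustrative) and restart the Airflow pods so the mounted files refresh promptly:

kubectl create configmap airflow-dags --from-file=dags/ -n orchestration \
  --dry-run=client -o yaml | kubectl apply -f -
kubectl rollout restart deployment/airflow-scheduler deployment/airflow-webserver -n orchestration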

Step 5: Monitoring with Prometheus and Grafana

5.1 Prometheus Deployment

Note: the alerting block below points at an AlertManager instance that these manifests do not deploy, neither ClickHouse (8123) nor LogStash (9600) serves native Prometheus metrics at the configured paths, and Airflow 2.x does not expose /admin/metrics. Treat those scrape jobs as placeholders until you add AlertManager and suitable exporters.

Create monitoring/prometheus.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  namespace: monitoring
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
    
    rule_files:
      - "alert_rules.yml"
    
    alerting:
      alertmanagers:
        - static_configs:
            - targets:
              - alertmanager:9093
    
    scrape_configs:
      - job_name: 'prometheus'
        static_configs:
          - targets: ['localhost:9090']
      
      - job_name: 'clickhouse'
        static_configs:
          - targets: ['clickhouse-service.data-warehouse.svc.cluster.local:8123']
        metrics_path: '/metrics'
      
      - job_name: 'logstash'
        static_configs:
          - targets: ['logstash-service.data-ingestion.svc.cluster.local:9600']
        metrics_path: '/_node/stats'
      
      - job_name: 'airflow'
        static_configs:
          - targets: ['airflow-webserver-service.orchestration.svc.cluster.local:8080']
        metrics_path: '/admin/metrics'
  
  alert_rules.yml: |
    groups:
    - name: etl_alerts
      rules:
      - alert: ClickHouseDown
        expr: up{job="clickhouse"} == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "ClickHouse is down"
          description: "ClickHouse has been down for more than 2 minutes"
      
      - alert: LogStashDown
        expr: up{job="logstash"} == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "LogStash is down"
          description: "LogStash has been down for more than 2 minutes"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prometheus
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: prometheus
  template:
    metadata:
      labels:
        app: prometheus
    spec:
      containers:
      - name: prometheus
        image: prom/prometheus:v2.45.0
        ports:
        - containerPort: 9090
        volumeMounts:
        - name: prometheus-config
          mountPath: /etc/prometheus/
        - name: prometheus-storage
          mountPath: /prometheus
        command:
        - '/bin/prometheus'
        - '--config.file=/etc/prometheus/prometheus.yml'
        - '--storage.tsdb.path=/prometheus'
        - '--web.console.libraries=/etc/prometheus/console_libraries'
        - '--web.console.templates=/etc/prometheus/consoles'
        - '--storage.tsdb.retention.time=200h'
        - '--web.enable-lifecycle'
        resources:
          requests:
            memory: "1Gi"
            cpu: "0.5"
          limits:
            memory: "2Gi"
            cpu: "1"
      volumes:
      - name: prometheus-config
        configMap:
          name: prometheus-config
      - name: prometheus-storage
        emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: prometheus-service
  namespace: monitoring
spec:
  selector:
    app: prometheus
  ports:
  - port: 9090
    targetPort: 9090
  type: LoadBalancer
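
Because the container starts with --web.enable-lifecycle, Prometheus can reload its configuration without a restart. After editing the ConfigMap, wait for the mounted volume to refresh (up to a minute) and trigger the reload:

kubectl port-forward svc/prometheus-service -n monitoring 9090:9090 &
sleep 2
curl -X POST http://localhost:9090/-/reload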

5.2 Grafana Deployment

Create monitoring/grafana.yaml:

apiVersion: v1
kind: ConfigMap
metadata:
  name: grafana-datasources
  namespace: monitoring
data:
  datasources.yaml: |
    apiVersion: 1
    datasources:
    - name: Prometheus
      type: prometheus
      access: proxy
      url: http://prometheus-service:9090
      isDefault: true
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: grafana
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: grafana
  template:
    metadata:
      labels:
        app: grafana
    spec:
      containers:
      - name: grafana
        image: grafana/grafana:10.0.0
        ports:
        - containerPort: 3000
        env:
        - name: GF_SECURITY_ADMIN_PASSWORD
          value: "admin123"
        volumeMounts:
        - name: grafana-datasources
          mountPath: /etc/grafana/provisioning/datasources
        - name: grafana-storage
          mountPath: /var/lib/grafana
        resources:
          requests:
            memory: "512Mi"
            cpu: "0.25"
          limits:
            memory: "1Gi"
            cpu: "0.5"
      volumes:
      - name: grafana-datasources
        configMap:
          name: grafana-datasources
      - name: grafana-storage
        emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: grafana-service
  namespace: monitoring
spec:
  selector:
    app: grafana
  ports:
  - port: 3000
    targetPort: 3000
  type: LoadBalancer

Step 6: Deployment Scripts and Automation

6.1 Main Deployment Script

Create deploy-k8s.sh:

#!/bin/bash

set -e

echo "🚀 Deploying AdventureWorks ETL Pipeline to Kubernetes..."

# Create namespaces
echo "📁 Creating namespaces..."
kubectl create namespace data-sources --dry-run=client -o yaml | kubectl apply -f -
kubectl create namespace data-ingestion --dry-run=client -o yaml | kubectl apply -f -
kubectl create namespace data-warehouse --dry-run=client -o yaml | kubectl apply -f -
kubectl create namespace orchestration --dry-run=client -o yaml | kubectl apply -f -
kubectl create namespace monitoring --dry-run=client -o yaml | kubectl apply -f -

# Deploy MySQL
echo "🗄️ Deploying MySQL..."
kubectl apply -f mysql/
kubectl wait --for=condition=ready pod -l app=mysql -n data-sources --timeout=300s

# Deploy ClickHouse
echo "📊 Deploying ClickHouse..."
kubectl apply -f clickhouse/
kubectl wait --for=condition=ready pod -l app=clickhouse -n data-warehouse --timeout=300s

# Deploy LogStash
echo "⚡ Deploying LogStash..."
kubectl apply -f logstash/
kubectl wait --for=condition=ready pod -l app=logstash -n data-ingestion --timeout=300s

# Deploy Airflow
echo "🌬️ Deploying Airflow..."
kubectl apply -f airflow/
kubectl wait --for=condition=ready pod -l app=postgres -n orchestration --timeout=300s
kubectl wait --for=condition=ready pod -l app=redis -n orchestration --timeout=300s
kubectl wait --for=condition=ready pod -l app=airflow-webserver -n orchestration --timeout=300s
kubectl wait --for=condition=ready pod -l app=airflow-scheduler -n orchestration --timeout=300s

# Deploy Monitoring
echo "📈 Deploying Monitoring..."
kubectl apply -f monitoring/
kubectl wait --for=condition=ready pod -l app=prometheus -n monitoring --timeout=300s
kubectl wait --for=condition=ready pod -l app=grafana -n monitoring --timeout=300s

echo "✅ Deployment completed successfully!"

# Display service endpoints
echo ""
echo "🔗 Service Endpoints:"
echo "Airflow Web UI: http://$(kubectl get svc airflow-webserver-service -n orchestration -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):8080"
echo "Grafana Dashboard: http://$(kubectl get svc grafana-service -n monitoring -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):3000"
echo "Prometheus: http://$(kubectl get svc prometheus-service -n monitoring -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):9090"

echo ""
echo "📋 Default Credentials:"
echo "Airflow: admin/admin"
echo "Grafana: admin/admin123"
echo "ClickHouse: admin/clickhouse123"

echo ""
echo "🎯 Next Steps:"
echo "1. Access Airflow UI and enable the ETL DAG"
echo "2. Configure Grafana dashboards for monitoring"
echo "3. Set up alerts in Prometheus AlertManager"
echo "4. Configure backup strategies for persistent volumes"

6.2 Cleanup Script

Create cleanup-k8s.sh:

#!/bin/bash

echo "🧹 Cleaning up AdventureWorks ETL Pipeline from Kubernetes..."

# Delete deployments
kubectl delete namespace data-sources --ignore-not-found=true
kubectl delete namespace data-ingestion --ignore-not-found=true
kubectl delete namespace data-warehouse --ignore-not-found=true
kubectl delete namespace orchestration --ignore-not-found=true
kubectl delete namespace monitoring --ignore-not-found=true

# WARNING: this deletes every PersistentVolume in the cluster, not just those
# used by this pipeline. Skip or scope this step on shared clusters.
kubectl delete pv --all --ignore-not-found=true

echo "✅ Cleanup completed!"

6.3 Health Check Script

Create health-check.sh:

#!/bin/bash

echo "🏥 Checking ETL Pipeline Health..."

NAMESPACES=("data-sources" "data-ingestion" "data-warehouse" "orchestration" "monitoring")

for ns in "${NAMESPACES[@]}"; do
    echo ""
    echo "📋 Namespace: $ns"
    kubectl get pods -n $ns -o wide
    
    echo "Services:"
    kubectl get svc -n $ns
done

echo ""
echo "📊 Resource Usage:"
kubectl top nodes
kubectl top pods --all-namespaces --sort-by=memory

echo ""
echo "🔍 Recent Events:"
kubectl get events --all-namespaces --sort-by='.metadata.creationTimestamp' | tail -10

Step 7: Production Best Practices

7.1 Resource Management

Create resource-quotas.yaml:

apiVersion: v1
kind: ResourceQuota
metadata:
  name: etl-resource-quota
  namespace: data-ingestion
spec:
  hard:
    requests.cpu: "4"
    requests.memory: 8Gi
    limits.cpu: "8"
    limits.memory: 16Gi
    persistentvolumeclaims: "10"
---
apiVersion: v1
kind: LimitRange
metadata:
  name: etl-limit-range
  namespace: data-ingestion
spec:
  limits:
  - default:
      memory: "1Gi"
      cpu: "0.5"
    defaultRequest:
      memory: "512Mi"
      cpu: "0.25"
    type: Container
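
After applying the quota, you can check how much of it current workloads consume:

kubectl apply -f resource-quotas.yaml
kubectl describe resourcequota etl-resource-quota -n data-ingestion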

7.2 Network Policies

Create network-policies.yaml:

apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: data-sources-netpol
  namespace: data-sources
spec:
  podSelector:
    matchLabels:
      app: mysql
  policyTypes:
  - Ingress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: data-ingestion
    ports:
    - protocol: TCP
      port: 3306
---
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: data-warehouse-netpol
  namespace: data-warehouse
spec:
  podSelector:
    matchLabels:
      app: clickhouse
  policyTypes:
  - Ingress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: data-ingestion
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: orchestration
    ports:
    - protocol: TCP
      port: 8123
    - protocol: TCP
      port: 9000
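
NetworkPolicy enforcement requires a CNI plugin that supports it (e.g. Calico or Cilium). As a sanity check, a connection attempt from a namespace the policy does not allow, such as monitoring, should time out:

kubectl run np-test --rm -it --restart=Never --image=busybox -n monitoring -- \
  nc -zv -w 3 mysql-service.data-sources.svc.cluster.local 3306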

7.3 Backup Strategy

Create backup-cronjob.yaml:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: etl-backup
  namespace: data-warehouse
spec:
  schedule: "0 2 * * *"  # Daily at 2 AM
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: backup
            image: clickhouse/clickhouse-server:23.8-alpine
            command:
            - /bin/sh
            - -c
            - |
              clickhouse-client --host clickhouse-service --query "BACKUP DATABASE bronze_layer TO S3('s3://your-backup-bucket/clickhouse/bronze_layer', 'access_key', 'secret_key');"
              clickhouse-client --host clickhouse-service --query "BACKUP DATABASE gold_layer TO S3('s3://your-backup-bucket/clickhouse/gold_layer', 'access_key', 'secret_key');"
          restartPolicy: OnFailure
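
The S3 bucket and credentials above are placeholders. To exercise the backup outside its schedule, create a one-off Job from the CronJob and follow its logs:

kubectl create job etl-backup-manual --from=cronjob/etl-backup -n data-warehouse
kubectl logs -f job/etl-backup-manual -n data-warehouse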

Step 8: Troubleshooting Guide

Common Issues and Solutions

  1. Pod Stuck in Pending State

    kubectl describe pod <pod-name> -n <namespace>
    # Check for resource constraints or PVC issues
  2. Service Connection Issues

    kubectl get endpoints -n <namespace>
    kubectl exec -it <pod-name> -n <namespace> -- nslookup <service-name>
  3. LogStash Connection Failures

    kubectl logs -f <logstash-pod> -n data-ingestion
    kubectl exec -it <logstash-pod> -n data-ingestion -- cat /etc/hosts
  4. ClickHouse Performance Issues

    kubectl exec -it <clickhouse-pod> -n data-warehouse -- clickhouse-client --query "SHOW PROCESSLIST"
  5. Airflow DAG Issues

    kubectl logs -f <airflow-scheduler-pod> -n orchestration
    kubectl exec -it <airflow-webserver-pod> -n orchestration -- airflow dags list

Step 9: Scaling and Performance Tuning

Horizontal Pod Autoscaler

Create hpa.yaml:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: logstash-hpa
  namespace: data-ingestion
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: logstash
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
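
The HPA depends on metrics-server being installed in the cluster; without it, the utilization columns show <unknown> and no scaling occurs. Watch its decisions with:

kubectl apply -f hpa.yaml
kubectl get hpa logstash-hpa -n data-ingestion --watch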

Vertical Pod Autoscaler

Create vpa.yaml:

apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
  name: clickhouse-vpa
  namespace: data-warehouse
spec:
  targetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: clickhouse
  updatePolicy:
    updateMode: "Auto"
  resourcePolicy:
    containerPolicies:
    - containerName: clickhouse
      maxAllowed:
        cpu: 4
        memory: 8Gi
      minAllowed:
        cpu: 1
        memory: 2Gi
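
VerticalPodAutoscaler is not part of core Kubernetes; install the VPA CRDs and controllers (e.g. from the kubernetes/autoscaler repository) before applying this manifest. Once running, inspect its recommendations:

kubectl apply -f vpa.yaml
kubectl describe vpa clickhouse-vpa -n data-warehouse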

Conclusion

This Kubernetes deployment provides a scalable, production-ready infrastructure for the AdventureWorks ETL pipeline with:

  • High Availability: Multi-replica deployments with health checks

  • Scalability: Horizontal and vertical autoscaling capabilities

  • Security: Network policies, RBAC, and secret management

  • Monitoring: Comprehensive observability with Prometheus and Grafana

  • Automation: GitOps-ready YAML manifests and deployment scripts

  • Reliability: Persistent storage, backup strategies, and disaster recovery

The infrastructure is designed to handle enterprise workloads while maintaining operational excellence and cost efficiency in Kubernetes environments.
