LogStash Configuration

LogStash serves as the data ingestion engine in the DataSuite ETL pipeline, extracting data from MySQL and loading it into ClickHouse. This guide covers configuration, customization, and optimization of LogStash pipelines.

Understanding LogStash Architecture

Pipeline Components

Input → Filter → Output
  ↓       ↓       ↓
MySQL   Transform ClickHouse

Input Plugins: Extract data from various sources (JDBC, files, APIs)
Filter Plugins: Transform, enrich, and validate data
Output Plugins: Load data to destinations (HTTP, databases, files)

Configuration Structure

LogStash configurations use a declarative syntax:

input {
  # Data source configuration
}

filter {
  # Data transformation logic
}

output {
  # Destination configuration
}

Basic JDBC Input Configuration

MySQL Connection Setup

input {
  jdbc {
    # JDBC driver configuration
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks"
    jdbc_user => "root"
    jdbc_password => "password"
    
    # Query configuration
    statement => "SELECT * FROM Sales.SalesOrderHeader WHERE ModifiedDate > :sql_last_value ORDER BY ModifiedDate"
    
    # Incremental loading setup
    use_column_value => true
    tracking_column => "ModifiedDate"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
    clean_run => false
    
    # Scheduling
    schedule => "*/30 * * * * *"  # Every 30 seconds
    
    # Performance tuning
    jdbc_fetch_size => 1000
    jdbc_paging_enabled => true    # required for jdbc_page_size to take effect
    jdbc_page_size => 10000
    # statement_filepath => "/usr/share/logstash/sql/sales_orders.sql"  # alternative to the inline statement above; set only one of the two
  }
}

Key Configuration Parameters

Parameter                 Purpose                        Example
jdbc_connection_string    Database URL                   jdbc:mysql://mysql:3306/adventureworks
statement                 SQL query to execute           SELECT * FROM table WHERE id > :sql_last_value
tracking_column           Column for incremental sync    ModifiedDate, id
schedule                  Execution frequency            */30 * * * * * (cron format)
jdbc_fetch_size           Rows per database fetch        1000
last_run_metadata_path    State persistence file         /path/to/.logstash_jdbc_last_run
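
The file referenced by last_run_metadata_path typically holds a single YAML-serialized value of the tracking column; deleting it (or setting clean_run => true) restarts the incremental sync from the beginning. An illustrative example for a timestamp tracking column:

# Contents of /usr/share/logstash/.logstash_jdbc_last_run (illustrative)
--- 2011-06-15 00:00:00.000000000 Z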

Advanced Input Configurations

Multiple Table Ingestion

# Configuration for multiple pipelines
# File: pipelines.yml
- pipeline.id: sales-orders
  path.config: "/usr/share/logstash/pipeline/sales-orders.conf"
  
- pipeline.id: customers
  path.config: "/usr/share/logstash/pipeline/customers.conf"
  
- pipeline.id: products
  path.config: "/usr/share/logstash/pipeline/products.conf"

Sales Orders Pipeline (sales-orders.conf):

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks"
    jdbc_user => "root"
    jdbc_password => "password"
    statement_filepath => "/usr/share/logstash/sql/sales_orders.sql"
    use_column_value => true
    tracking_column => "ModifiedDate"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.sales_orders_last_run"
    schedule => "*/30 * * * * *"
    type => "sales_order"
  }
}

filter {
  if [type] == "sales_order" {
    mutate {
      add_field => { "[@metadata][target_table]" => "sales_orders" }
      add_field => { "[@metadata][target_database]" => "bronze_layer" }
    }
  }
}

output {
  if [type] == "sales_order" {
    http {
      url => "http://clickhouse:8123/"
      http_method => "post" 
      format => "message"
      # %{message} must hold the JSON-serialized event (see the serialization sketch under "Output Configurations")
      message => "INSERT INTO %{[@metadata][target_database]}.%{[@metadata][target_table]} FORMAT JSONEachRow %{message}"
      headers => {
        "Authorization" => "Basic YWRtaW46Y2xpY2tob3VzZTEyMw=="
        "Content-Type" => "application/json"
      }
    }
  }
}

Complex SQL Queries

File: sql/sales_orders.sql

SELECT 
    soh.SalesOrderID,
    soh.CustomerID,
    soh.TerritoryID,
    soh.OrderDate,
    soh.DueDate,
    soh.ShipDate,
    soh.Status,
    soh.SubTotal,
    soh.TaxAmt,
    soh.Freight,
    soh.TotalDue,
    soh.ModifiedDate,
    
    -- Customer information (join)
    c.PersonID as CustomerPersonID,
    c.StoreID as CustomerStoreID,
    
    -- Territory information (join)  
    st.Name as TerritoryName,
    st.CountryRegionCode,
    st.Group as TerritoryGroup
    
FROM Sales.SalesOrderHeader soh
LEFT JOIN Sales.Customer c ON soh.CustomerID = c.CustomerID
LEFT JOIN Sales.SalesTerritory st ON soh.TerritoryID = st.TerritoryID
WHERE soh.ModifiedDate > :sql_last_value
ORDER BY soh.ModifiedDate ASC
LIMIT 1000

Filter Transformations

Data Type Conversions

filter {
  # Convert string dates to proper timestamp format
  date {
    match => [ "orderdate", "yyyy-MM-dd HH:mm:ss" ]
    target => "orderdate_parsed"
  }
  
  # Convert numeric strings to integers
  mutate {
    convert => { 
      "salesorderid" => "integer"
      "customerid" => "integer"
      "territoryid" => "integer"
    }
  }
  
  # Convert decimal strings to floats
  mutate {
    convert => {
      "subtotal" => "float"
      "taxamt" => "float"
      "totaldue" => "float"
    }
  }
}

Data Enrichment

filter {
  # Add metadata fields
  mutate {
    add_field => { 
      "[@metadata][ingested_at]" => "%{[@timestamp]}"
      "[@metadata][pipeline_id]" => "sales-orders-v1"
      "[@metadata][source_system]" => "adventureworks-mysql"
    }
  }
  
  # Calculate derived fields
  ruby {
    code => "
      subtotal = event.get('subtotal').to_f
      taxamt = event.get('taxamt').to_f
      freight = event.get('freight').to_f
      
      event.set('calculated_total', subtotal + taxamt + freight)
      event.set('tax_rate', subtotal.zero? ? 0 : taxamt / subtotal * 100)  # guard against division by zero
    "
  }
  
  # Status code mapping
  translate {
    field => "status"
    destination => "status_description"
    dictionary => {
      "1" => "In Process"
      "2" => "Approved"
      "3" => "Backordered"
      "4" => "Rejected"
      "5" => "Shipped"
      "6" => "Cancelled"
    }
    fallback => "Unknown"
  }
}

Data Validation and Quality

filter {
  # Validate required fields
  if ![salesorderid] or [salesorderid] == "" {
    mutate { add_tag => [ "missing_sales_order_id" ] }
  }
  
  if ![customerid] or [customerid] == "" {
    mutate { add_tag => [ "missing_customer_id" ] }
  }
  
  # Validate data ranges
  if [totaldue] and [totaldue] < 0 {
    mutate { add_tag => [ "negative_total_due" ] }
  }
  
  if [orderdate] and [orderdate] > [@timestamp] {
    mutate { add_tag => [ "future_order_date" ] }
  }
  
  # Data completeness scoring
  ruby {
    code => "
      required_fields = ['salesorderid', 'customerid', 'orderdate', 'totaldue']
      present_fields = required_fields.select { |field| event.get(field) }
      completeness_score = (present_fields.length.to_f / required_fields.length * 100).round(2)
      event.set('data_completeness_score', completeness_score)
    "
  }
  
  # Mark records as valid/invalid
  if "missing_sales_order_id" not in [tags] and "missing_customer_id" not in [tags] {
    mutate { add_tag => [ "valid_record" ] }
  } else {
    mutate { add_tag => [ "invalid_record" ] }
  }
}

Output Configurations

ClickHouse HTTP Output

output {
  # Valid records to ClickHouse
  if "valid_record" in [tags] {
    http {
      url => "http://clickhouse:8123/"
      http_method => "post"
      format => "message"
      # %{message} must hold the JSON-serialized event (see the serialization sketch below)
      message => "INSERT INTO bronze_layer.sales_orders FORMAT JSONEachRow %{message}"
      headers => {
        "Authorization" => "Basic YWRtaW46Y2xpY2tob3VzZTEyMw=="
        "Content-Type" => "application/json"
      }
      
      # Error handling (retries are controlled by retry_failed/automatic_retries)
      retry_failed => true
      automatic_retries => 3
      
      # Connection pooling
      pool_max => 50
      pool_max_per_route => 25
      
      # Timeout settings
      connect_timeout => 10
      socket_timeout => 30
      request_timeout => 60
    }
  }
  
  # Invalid records to error log
  if "invalid_record" in [tags] {
    file {
      path => "/usr/share/logstash/logs/invalid_records_%{[@metadata][target_table]}_%{+YYYY.MM.dd}.log"
      codec => json_lines
    }
  }
  
  # Debug output (remove in production)
  stdout { 
    codec => rubydebug {
      metadata => true
    }
  }
}
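
The %{message} interpolation above assumes the event already carries a JSON string in its message field, which jdbc events do not have by default (they carry one field per column). A minimal sketch of a filter that serializes the event into [message] before the output runs, using the bundled ruby filter; the field selection here is an assumption and should be adjusted to match the ClickHouse table's columns:

filter {
  ruby {
    init => "require 'json'"
    code => "
      # Copy the event into a plain hash, drop LogStash bookkeeping fields,
      # and store the JSON string in [message] for the http output to embed
      payload = event.to_hash.reject { |k, _| k.start_with?('@') || k == 'tags' }
      event.set('message', payload.to_json)
    "
  }
}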

Multiple Output Destinations

output {
  # Primary destination: ClickHouse
  http {
    url => "http://clickhouse:8123/"
    # ... ClickHouse configuration
  }
  
  # Backup destination: File
  file {
    path => "/usr/share/logstash/backups/sales_orders_%{+YYYY.MM.dd}.json"
    codec => json_lines
  }
  
  # Monitoring: Elasticsearch (optional)
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-metrics-%{+YYYY.MM.dd}"
    template_name => "logstash-metrics"
  }
  
  # Alerting: Send errors to monitoring system
  if "error" in [tags] {
    http {
      url => "http://monitoring-system:8080/alerts"
      http_method => "post"
      format => "json"
    }
  }
}

Performance Optimization

JDBC Connection Tuning

input {
  jdbc {
    # Connection settings (MySQL statement caching enabled via the JDBC URL parameters)
    jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks?useSSL=false&allowPublicKeyRetrieval=true&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=250&prepStmtCacheSqlLimit=2048"
    
    # Fetch optimization
    jdbc_fetch_size => 2000        # Rows per fetch from the database cursor
    jdbc_paging_enabled => true    # Required for jdbc_page_size to take effect
    jdbc_page_size => 20000        # Rows per paged query execution
    
    # Prepared statements (also requires prepared_statement_name and
    # prepared_statement_bind_values when enabled)
    use_prepared_statements => true
    
    # Connection validation
    jdbc_validation_timeout => 30
    
    # Consistent timestamp handling for the tracking column
    jdbc_default_timezone => "UTC"
  }
}

Pipeline Worker Configuration

File: config/pipelines.yml

- pipeline.id: sales-orders
  path.config: "/usr/share/logstash/pipeline/sales-orders.conf"
  pipeline.workers: 4              # Number of worker threads
  pipeline.batch.size: 1000        # Events per batch
  pipeline.batch.delay: 50         # Batch timeout in milliseconds
  
- pipeline.id: customers
  path.config: "/usr/share/logstash/pipeline/customers.conf"
  pipeline.workers: 2
  pipeline.batch.size: 500
  pipeline.batch.delay: 100

JVM Tuning

Environment Variables:

# Memory allocation
export LS_JAVA_OPTS="-Xms2g -Xmx4g"

# Garbage collection
export LS_JAVA_OPTS="$LS_JAVA_OPTS -XX:+UseG1GC -XX:MaxGCPauseMillis=200"

# GC logging (LogStash 8.x bundles JDK 17; the old PrintGC* flags were removed in favor of unified logging)
export LS_JAVA_OPTS="$LS_JAVA_OPTS -Xlog:gc*"
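
The same flags can alternatively be placed in LogStash's config/jvm.options file (one option per line), which avoids re-exporting environment variables on every start; a minimal sketch:

# config/jvm.options
-Xms2g
-Xmx4g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200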

Monitoring and Debugging

Pipeline Monitoring

# Check pipeline stats
curl -s http://localhost:9600/_node/stats/pipelines | jq

# Monitor specific pipeline
curl -s http://localhost:9600/_node/stats/pipelines/sales-orders | jq

# Check node information
curl -s http://localhost:9600/_node | jq
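
For a quick health check, the events block of the stats response shows how many events each pipeline has accepted, filtered, and emitted; a sketch for the sales-orders pipeline (the response values shown are illustrative):

# Events in/out and processing time for one pipeline
curl -s http://localhost:9600/_node/stats/pipelines | \
  jq '.pipelines["sales-orders"].events'

# Example shape of the result (values are illustrative):
# { "in": 1500, "filtered": 1500, "out": 1500, "duration_in_millis": 820, "queue_push_duration_in_millis": 14 }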

Log Analysis

# Add debugging to configuration
filter {
  if [debug] == "true" {
    mutate {
      add_field => { 
        "[@metadata][debug_info]" => "Processing record %{salesorderid} at %{[@timestamp]}"
      }
    }
  }
}

output {
  if [@metadata][debug_info] {
    stdout { 
      codec => line { 
        format => "%{[@metadata][debug_info]}"
      }
    }
  }
}

Error Handling Patterns

filter {
  # Capture errors in ruby filter
  ruby {
    code => "
      begin
        # Your transformation logic here
        total = event.get('subtotal').to_f + event.get('taxamt').to_f
        event.set('calculated_total', total)
      rescue => e
        event.set('ruby_error', e.message)
        event.tag('_rubyexception')
      end
    "
  }
}

output {
  # Handle Ruby exceptions
  if "_rubyexception" in [tags] {
    file {
      path => "/usr/share/logstash/logs/ruby_errors.log"
      codec => json_lines
    }
  }
  
  # Handle HTTP output errors
  if "_httprequestfailure" in [tags] {
    file {
      path => "/usr/share/logstash/logs/http_errors.log"  
      codec => json_lines
    }
  }
}

Security Configuration

Credential Management

input {
  jdbc {
    # Use environment variables for sensitive data
    jdbc_user => "${MYSQL_USER:root}"
    jdbc_password => "${MYSQL_PASSWORD:password}"
    jdbc_connection_string => "jdbc:mysql://${MYSQL_HOST:mysql}:${MYSQL_PORT:3306}/${MYSQL_DATABASE:adventureworks}"
  }
}

output {
  http {
    url => "http://${CLICKHOUSE_HOST:clickhouse}:${CLICKHOUSE_PORT:8123}/"
    headers => {
      "Authorization" => "Basic ${CLICKHOUSE_AUTH_ENCODED}"
    }
  }
}
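
Instead of exporting plain environment variables, the same ${VAR} references can be resolved from the LogStash keystore, which keeps secrets out of plain-text environment variables and config files. A sketch using the bundled logstash-keystore tool (the container name datasuite-logstash is an assumption; each add command prompts for the value):

docker exec -it datasuite-logstash bin/logstash-keystore create
docker exec -it datasuite-logstash bin/logstash-keystore add MYSQL_PASSWORD
docker exec -it datasuite-logstash bin/logstash-keystore add CLICKHOUSE_AUTH_ENCODED
docker exec -it datasuite-logstash bin/logstash-keystore list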

SSL/TLS Configuration

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks?useSSL=true&requireSSL=true&verifyServerCertificate=true"
    # SSL certificate configuration
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
  }
}

output {
  http {
    url => "https://clickhouse:8443/"
    ssl_verification_mode => "full"
    ssl_certificate_authorities => ["/usr/share/logstash/certs/ca.pem"]
    ssl_certificate => "/usr/share/logstash/certs/client.pem"
    ssl_key => "/usr/share/logstash/certs/client.key"
  }
}

Common Configuration Patterns

Conditional Processing

filter {
  # Process based on table type
  if [type] == "sales_order" {
    # Sales-specific transformations
    mutate {
      add_field => { "[@metadata][target_table]" => "sales_orders" }
    }
  } else if [type] == "customer" {
    # Customer-specific transformations
    mutate {
      add_field => { "[@metadata][target_table]" => "customers" }
    }
  }
  
  # Process based on data values
  if [status] in ["1", "2", "5"] {
    mutate { add_tag => [ "active_order" ] }
  } else {
    mutate { add_tag => [ "inactive_order" ] }
  }
}

Template-based Configuration

# Use templates for dynamic table creation
output {
  http {
    url => "http://clickhouse:8123/"
    format => "message" 
    message => "
      INSERT INTO %{[@metadata][target_database]}.%{[@metadata][target_table]} 
      (%{[@metadata][column_list]}) 
      FORMAT JSONEachRow 
      %{message}
    "
  }
}

Testing LogStash Configurations

Configuration Validation

# Test configuration syntax
docker run --rm \
  -v $(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
  docker.elastic.co/logstash/logstash:8.11.0 \
  logstash --config.test_and_exit

# Expected output: "Configuration OK"

Pipeline Testing

# Run the pipeline once (omit schedule from the jdbc input so the query executes a single time and LogStash exits)
docker run --rm \
  -v $(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
  -v $(pwd)/logstash-drivers:/usr/share/logstash/drivers \
  --network datasuite-network \
  docker.elastic.co/logstash/logstash:8.11.0 \
  logstash --path.data /tmp/logstash-data
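
Filters can also be exercised without touching MySQL by feeding a hand-written event through the generator input and printing the result; a minimal sketch (the sample field values are illustrative):

input {
  generator {
    # One synthetic sales order, parsed into individual fields by the json codec
    lines => ['{"salesorderid":"43659","customerid":"29825","subtotal":"20565.62","taxamt":"1971.51","freight":"616.09","status":"5"}']
    count => 1
    codec => "json"
  }
}

# Paste the filter { ... } blocks under test here

output {
  stdout { codec => rubydebug }
}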

Best Practices

Configuration Organization

logstash-config/
├── pipelines.yml                 # Pipeline definitions
├── logstash.yml                  # Main LogStash settings
├── pipelines/
│   ├── sales-orders.conf        # Individual pipeline configs
│   ├── customers.conf
│   └── products.conf
├── sql/
│   ├── sales_orders.sql         # SQL queries
│   ├── customers.sql
│   └── products.sql
└── patterns/
    └── custom-patterns          # Custom grok patterns

Error Recovery Strategies

  1. Persistent Queues: Enable for guaranteed delivery (see the logstash.yml sketch after this list)

  2. Dead Letter Queues: Capture failed events for analysis

  3. Circuit Breakers: Prevent cascade failures

  4. Retry Logic: Automatic retry with exponential backoff

  5. Monitoring: Comprehensive metrics and alerting
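
A minimal logstash.yml sketch for the first two strategies (paths assume the standard container layout; note that the dead letter queue is only written by outputs that support it, such as the elasticsearch output):

# config/logstash.yml
queue.type: persisted                  # buffer events on disk for guaranteed delivery
queue.max_bytes: 1gb
path.queue: /usr/share/logstash/data/queue

dead_letter_queue.enable: true         # capture events rejected by supporting outputs
dead_letter_queue.max_bytes: 512mb
path.dead_letter_queue: /usr/share/logstash/data/dead_letter_queue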

Performance Guidelines

  1. Batch Size: Balance between latency and throughput (1000-5000 events)

  2. Workers: Match CPU cores (typically 2-4 workers per pipeline)

  3. Memory: Allocate sufficient heap space (50% of container memory)

  4. Network: Use persistent connections and connection pooling

  5. Disk I/O: Use SSD storage for persistent queues

Next Steps

With LogStash configured for data ingestion:

  1. DBT Getting Started - Transform the ingested data

  2. Testing & Validation - Ensure data quality

  3. Troubleshooting - Resolve pipeline issues

Your LogStash configuration forms the foundation of the data pipeline, ensuring reliable and efficient data ingestion from source systems to the data warehouse.
