# LogStash Configuration

LogStash serves as the data ingestion engine in the DataSuite ETL pipeline, extracting data from MySQL and loading it into ClickHouse. This guide covers configuration, customization, and optimization of LogStash pipelines.

## Understanding LogStash Architecture

### Pipeline Components

```
Input → Filter → Output
  ↓       ↓       ↓
MySQL   Transform ClickHouse
```

- **Input Plugins**: Extract data from various sources (JDBC, files, APIs)
- **Filter Plugins**: Transform, enrich, and validate data
- **Output Plugins**: Load data to destinations (HTTP, databases, files)

### Configuration Structure

LogStash configurations use a declarative syntax:

```ruby
input {
  # Data source configuration
}

filter {
  # Data transformation logic
}

output {
  # Destination configuration
}
```

## Basic JDBC Input Configuration

### MySQL Connection Setup

```ruby
input {
  jdbc {
    # JDBC driver configuration
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks"
    jdbc_user => "root"
    jdbc_password => "password"
    
    # Query configuration (set either statement or statement_filepath, not both)
    statement => "SELECT * FROM Sales.SalesOrderHeader WHERE ModifiedDate > :sql_last_value ORDER BY ModifiedDate"
    
    # Incremental loading setup. Column names are lowercased by default
    # (lowercase_column_names => true), so the tracking column is lowercase too.
    use_column_value => true
    tracking_column => "modifieddate"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
    clean_run => false
    
    # Scheduling (rufus-scheduler cron syntax; the first field is seconds)
    schedule => "*/30 * * * * *"  # Every 30 seconds
    
    # Performance tuning
    jdbc_fetch_size => 1000
    jdbc_paging_enabled => true   # jdbc_page_size is ignored unless paging is enabled
    jdbc_page_size => 10000
  }
}
```

### Key Configuration Parameters

| Parameter                | Purpose                                         | Example                                          |
| ------------------------ | ----------------------------------------------- | ------------------------------------------------ |
| `jdbc_connection_string` | Database URL                                    | `jdbc:mysql://mysql:3306/adventureworks`         |
| `statement`              | SQL query to execute                            | `SELECT * FROM table WHERE id > :sql_last_value` |
| `tracking_column`        | Column for incremental sync (lowercase)         | `modifieddate`, `id`                             |
| `schedule`               | Execution frequency                             | `*/30 * * * * *` (cron with a seconds field)     |
| `jdbc_fetch_size`        | Rows fetched per database round trip (a hint)   | `1000`                                           |
| `last_run_metadata_path` | File that persists `:sql_last_value` across runs | `/path/to/.logstash_jdbc_last_run`              |
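With `use_column_value => true`, each run binds the stored value to `:sql_last_value`, and after the run the tracking-column value of the last row processed is written to `last_run_metadata_path`, which is why the query sorts by the tracking column. The Ruby sketch below simulates that loop in memory; `run_once` and the row hashes are hypothetical stand-ins rather than plugin internals, and the state file is replaced by a plain variable.

```ruby
# Hypothetical in-memory stand-in for the MySQL table. Keys are lowercase,
# matching how the jdbc input names event fields by default.
TABLE = [
  { 'salesorderid' => 1, 'modifieddate' => Time.utc(2024, 1, 1, 10) },
  { 'salesorderid' => 2, 'modifieddate' => Time.utc(2024, 1, 1, 11) },
  { 'salesorderid' => 3, 'modifieddate' => Time.utc(2024, 1, 1, 12) }
].freeze

# Simulates one scheduled run of:
#   SELECT ... WHERE modifieddate > :sql_last_value ORDER BY modifieddate
# Returns the emitted rows and the value to persist for the next run.
def run_once(table, sql_last_value, tracking_column: 'modifieddate')
  rows = table.select { |r| r[tracking_column] > sql_last_value }
              .sort_by { |r| r[tracking_column] }
  # Keep the tracking value of the last row processed; the ORDER BY is
  # what makes that value the maximum.
  new_last = rows.empty? ? sql_last_value : rows.last[tracking_column]
  [rows, new_last]
end

# Timestamp tracking starts from the epoch on the very first run.
last = Time.utc(1970, 1, 1)
first_batch, last = run_once(TABLE, last)
second_batch, last = run_once(TABLE, last)
puts first_batch.map { |r| r['salesorderid'] }.inspect  # [1, 2, 3]
puts second_batch.size                                   # 0
```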

## Advanced Input Configurations

### Multiple Table Ingestion

```yaml
# Configuration for multiple pipelines
# File: pipelines.yml
- pipeline.id: sales-orders
  path.config: "/usr/share/logstash/pipeline/sales-orders.conf"
  
- pipeline.id: customers
  path.config: "/usr/share/logstash/pipeline/customers.conf"
  
- pipeline.id: products
  path.config: "/usr/share/logstash/pipeline/products.conf"
```

**Sales Orders Pipeline** (`sales-orders.conf`):

```ruby
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks"
    jdbc_user => "root"
    jdbc_password => "password"
    statement_filepath => "/usr/share/logstash/sql/sales_orders.sql"
    use_column_value => true
    tracking_column => "modifieddate"   # lowercase: column names are lowercased by default
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.sales_orders_last_run"
    schedule => "*/30 * * * * *"
    type => "sales_order"
  }
}

filter {
  if [type] == "sales_order" {
    mutate {
      add_field => {
        "[@metadata][target_table]" => "sales_orders"
        "[@metadata][target_database]" => "bronze_layer"
      }
    }
  }
}

output {
  if [type] == "sales_order" {
    http {
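      # The INSERT goes in the URL-encoded `query` parameter and the event is posted
      # as a single JSON object, i.e. one JSONEachRow row. The jdbc input emits table
      # columns as event fields, so there is no `message` field to interpolate.
      # input_format_skip_unknown_fields=1 ignores fields without a matching column
      # (@timestamp, @version, type); date_time_input_format=best_effort accepts the
      # ISO 8601 timestamps Logstash serializes.
      url => "http://clickhouse:8123/?query=INSERT%20INTO%20%{[@metadata][target_database]}.%{[@metadata][target_table]}%20FORMAT%20JSONEachRow&input_format_skip_unknown_fields=1&date_time_input_format=best_effort"
      http_method => "post"
      format => "json"
      headers => {
        "Authorization" => "Basic YWRtaW46Y2xpY2tob3VzZTEyMw=="
      }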
    }
  }
}
```
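ClickHouse's HTTP interface takes the SQL statement in the `query` URL parameter and the data in the request body. To see what such a request looks like, the Ruby sketch below builds one by hand: it URL-encodes an `INSERT ... FORMAT JSONEachRow` statement and serializes rows as JSONEachRow, one JSON object per line. The helper names are hypothetical, and the sketch only builds strings; it does not contact a server.

```ruby
require 'erb'
require 'json'

# Hypothetical helper: place the INSERT statement in ClickHouse's `query`
# URL parameter. ERB::Util.url_encode turns spaces into %20.
def clickhouse_insert_url(base, database, table)
  sql = "INSERT INTO #{database}.#{table} FORMAT JSONEachRow"
  "#{base}/?query=#{ERB::Util.url_encode(sql)}"
end

# Hypothetical helper: JSONEachRow expects one JSON object per line.
def json_each_row(rows)
  rows.map(&:to_json).join("\n")
end

url  = clickhouse_insert_url('http://clickhouse:8123', 'bronze_layer', 'sales_orders')
body = json_each_row([{ 'salesorderid' => 43659, 'totaldue' => 23153.23 }])
puts url
# http://clickhouse:8123/?query=INSERT%20INTO%20bronze_layer.sales_orders%20FORMAT%20JSONEachRow
puts body
```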

### Complex SQL Queries

**File: `sql/sales_orders.sql`**

```sql
SELECT 
    soh.SalesOrderID,
    soh.CustomerID,
    soh.TerritoryID,
    soh.OrderDate,
    soh.DueDate,
    soh.ShipDate,
    soh.Status,
    soh.SubTotal,
    soh.TaxAmt,
    soh.Freight,
    soh.TotalDue,
    soh.ModifiedDate,
    
    -- Customer information (join)
    c.PersonID as CustomerPersonID,
    c.StoreID as CustomerStoreID,
    
    -- Territory information (join)  
    st.Name as TerritoryName,
    st.CountryRegionCode,
    st.Group as TerritoryGroup
    
FROM Sales.SalesOrderHeader soh
LEFT JOIN Sales.Customer c ON soh.CustomerID = c.CustomerID
LEFT JOIN Sales.SalesTerritory st ON soh.TerritoryID = st.TerritoryID
WHERE soh.ModifiedDate > :sql_last_value
ORDER BY soh.ModifiedDate ASC
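-- Caution: with a strict ">" on ModifiedDate, rows that share the last
-- ModifiedDate but fall beyond this LIMIT are skipped on the next run
LIMIT 1000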
```
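The row-skipping risk of `LIMIT` combined with a strict `>` comparison is easiest to see with a tiny example. The sketch below uses made-up rows and a hypothetical `fetch_page` helper to simulate runs with a limit of 2: three rows share the same `modifieddate`, the first run stops after two of them, and the next run's `> :sql_last_value` filter never returns the third. Raising the limit, tracking a strictly increasing column such as an auto-increment ID (which only catches inserts, not updates), or comparing with `>=` and deduplicating downstream are common mitigations.

```ruby
# Made-up rows: three orders modified at the same instant.
ROWS = [
  { 'salesorderid' => 1, 'modifieddate' => 100 },
  { 'salesorderid' => 2, 'modifieddate' => 100 },
  { 'salesorderid' => 3, 'modifieddate' => 100 },
  { 'salesorderid' => 4, 'modifieddate' => 200 }
].freeze

# Simulates: WHERE modifieddate > :last ORDER BY modifieddate LIMIT n
# (salesorderid breaks ties only to keep this sketch deterministic)
def fetch_page(rows, last, limit)
  rows.select { |r| r['modifieddate'] > last }
      .sort_by { |r| [r['modifieddate'], r['salesorderid']] }
      .first(limit)
end

seen = []
last = 0
3.times do
  page = fetch_page(ROWS, last, 2)
  break if page.empty?
  seen.concat(page.map { |r| r['salesorderid'] })
  last = page.last['modifieddate']  # value persisted for the next run
end

missing = ROWS.map { |r| r['salesorderid'] } - seen
puts missing.inspect  # [3]
```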

## Filter Transformations

### Data Type Conversions

```ruby
filter {
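  # Convert string dates to proper timestamp format. The jdbc input usually emits
  # DATETIME columns as timestamps already; the ISO8601 pattern covers that case.
  date {
    match => [ "orderdate", "yyyy-MM-dd HH:mm:ss", "ISO8601" ]
    target => "orderdate_parsed"
  }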
  
  # Convert numeric strings to integers
  mutate {
    convert => { 
      "salesorderid" => "integer"
      "customerid" => "integer"
      "territoryid" => "integer"
    }
  }
  
  # Convert decimal strings to floats
  mutate {
    convert => {
      "subtotal" => "float"
      "taxamt" => "float"
      "totaldue" => "float"
    }
  }
}
```
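The float conversion above is convenient, but binary floating point cannot represent most decimal fractions exactly, which matters for currency columns such as `subtotal` and `totaldue`. The plain-Ruby comparison below shows the effect, with Ruby's built-in `Rational` standing in for exact decimal arithmetic. If exact amounts matter, keeping these values as decimals or strings and letting ClickHouse parse them into `Decimal` columns is one option to consider.

```ruby
# Binary floating point cannot represent 0.1 or 0.2 exactly.
float_sum = 0.1 + 0.2
puts float_sum          # 0.30000000000000004
puts float_sum == 0.3   # false

# Exact arithmetic (Ruby's built-in Rational) keeps the amounts exact.
exact_sum = Rational('0.1') + Rational('0.2')
puts exact_sum == Rational('0.3')  # true
```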

### Data Enrichment

```ruby
filter {
  # Add metadata fields
  mutate {
    add_field => { 
      "[@metadata][ingested_at]" => "%{[@timestamp]}"
      "[@metadata][pipeline_id]" => "sales-orders-v1"
      "[@metadata][source_system]" => "adventureworks-mysql"
    }
  }
  
  # Calculate derived fields
  ruby {
    code => "
      subtotal = event.get('subtotal').to_f
      taxamt = event.get('taxamt').to_f
      freight = event.get('freight').to_f
      
      event.set('calculated_total', subtotal + taxamt + freight)
      # Guard against division by zero, which would produce Infinity or NaN
      event.set('tax_rate', subtotal > 0 ? taxamt / subtotal * 100 : nil)
    "
  }
  
  # Status code mapping
  translate {
    source => "status"
    target => "status_description"
    dictionary => {
      "1" => "In Process"
      "2" => "Approved"
      "3" => "Backordered"
      "4" => "Rejected"
      "5" => "Shipped"
      "6" => "Cancelled"
    }
    fallback => "Unknown"
  }
}
```
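Logic inside a `ruby` filter is easier to check when it is first written as a plain function. The sketch below restates the derived-field and status-mapping logic above in standalone Ruby, with a guard so that a zero `subtotal` yields `nil` instead of `Infinity` or `NaN`. `derive_fields` and `STATUS_DESCRIPTIONS` are hypothetical names; inside Logstash the same logic reads and writes fields through `event.get` and `event.set`.

```ruby
# Same mapping as the translate filter's dictionary above.
STATUS_DESCRIPTIONS = {
  '1' => 'In Process', '2' => 'Approved', '3' => 'Backordered',
  '4' => 'Rejected', '5' => 'Shipped', '6' => 'Cancelled'
}.freeze

# Hypothetical helper mirroring the ruby and translate filters.
def derive_fields(record)
  subtotal = record['subtotal'].to_f  # nil.to_f is 0.0, like in the filter
  taxamt   = record['taxamt'].to_f
  freight  = record['freight'].to_f
  {
    'calculated_total'   => subtotal + taxamt + freight,
    # Avoid dividing by zero
    'tax_rate'           => subtotal > 0 ? taxamt / subtotal * 100 : nil,
    # Unknown codes fall back to "Unknown", like the fallback option
    'status_description' => STATUS_DESCRIPTIONS.fetch(record['status'].to_s, 'Unknown')
  }
end

p derive_fields({ 'subtotal' => 200, 'taxamt' => 50, 'freight' => 10, 'status' => 5 })
```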

### Data Validation and Quality

```ruby
filter {
  # Validate required fields
  if ![salesorderid] or [salesorderid] == "" {
    mutate { add_tag => [ "missing_sales_order_id" ] }
  }
  
  if ![customerid] or [customerid] == "" {
    mutate { add_tag => [ "missing_customer_id" ] }
  }
  
  # Validate data ranges
  if [totaldue] and [totaldue] < 0 {
    mutate { add_tag => [ "negative_total_due" ] }
  }
  
  if [orderdate] and [orderdate] > [@timestamp] {
    mutate { add_tag => [ "future_order_date" ] }
  }
  
  # Data completeness scoring
  ruby {
    code => "
      required_fields = ['salesorderid', 'customerid', 'orderdate', 'totaldue']
      present_fields = required_fields.select { |field| v = event.get(field); !v.nil? && !v.to_s.strip.empty? }
      completeness_score = (present_fields.length.to_f / required_fields.length * 100).round(2)
      event.set('data_completeness_score', completeness_score)
    "
  }
  
  # Mark records as valid/invalid
  if "missing_sales_order_id" not in [tags] and "missing_customer_id" not in [tags] {
    mutate { add_tag => [ "valid_record" ] }
  } else {
    mutate { add_tag => [ "invalid_record" ] }
  }
}
```
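The completeness score is the share of required fields that are present, as a percentage: with two of four fields filled, the score is 50.0. The standalone Ruby sketch below computes it the same way, treating `nil` and blank strings as missing so the score agrees with the empty-string checks above; `completeness_score` is a hypothetical name.

```ruby
REQUIRED_FIELDS = %w[salesorderid customerid orderdate totaldue].freeze

# Hypothetical helper: percentage of required fields that are non-blank.
def completeness_score(record, required = REQUIRED_FIELDS)
  present = required.count do |field|
    value = record[field]
    !value.nil? && !value.to_s.strip.empty?
  end
  (present.to_f / required.length * 100).round(2)
end

puts completeness_score({ 'salesorderid' => 43659, 'customerid' => '',
                          'orderdate' => '2011-05-31', 'totaldue' => nil })  # 50.0
```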

## Output Configurations

### ClickHouse HTTP Output

```ruby
output {
  # Valid records to ClickHouse
  if "valid_record" in [tags] {
    http {
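      # INSERT statement in the URL-encoded query parameter; the event is sent as one
      # JSON object (a single JSONEachRow row). ClickHouse skips fields without a
      # matching column (@timestamp, @version, tags) and parses ISO 8601 timestamps.
      url => "http://clickhouse:8123/?query=INSERT%20INTO%20bronze_layer.sales_orders%20FORMAT%20JSONEachRow&input_format_skip_unknown_fields=1&date_time_input_format=best_effort"
      http_method => "post"
      format => "json"
      headers => {
        "Authorization" => "Basic YWRtaW46Y2xpY2tob3VzZTEyMw=="
      }
      
      # Error handling: retry_failed keeps retrying retryable responses (such as 429
      # and 503) with backoff; automatic_retries covers connection-level failures
      retry_failed => true
      automatic_retries => 3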
      
      # Connection pooling
      pool_max => 50
      pool_max_per_route => 25
      
      # Timeout settings (seconds)
      connect_timeout => 10
      socket_timeout => 30
      request_timeout => 60
    }
  }
  
  # Invalid records to error log
  if "invalid_record" in [tags] {
    file {
      path => "/usr/share/logstash/logs/invalid_records_%{[@metadata][target_table]}_%{+YYYY.MM.dd}.log"
      codec => json_lines
    }
  }
  
  # Debug output (remove in production)
  stdout { 
    codec => rubydebug {
      metadata => true
    }
  }
}
```

### Multiple Output Destinations

```ruby
output {
  # Primary destination: ClickHouse
  http {
    url => "http://clickhouse:8123/"
    # ... ClickHouse configuration
  }
  
  # Backup destination: File
  file {
    path => "/usr/share/logstash/backups/sales_orders_%{+YYYY.MM.dd}.json"
    codec => json_lines
  }
  
  # Monitoring: Elasticsearch (optional)
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-metrics-%{+YYYY.MM.dd}"
    template_name => "logstash-metrics"
  }
  
  # Alerting: Send errors to monitoring system
  if "error" in [tags] {
    http {
      url => "http://monitoring-system:8080/alerts"
      http_method => "post"
      format => "json"
    }
  }
}
```

## Performance Optimization

### JDBC Connection Tuning

```ruby
input {
  jdbc {
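    # MySQL Connector/J driver properties. useCursorFetch=true makes the driver honor
    # jdbc_fetch_size; otherwise it buffers the entire result set in memory.
    jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks?useSSL=false&allowPublicKeyRetrieval=true&useCursorFetch=true&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=250&prepStmtCacheSqlLimit=2048"
    
    # Fetch optimization
    jdbc_fetch_size => 2000        # Rows per round trip from the database
    jdbc_paging_enabled => true    # Required for jdbc_page_size to take effect
    jdbc_page_size => 20000        # Rows per paged query
    
    # Prepared statements are opt-in: use_prepared_statements => true also requires
    # prepared_statement_name and prepared_statement_bind_values
    
    # Connection validation
    jdbc_validate_connection => true
    jdbc_validation_timeout => 30  # Revalidate pooled connections idle longer than this (seconds)
    
    # Timezone the database uses for DATETIME values (converted to UTC)
    jdbc_default_timezone => "UTC"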
  }
}
```
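With `jdbc_paging_enabled`, one scheduled query is split into pages of `jdbc_page_size` rows using `LIMIT`/`OFFSET`. The sketch below, with a hypothetical `page_queries` helper and a made-up row count, shows how many queries that produces for 45,000 matching rows; the exact SQL the plugin generates may differ from the string built here.

```ruby
# Hypothetical helper: LIMIT/OFFSET pages covering total_rows rows.
def page_queries(statement, total_rows, page_size)
  (0...total_rows).step(page_size).map do |offset|
    "SELECT * FROM (#{statement}) AS t1 LIMIT #{page_size} OFFSET #{offset}"
  end
end

queries = page_queries('SELECT * FROM Sales.SalesOrderHeader', 45_000, 20_000)
puts queries.length  # 3
puts queries.last    # SELECT * FROM (SELECT * FROM Sales.SalesOrderHeader) AS t1 LIMIT 20000 OFFSET 40000
```

Larger pages mean fewer round trips but more rows held per query, which is the trade-off `jdbc_page_size` controls.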

### Pipeline Worker Configuration

**File: `config/pipelines.yml`**

```yaml
- pipeline.id: sales-orders
  path.config: "/usr/share/logstash/pipeline/sales-orders.conf"
  pipeline.workers: 4              # Number of worker threads
  pipeline.batch.size: 1000        # Max events each worker collects per batch
  pipeline.batch.delay: 50         # Batch timeout in milliseconds
  
- pipeline.id: customers
  path.config: "/usr/share/logstash/pipeline/customers.conf"
  pipeline.workers: 2
  pipeline.batch.size: 500
  pipeline.batch.delay: 100
```
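Because each worker holds up to `pipeline.batch.size` events, a pipeline can have roughly `pipeline.workers × pipeline.batch.size` events in flight at once. The Ruby sketch below computes that figure for the two pipelines above; the 2 KiB average event size used for the memory estimate is an assumption for illustration, not a measured value.

```ruby
# Settings copied from the pipelines.yml example above.
PIPELINES = {
  'sales-orders' => { workers: 4, batch_size: 1000 },
  'customers'    => { workers: 2, batch_size: 500 }
}.freeze

# Assumption for illustration only: average in-memory size of one event.
ASSUMED_EVENT_BYTES = 2 * 1024

# Upper bound on events held by worker batches at the same time.
def in_flight_events(workers, batch_size)
  workers * batch_size
end

PIPELINES.each do |id, cfg|
  events = in_flight_events(cfg[:workers], cfg[:batch_size])
  mib = (events * ASSUMED_EVENT_BYTES / (1024.0 * 1024)).round(1)
  puts "#{id}: up to #{events} events in flight (~#{mib} MiB at the assumed size)"
end
```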

### JVM Tuning

**Environment Variables:**

```bash
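# Memory allocation (Elastic recommends equal minimum and maximum heap sizes)
export LS_JAVA_OPTS="-Xms4g -Xmx4g"

# Garbage collection
export LS_JAVA_OPTS="$LS_JAVA_OPTS -XX:+UseG1GC -XX:MaxGCPauseMillis=200"

# GC logging via JDK unified logging (the legacy -XX:+PrintGC* flags are not
# supported by the JDK 17 bundled with Logstash 8)
export LS_JAVA_OPTS="$LS_JAVA_OPTS -Xlog:gc*:file=/usr/share/logstash/logs/gc.log:time,uptime"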
```

## Monitoring and Debugging

### Pipeline Monitoring

```bash
# Check pipeline stats
curl -s http://localhost:9600/_node/stats/pipelines | jq

# Monitor specific pipeline
curl -s http://localhost:9600/_node/stats/pipelines/sales-orders | jq

# Check node information
curl -s http://localhost:9600/_node | jq
```
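The stats endpoint reports per-pipeline event counters under `pipelines.<id>.events`, including `in`, `filtered`, and `out`. The Ruby sketch below parses a trimmed, made-up response of that shape and reports how many events were received but not yet emitted; in practice the JSON would come from the `curl` calls above. A gap that keeps growing suggests the outputs are falling behind, although filters that drop or clone events also change it.

```ruby
require 'json'

# Trimmed, made-up sample of GET /_node/stats/pipelines
SAMPLE = <<~JSON
  {"pipelines": {
    "sales-orders": {"events": {"in": 12000, "filtered": 11950, "out": 11900}},
    "customers":    {"events": {"in": 800, "filtered": 800, "out": 800}}
  }}
JSON

# Events received by inputs but not yet sent by outputs, per pipeline.
def backlog_by_pipeline(stats_json)
  JSON.parse(stats_json).fetch('pipelines').to_h { |id, stats|
    events = stats.fetch('events', {})
    [id, events.fetch('in', 0) - events.fetch('out', 0)]
  }
end

backlog_by_pipeline(SAMPLE).each { |id, n| puts "#{id}: #{n} pending" }
```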

### Log Analysis

```ruby
# Add debugging to configuration (applies to events whose debug field is "true")
filter {
  if [debug] == "true" {
    mutate {
      add_field => { 
        "[@metadata][debug_info]" => "Processing record %{salesorderid} at %{[@timestamp]}"
      }
    }
  }
}

output {
  if [@metadata][debug_info] {
    stdout { 
      codec => line { 
        format => "%{[@metadata][debug_info]}"
      }
    }
  }
}
```

### Error Handling Patterns

```ruby
filter {
  # Capture errors in ruby filter
  ruby {
    code => "
      begin
        # Your transformation logic here
        total = event.get('subtotal').to_f + event.get('taxamt').to_f
        event.set('calculated_total', total)
      rescue => e
        event.set('ruby_error', e.message)
        event.tag('_rubyexception')
      end
    "
  }
}

output {
  # Handle Ruby exceptions
  if "_rubyexception" in [tags] {
    file {
      path => "/usr/share/logstash/logs/ruby_errors.log"
      codec => json_lines
    }
  }
  
  # HTTP output failures cannot be routed from here: outputs do not tag events for
  # other outputs, and the _httprequestfailure tag is set only by the http *filter*.
  # Use retry_failed on the http output and persistent queues to avoid data loss.
}
```
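The `begin`/`rescue` pattern above can be exercised outside Logstash by running the same code against a minimal stand-in for the event. `FakeEvent` below is hypothetical and implements only the `get`, `set`, and `tag` calls the snippet uses; it is not Logstash's `Event` class. A `subtotal` that is a hash has no `to_f` method, so that case takes the rescue branch.

```ruby
# Hypothetical stand-in for a Logstash event: just enough for the snippet.
class FakeEvent
  def initialize(fields)
    @fields = fields.dup
  end

  def get(name)
    @fields[name]
  end

  def set(name, value)
    @fields[name] = value
  end

  def tag(value)
    tags = (@fields['tags'] ||= [])
    tags << value unless tags.include?(value)
  end
end

# Same logic as the ruby filter's code string above.
def apply_filter(event)
  begin
    total = event.get('subtotal').to_f + event.get('taxamt').to_f
    event.set('calculated_total', total)
  rescue => e
    event.set('ruby_error', e.message)
    event.tag('_rubyexception')
  end
  event
end

ok  = apply_filter(FakeEvent.new({ 'subtotal' => '100.5', 'taxamt' => 8 }))
bad = apply_filter(FakeEvent.new({ 'subtotal' => { 'nested' => 1 }, 'taxamt' => 8 }))
p ok.get('calculated_total')  # 108.5
p bad.get('tags')             # ["_rubyexception"]
```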

## Security Configuration

### Credential Management

```ruby
input {
  jdbc {
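    # Use environment variables (or Logstash keystore entries) for sensitive data;
    # ${VAR:default} falls back to the default value when VAR is not set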
    jdbc_user => "${MYSQL_USER:root}"
    jdbc_password => "${MYSQL_PASSWORD:password}"
    jdbc_connection_string => "jdbc:mysql://${MYSQL_HOST:mysql}:${MYSQL_PORT:3306}/${MYSQL_DATABASE:adventureworks}"
  }
}

output {
  http {
    url => "http://${CLICKHOUSE_HOST:clickhouse}:${CLICKHOUSE_PORT:8123}/"
    http_method => "post"
    headers => {
      "Authorization" => "Basic ${CLICKHOUSE_AUTH_ENCODED}"
    }
  }
}
```
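`CLICKHOUSE_AUTH_ENCODED` holds the Base64 encoding of `user:password`, which is all HTTP Basic authentication carries. Base64 is an encoding, not encryption: the header value used in the examples on this page decodes to `admin:clickhouse123`. A minimal Ruby sketch for producing the value, assuming the credentials are supplied in `CLICKHOUSE_USER` and `CLICKHOUSE_PASSWORD` environment variables (hypothetical names):

```ruby
# Basic auth value: Base64 of "user:password". pack('m0') is core Ruby's
# strict Base64 encoding (no line breaks).
def basic_auth_value(user, password)
  ["#{user}:#{password}"].pack('m0')
end

# Hypothetical variable names; prints nothing if either is unset.
if ENV['CLICKHOUSE_USER'] && ENV['CLICKHOUSE_PASSWORD']
  puts basic_auth_value(ENV['CLICKHOUSE_USER'], ENV['CLICKHOUSE_PASSWORD'])
end
```

Saved as, say, `auth.rb`, it could feed the variable with `export CLICKHOUSE_AUTH_ENCODED=$(ruby auth.rb)`.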

### SSL/TLS Configuration

```ruby
input {
  jdbc {
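    # Connector/J 8.0.13+: sslMode=VERIFY_CA replaces the deprecated useSSL/requireSSL/
    # verifyServerCertificate combination. The server's CA must be trusted by the JVM,
    # for example via the trustCertificateKeyStoreUrl driver property.
    jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks?sslMode=VERIFY_CA"
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"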
  }
}

output {
  http {
    url => "https://clickhouse:8443/"
    ssl_verification_mode => "full"
    ssl_certificate_authorities => ["/usr/share/logstash/certs/ca.pem"]
    ssl_certificate => "/usr/share/logstash/certs/client.pem"
    ssl_key => "/usr/share/logstash/certs/client.key"
  }
}
```

## Common Configuration Patterns

### Conditional Processing

```ruby
filter {
  # Process based on table type
  if [type] == "sales_order" {
    # Sales-specific transformations
    mutate {
      add_field => { "[@metadata][target_table]" => "sales_orders" }
    }
  } else if [type] == "customer" {
    # Customer-specific transformations
    mutate {
      add_field => { "[@metadata][target_table]" => "customers" }
    }
  }
  
  # Process based on data values
  if [status] in ["1", "2", "5"] {
    mutate { add_tag => [ "active_order" ] }
  } else {
    mutate { add_tag => [ "inactive_order" ] }
  }
}
```
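The branching above boils down to two lookups: table type to target table, and status code to an active or inactive tag. The Ruby sketch below restates it as plain functions (hypothetical names) so the mapping can be checked in isolation. The conditional compares `status` against strings, so it may not match if the JDBC driver returns `status` as an integer; the sketch normalizes with `to_s` to sidestep that.

```ruby
TARGET_TABLES   = { 'sales_order' => 'sales_orders', 'customer' => 'customers' }.freeze
ACTIVE_STATUSES = %w[1 2 5].freeze

# Hypothetical helper: target table for a type, or nil if unknown.
def target_table(type)
  TARGET_TABLES[type]
end

# Hypothetical helper: mirrors the active/inactive tagging.
def order_tag(status)
  ACTIVE_STATUSES.include?(status.to_s) ? 'active_order' : 'inactive_order'
end

puts target_table('customer')  # customers
puts order_tag(5)              # active_order
puts order_tag('4')            # inactive_order
```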

### Template-based Configuration

```ruby
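# Route events to a table (and column list) chosen at runtime from metadata
output {
  http {
    # [@metadata][target_database], [target_table], and [column_list] must be set by
    # earlier filters; column_list is a comma-separated list without spaces because
    # it is placed directly in the URL
    url => "http://clickhouse:8123/?query=INSERT%20INTO%20%{[@metadata][target_database]}.%{[@metadata][target_table]}%20(%{[@metadata][column_list]})%20FORMAT%20JSONEachRow"
    http_method => "post"
    format => "json"
  }
}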
```

## Testing LogStash Configurations

### Configuration Validation

```bash
# Test configuration syntax
docker run --rm \
  -v "$(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf" \
  docker.elastic.co/logstash/logstash:8.11.0 \
  logstash -f /usr/share/logstash/pipeline/logstash.conf --config.test_and_exit

# Expected output includes: "Configuration OK"
```

### Pipeline Testing

```bash
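# Run the pipeline in the foreground. Without a `schedule` in the jdbc input,
# the query runs once and Logstash exits after the input finishes.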
docker run --rm \
  -v $(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
  -v $(pwd)/logstash-drivers:/usr/share/logstash/drivers \
  --network datasuite-network \
  docker.elastic.co/logstash/logstash:8.11.0 \
  logstash --path.data /tmp/logstash-data
```

## Best Practices

### Configuration Organization

```
logstash-config/
├── pipelines.yml                 # Pipeline definitions
├── logstash.yml                  # Main LogStash settings
├── pipelines/
│   ├── sales-orders.conf        # Individual pipeline configs
│   ├── customers.conf
│   └── products.conf
├── sql/
│   ├── sales_orders.sql         # SQL queries
│   ├── customers.sql
│   └── products.sql
└── patterns/
    └── custom-patterns          # Custom grok patterns
```

### Error Recovery Strategies

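1. **Persistent Queues**: Set `queue.type: persisted` so in-flight events are buffered on disk and survive restarts (at-least-once delivery)
2. **Dead Letter Queues**: Capture failed events for analysis; only some plugins write to the DLQ (notably the Elasticsearch output), so failed ClickHouse HTTP requests are not captured there
3. **Circuit Breakers**: Prevent cascade failures
4. **Retry Logic**: Automatic retry with exponential backoff (for the http output, enable `retry_failed`)
5. **Monitoring**: Comprehensive metrics and alerting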
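Retry with exponential backoff doubles the wait after each failed attempt up to a ceiling, so a struggling destination is not hammered. The Ruby sketch below shows the general pattern, not the http output plugin's internal timing; the 1-second base, 60-second cap, and the helper names are illustrative assumptions.

```ruby
# Delay (seconds) before each retry: base * 2**attempt, capped at max_delay.
def backoff_delays(attempts, base: 1, max_delay: 60)
  Array.new(attempts) { |attempt| [base * 2**attempt, max_delay].min }
end

# Runs the block, retrying on StandardError with backoff; re-raises once the
# retries are used up. `sleeper` is injectable so tests need not sleep.
def with_retries(attempts: 5, base: 1, max_delay: 60, sleeper: ->(s) { sleep(s) })
  delays = backoff_delays(attempts, base: base, max_delay: max_delay)
  begin
    yield
  rescue StandardError
    delay = delays.shift
    raise if delay.nil?
    sleeper.call(delay)
    retry
  end
end

p backoff_delays(8)  # [1, 2, 4, 8, 16, 32, 60, 60]
```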

### Performance Guidelines

1. **Batch Size**: Balance between latency and throughput (1000-5000 events)
2. **Workers**: Match CPU cores (typically 2-4 workers per pipeline)
3. **Memory**: Allocate sufficient heap space (50% of container memory)
4. **Network**: Use persistent connections and connection pooling
5. **Disk I/O**: Use SSD storage for persistent queues
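The memory guideline translates into JVM flags by taking half the container's memory limit and using it for both `-Xms` and `-Xmx`. A small Ruby sketch, assuming the limit is known in MiB (the 8192 MiB figure is only an example):

```ruby
# Heap = a fraction (default 50%) of container memory, with -Xms equal to -Xmx.
def ls_java_opts(container_mib, heap_fraction: 0.5)
  heap = (container_mib * heap_fraction).floor
  "-Xms#{heap}m -Xmx#{heap}m"
end

puts ls_java_opts(8192)  # -Xms4096m -Xmx4096m
```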

## Next Steps

With LogStash configured for data ingestion:

1. [**DBT Getting Started**](/data-platform/index/development/dbt-getting-started.md) - Transform the ingested data
2. [**Testing & Validation**](/data-platform/index/development/testing-validation.md) - Ensure data quality
3. [**Troubleshooting**](/data-platform/index/troubleshooting/common-issues.md) - Resolve pipeline issues

Your LogStash configuration forms the foundation of the data pipeline, ensuring reliable and efficient data ingestion from source systems to the data warehouse.

