LogStash Configuration
LogStash serves as the data ingestion engine in the DataSuite ETL pipeline, extracting data from MySQL and loading it into ClickHouse. This guide covers configuration, customization, and optimization of LogStash pipelines.
Understanding LogStash Architecture
Pipeline Components
Input  →  Filter  →  Output
  ↓         ↓          ↓
MySQL   Transform   ClickHouse
Input Plugins: Extract data from various sources (JDBC, files, APIs)
Filter Plugins: Transform, enrich, and validate data
Output Plugins: Load data to destinations (HTTP, databases, files)
Configuration Structure
LogStash configurations use a declarative syntax:
input {
  # Data source configuration
}

filter {
  # Data transformation logic
}

output {
  # Destination configuration
}
Basic JDBC Input Configuration
MySQL Connection Setup
input {
  jdbc {
    # JDBC driver configuration
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks"
    jdbc_user => "root"
    jdbc_password => "password"

    # Query configuration. Note: statement and statement_filepath cannot both be set;
    # use statement_filepath instead if you prefer to load the query from a file.
    statement => "SELECT * FROM Sales.SalesOrderHeader WHERE ModifiedDate > :sql_last_value ORDER BY ModifiedDate"
    # statement_filepath => "/usr/share/logstash/sql/sales_orders.sql"

    # Incremental loading setup
    use_column_value => true
    tracking_column => "ModifiedDate" # note: with the default lowercase_column_names => true, reference the column as "modifieddate"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
    clean_run => false

    # Scheduling
    schedule => "*/30 * * * * *" # Every 30 seconds

    # Performance tuning
    jdbc_fetch_size => 1000
    jdbc_page_size => 10000 # only takes effect together with jdbc_paging_enabled => true
  }
}
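The jdbc_driver_library path above has to point at a real MySQL Connector/J jar inside the container. A minimal sketch of staging the driver on the host and mounting it into LogStash, assuming a ./logstash-drivers directory and Connector/J 8.0.33 from Maven Central (the exact version is only an example):

# Download the MySQL JDBC driver (any recent Connector/J release works the same way)
mkdir -p ./logstash-drivers
curl -fL -o ./logstash-drivers/mysql-connector-java.jar \
  https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar

# Mount the driver where jdbc_driver_library expects to find it
docker run --rm \
  -v $(pwd)/logstash-drivers:/usr/share/logstash/drivers \
  -v $(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
  docker.elastic.co/logstash/logstash:8.11.0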
Key Configuration Parameters
jdbc_connection_string: the database URL, e.g. jdbc:mysql://mysql:3306/adventureworks
statement: the SQL query to execute, e.g. SELECT * FROM table WHERE id > :sql_last_value
tracking_column: column used for incremental sync, e.g. ModifiedDate or id
schedule: execution frequency in cron format (with a seconds field), e.g. */30 * * * * *
jdbc_fetch_size: rows per database fetch, e.g. 1000
last_run_metadata_path: state persistence file, e.g. /path/to/.logstash_jdbc_last_run
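The value substituted for :sql_last_value is carried between runs in the file named by last_run_metadata_path, stored as a single YAML-serialized value. A quick way to inspect or reset the sync state, assuming the paths used above (the timestamp shown is illustrative):

# Inspect the stored high-water mark (a timestamp here, because tracking_column_type is "timestamp");
# typical contents look like:  --- 2024-01-15 10:30:00.000000000 Z
cat /usr/share/logstash/.logstash_jdbc_last_run

# Removing the file (or setting clean_run => true) forces the next run to reload everything
rm /usr/share/logstash/.logstash_jdbc_last_run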
Advanced Input Configurations
Multiple Table Ingestion
# Configuration for multiple pipelines
# File: pipelines.yml
- pipeline.id: sales-orders
  path.config: "/usr/share/logstash/pipeline/sales-orders.conf"
- pipeline.id: customers
  path.config: "/usr/share/logstash/pipeline/customers.conf"
- pipeline.id: products
  path.config: "/usr/share/logstash/pipeline/products.conf"
Sales Orders Pipeline (sales-orders.conf):
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks"
    jdbc_user => "root"
    jdbc_password => "password"
    statement_filepath => "/usr/share/logstash/sql/sales_orders.sql"
    use_column_value => true
    tracking_column => "ModifiedDate"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.sales_orders_last_run"
    schedule => "*/30 * * * * *"
    type => "sales_order"
  }
}

filter {
  if [type] == "sales_order" {
    mutate {
      add_field => {
        "[@metadata][target_table]" => "sales_orders"
        "[@metadata][target_database]" => "bronze_layer"
      }
    }
  }
}

output {
  if [type] == "sales_order" {
    http {
      url => "http://clickhouse:8123/"
      http_method => "post"
      format => "message"
      message => "INSERT INTO %{[@metadata][target_database]}.%{[@metadata][target_table]} FORMAT JSONEachRow %{message}"
      headers => {
        "Authorization" => "Basic YWRtaW46Y2xpY2tob3VzZTEyMw=="
        "Content-Type" => "application/json"
      }
    }
  }
}
Complex SQL Queries
File: sql/sales_orders.sql
SELECT
soh.SalesOrderID,
soh.CustomerID,
soh.TerritoryID,
soh.OrderDate,
soh.DueDate,
soh.ShipDate,
soh.Status,
soh.SubTotal,
soh.TaxAmt,
soh.Freight,
soh.TotalDue,
soh.ModifiedDate,
-- Customer information (join)
c.PersonID as CustomerPersonID,
c.StoreID as CustomerStoreID,
-- Territory information (join)
st.Name as TerritoryName,
st.CountryRegionCode,
st.`Group` as TerritoryGroup -- `Group` is a reserved word in MySQL and must be quoted
FROM Sales.SalesOrderHeader soh
LEFT JOIN Sales.Customer c ON soh.CustomerID = c.CustomerID
LEFT JOIN Sales.SalesTerritory st ON soh.TerritoryID = st.TerritoryID
WHERE soh.ModifiedDate > :sql_last_value
ORDER BY soh.ModifiedDate ASC
LIMIT 1000
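Because :sql_last_value is substituted by LogStash at runtime, the query cannot be pasted into MySQL as-is. One way to smoke-test it is to replace the placeholder with an old literal timestamp and trim the column list; the hostname, credentials, and table names below simply follow the JDBC settings and query used above:

# Run a reduced version of the incremental query with a literal in place of :sql_last_value
mysql -h mysql -u root -p adventureworks -e "
  SELECT soh.SalesOrderID, soh.ModifiedDate, st.Name AS TerritoryName
  FROM Sales.SalesOrderHeader soh
  LEFT JOIN Sales.SalesTerritory st ON soh.TerritoryID = st.TerritoryID
  WHERE soh.ModifiedDate > '1970-01-01'
  ORDER BY soh.ModifiedDate ASC
  LIMIT 5;"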
Filter Transformations
Data Type Conversions
filter {
  # Convert string dates to proper timestamp format
  date {
    match => [ "orderdate", "yyyy-MM-dd HH:mm:ss" ]
    target => "orderdate_parsed"
  }

  # Convert numeric strings to integers
  mutate {
    convert => {
      "salesorderid" => "integer"
      "customerid" => "integer"
      "territoryid" => "integer"
    }
  }

  # Convert decimal strings to floats
  mutate {
    convert => {
      "subtotal" => "float"
      "taxamt" => "float"
      "totaldue" => "float"
    }
  }
}
Data Enrichment
filter {
  # Add metadata fields
  mutate {
    add_field => {
      "[@metadata][ingested_at]" => "%{[@timestamp]}"
      "[@metadata][pipeline_id]" => "sales-orders-v1"
      "[@metadata][source_system]" => "adventureworks-mysql"
    }
  }

  # Calculate derived fields
  ruby {
    code => "
      subtotal = event.get('subtotal').to_f
      taxamt = event.get('taxamt').to_f
      freight = event.get('freight').to_f
      event.set('calculated_total', subtotal + taxamt + freight)
      event.set('tax_rate', subtotal.zero? ? 0.0 : (taxamt / subtotal * 100))
    "
  }

  # Status code mapping (source/target replace the older field/destination option names)
  translate {
    source => "status"
    target => "status_description"
    dictionary => {
      "1" => "In Process"
      "2" => "Approved"
      "3" => "Backordered"
      "4" => "Rejected"
      "5" => "Shipped"
      "6" => "Cancelled"
    }
    fallback => "Unknown"
  }
}
Data Validation and Quality
filter {
  # Validate required fields
  if ![salesorderid] or [salesorderid] == "" {
    mutate { add_tag => [ "missing_sales_order_id" ] }
  }
  if ![customerid] or [customerid] == "" {
    mutate { add_tag => [ "missing_customer_id" ] }
  }

  # Validate data ranges
  if [totaldue] and [totaldue] < 0 {
    mutate { add_tag => [ "negative_total_due" ] }
  }
  if [orderdate] and [orderdate] > [@timestamp] {
    mutate { add_tag => [ "future_order_date" ] }
  }

  # Data completeness scoring
  ruby {
    code => "
      required_fields = ['salesorderid', 'customerid', 'orderdate', 'totaldue']
      present_fields = required_fields.select { |field| event.get(field) }
      completeness_score = (present_fields.length.to_f / required_fields.length * 100).round(2)
      event.set('data_completeness_score', completeness_score)
    "
  }

  # Mark records as valid/invalid
  if "missing_sales_order_id" not in [tags] and "missing_customer_id" not in [tags] {
    mutate { add_tag => [ "valid_record" ] }
  } else {
    mutate { add_tag => [ "invalid_record" ] }
  }
}
Output Configurations
ClickHouse HTTP Output
output {
  # Valid records to ClickHouse
  if "valid_record" in [tags] {
    http {
      url => "http://clickhouse:8123/"
      http_method => "post"
      format => "message"
      message => "INSERT INTO bronze_layer.sales_orders FORMAT JSONEachRow %{message}"
      headers => {
        "Authorization" => "Basic YWRtaW46Y2xpY2tob3VzZTEyMw=="
        "Content-Type" => "application/json"
      }

      # Error handling (retry_failed re-sends retryable responses;
      # automatic_retries controls client-level connection retries)
      retry_failed => true
      automatic_retries => 3

      # Connection pooling
      pool_max => 50
      pool_max_per_route => 25

      # Timeout settings
      connect_timeout => 10
      socket_timeout => 30
      request_timeout => 60
    }
  }

  # Invalid records to error log
  if "invalid_record" in [tags] {
    file {
      path => "/usr/share/logstash/logs/invalid_records_%{[@metadata][target_table]}_%{+YYYY.MM.dd}.log"
      codec => json_lines
    }
  }

  # Debug output (remove in production)
  stdout {
    codec => rubydebug {
      metadata => true
    }
  }
}
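Once the pipeline has run, the load can be verified over the same ClickHouse HTTP interface the output uses. A minimal check, reusing the Basic auth header from the output above (database and table names as configured there):

# Count the rows that have landed in the bronze layer
curl -s "http://clickhouse:8123/" \
  -H "Authorization: Basic YWRtaW46Y2xpY2tob3VzZTEyMw==" \
  --data-binary "SELECT count() FROM bronze_layer.sales_orders"

# Spot-check a few ingested rows
curl -s "http://clickhouse:8123/" \
  -H "Authorization: Basic YWRtaW46Y2xpY2tob3VzZTEyMw==" \
  --data-binary "SELECT * FROM bronze_layer.sales_orders LIMIT 5 FORMAT Vertical"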
Multiple Output Destinations
output {
  # Primary destination: ClickHouse
  http {
    url => "http://clickhouse:8123/"
    # ... ClickHouse configuration
  }

  # Backup destination: File
  file {
    path => "/usr/share/logstash/backups/sales_orders_%{+YYYY.MM.dd}.json"
    codec => json_lines
  }

  # Monitoring: Elasticsearch (optional)
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-metrics-%{+YYYY.MM.dd}"
    template_name => "logstash-metrics"
  }

  # Alerting: Send errors to monitoring system
  if "error" in [tags] {
    http {
      url => "http://monitoring-system:8080/alerts"
      http_method => "post"
      format => "json"
    }
  }
}
Performance Optimization
JDBC Connection Tuning
input {
  jdbc {
    # MySQL driver options tuned for throughput (prepared statement caching, no SSL)
    jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks?useSSL=false&allowPublicKeyRetrieval=true&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=250&prepStmtCacheSqlLimit=2048"

    # Fetch optimization
    jdbc_fetch_size => 2000     # Rows per fetch from the database cursor
    jdbc_paging_enabled => true # Required for jdbc_page_size to take effect
    jdbc_page_size => 20000     # Rows per paged query execution

    # Server-side prepared statements (the option is use_prepared_statements,
    # which also requires prepared_statement_name and prepared_statement_bind_values)
    # use_prepared_statements => true

    # Connection validation
    jdbc_validation_timeout => 30

    # Timezone handling for timestamp columns
    jdbc_default_timezone => "UTC"
  }
}
Pipeline Worker Configuration
File: config/pipelines.yml
- pipeline.id: sales-orders
  path.config: "/usr/share/logstash/pipeline/sales-orders.conf"
  pipeline.workers: 4       # Number of worker threads
  pipeline.batch.size: 1000 # Events per batch
  pipeline.batch.delay: 50  # Batch timeout in milliseconds
- pipeline.id: customers
  path.config: "/usr/share/logstash/pipeline/customers.conf"
  pipeline.workers: 2
  pipeline.batch.size: 500
  pipeline.batch.delay: 100
JVM Tuning
Environment Variables:
# Memory allocation
export LS_JAVA_OPTS="-Xms2g -Xmx4g"
# Garbage collection
export LS_JAVA_OPTS="$LS_JAVA_OPTS -XX:+UseG1GC -XX:MaxGCPauseMillis=200"
# GC logging (the legacy PrintGCDetails/PrintGCTimeStamps flags were replaced
# by unified JVM logging in modern JDKs)
export LS_JAVA_OPTS="$LS_JAVA_OPTS -Xlog:gc*"
Monitoring and Debugging
Pipeline Monitoring
# Check pipeline stats
curl -s http://localhost:9600/_node/stats/pipelines | jq
# Monitor specific pipeline
curl -s http://localhost:9600/_node/stats/pipelines/sales-orders | jq
# Check node information
curl -s http://localhost:9600/_node | jq
Log Analysis
# Add debugging to configuration
filter {
if [debug] == "true" {
mutate {
add_field => {
"[@metadata][debug_info]" => "Processing record %{salesorderid} at %{[@timestamp]}"
}
}
}
}
output {
if [@metadata][debug_info] {
stdout {
codec => line {
format => "%{[@metadata][debug_info]}"
}
}
}
}
Error Handling Patterns
filter {
  # Capture errors in ruby filter
  ruby {
    code => "
      begin
        # Your transformation logic here
        total = event.get('subtotal').to_f + event.get('taxamt').to_f
        event.set('calculated_total', total)
      rescue => e
        event.set('ruby_error', e.message)
        event.tag('_rubyexception')
      end
    "
  }
}

output {
  # Handle Ruby exceptions
  if "_rubyexception" in [tags] {
    file {
      path => "/usr/share/logstash/logs/ruby_errors.log"
      codec => json_lines
    }
  }

  # Handle HTTP output errors
  if "_httprequestfailure" in [tags] {
    file {
      path => "/usr/share/logstash/logs/http_errors.log"
      codec => json_lines
    }
  }
}
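The JSON-lines error files written above are easy to summarize from the shell. A small sketch for spotting the most common failures (paths as configured in the output block; jq is assumed to be available):

# Most frequent Ruby filter errors, grouped by message
jq -r '.ruby_error' /usr/share/logstash/logs/ruby_errors.log | sort | uniq -c | sort -rn | head

# Number of events that failed HTTP delivery
wc -l < /usr/share/logstash/logs/http_errors.log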
Security Configuration
Credential Management
input {
  jdbc {
    # Use environment variables for sensitive data
    jdbc_user => "${MYSQL_USER:root}"
    jdbc_password => "${MYSQL_PASSWORD:password}"
    jdbc_connection_string => "jdbc:mysql://${MYSQL_HOST:mysql}:${MYSQL_PORT:3306}/${MYSQL_DATABASE:adventureworks}"
  }
}

output {
  http {
    url => "http://${CLICKHOUSE_HOST:clickhouse}:${CLICKHOUSE_PORT:8123}/"
    headers => {
      "Authorization" => "Basic ${CLICKHOUSE_AUTH_ENCODED}"
    }
  }
}
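The ${VAR:default} references are resolved from the LogStash process environment, so secrets only need to be supplied at container start. A sketch of wiring them in with docker run; the variable names match the configuration above, while the values and user names are placeholders:

# Build the Basic auth value the ClickHouse output expects
export CLICKHOUSE_AUTH_ENCODED=$(echo -n "admin:changeme" | base64)

docker run --rm \
  -e MYSQL_USER=etl_reader \
  -e MYSQL_PASSWORD=changeme \
  -e MYSQL_HOST=mysql \
  -e MYSQL_DATABASE=adventureworks \
  -e CLICKHOUSE_HOST=clickhouse \
  -e CLICKHOUSE_AUTH_ENCODED="$CLICKHOUSE_AUTH_ENCODED" \
  -v $(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
  --network datasuite-network \
  docker.elastic.co/logstash/logstash:8.11.0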
SSL/TLS Configuration
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://mysql:3306/adventureworks?useSSL=true&requireSSL=true&verifyServerCertificate=true"
    # SSL certificate configuration
    jdbc_driver_library => "/usr/share/logstash/drivers/mysql-connector-java.jar"
  }
}

output {
  http {
    url => "https://clickhouse:8443/"
    ssl_verification_mode => "full"
    ssl_certificate_authorities => ["/usr/share/logstash/certs/ca.pem"]
    ssl_certificate => "/usr/share/logstash/certs/client.pem"
    ssl_key => "/usr/share/logstash/certs/client.key"
  }
}
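Before pointing LogStash at the TLS endpoints, the certificate material can be sanity-checked with openssl (paths and the ClickHouse HTTPS port match the configuration above):

# Confirm the client certificate chains to the CA LogStash will trust
openssl verify -CAfile /usr/share/logstash/certs/ca.pem /usr/share/logstash/certs/client.pem

# Confirm the ClickHouse HTTPS endpoint presents a certificate that verifies against the same CA
openssl s_client -connect clickhouse:8443 -CAfile /usr/share/logstash/certs/ca.pem </dev/null | grep "Verify return code"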
Common Configuration Patterns
Conditional Processing
filter {
  # Process based on table type
  if [type] == "sales_order" {
    # Sales-specific transformations
    mutate {
      add_field => { "[@metadata][target_table]" => "sales_orders" }
    }
  } else if [type] == "customer" {
    # Customer-specific transformations
    mutate {
      add_field => { "[@metadata][target_table]" => "customers" }
    }
  }

  # Process based on data values
  if [status] in ["1", "2", "5"] {
    mutate { add_tag => [ "active_order" ] }
  } else {
    mutate { add_tag => [ "inactive_order" ] }
  }
}
Template-based Configuration
# Use templates for dynamic table creation
output {
  http {
    url => "http://clickhouse:8123/"
    format => "message"
    message => "
      INSERT INTO %{[@metadata][target_database]}.%{[@metadata][target_table]}
      (%{[@metadata][column_list]})
      FORMAT JSONEachRow
      %{message}
    "
  }
}
Testing LogStash Configurations
Configuration Validation
# Test configuration syntax
docker run --rm \
-v $(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
docker.elastic.co/logstash/logstash:8.11.0 \
logstash --config.test_and_exit
# Expected output: "Configuration OK"
Pipeline Testing
# Run the pipeline interactively (omit the schedule option from the jdbc input so the query executes a single time)
docker run --rm \
-v $(pwd)/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
-v $(pwd)/logstash-drivers:/usr/share/logstash/drivers \
--network datasuite-network \
docker.elastic.co/logstash/logstash:8.11.0 \
logstash --path.data /tmp/logstash-data
Best Practices
Configuration Organization
logstash-config/
├── pipelines.yml # Pipeline definitions
├── logstash.yml # Main LogStash settings
├── pipelines/
│ ├── sales-orders.conf # Individual pipeline configs
│ ├── customers.conf
│ └── products.conf
├── sql/
│ ├── sales_orders.sql # SQL queries
│ ├── customers.sql
│ └── products.sql
└── patterns/
└── custom-patterns # Custom grok patterns
Error Recovery Strategies
Persistent Queues: Enable for guaranteed delivery (see the settings sketch after this list)
Dead Letter Queues: Capture failed events for analysis
Circuit Breakers: Prevent cascade failures
Retry Logic: Automatic retry with exponential backoff
Monitoring: Comprehensive metrics and alerting
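Persistent queues and the dead letter queue are both switched on in logstash.yml (or per pipeline in pipelines.yml). A minimal sketch of the relevant settings, with the queue size chosen as an example; note that only outputs with dead-letter-queue support (such as elasticsearch) route failed events there:

# Append durability settings to the LogStash settings file
cat >> /usr/share/logstash/config/logstash.yml <<'EOF'
# Disk-backed queue between inputs and the filter/output workers
queue.type: persisted
queue.max_bytes: 2gb

# Keep events rejected by DLQ-aware outputs instead of dropping them
dead_letter_queue.enable: true
path.dead_letter_queue: /usr/share/logstash/data/dead_letter_queue
EOF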
Performance Guidelines
Batch Size: Balance between latency and throughput (1000-5000 events)
Workers: Match CPU cores (typically 2-4 workers per pipeline)
Memory: Allocate sufficient heap space (roughly 50% of container memory; see the heap check below)
Network: Use persistent connections and connection pooling
Disk I/O: Use SSD storage for persistent queues
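Whether the heap guideline is actually being met can be checked at runtime through the same monitoring API used earlier; a short sketch (the jq path assumes the standard node stats layout):

# Current heap usage as a percentage of the configured maximum
curl -s http://localhost:9600/_node/stats/jvm | jq '.jvm.mem.heap_used_percent'

# Busiest worker threads right now
curl -s "http://localhost:9600/_node/hot_threads?human=true&threads=3"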
Next Steps
With LogStash configured for data ingestion:
DBT Getting Started - Transform the ingested data
Testing & Validation - Ensure data quality
Troubleshooting - Resolve pipeline issues
Your LogStash configuration forms the foundation of the data pipeline, ensuring reliable and efficient data ingestion from source systems to the data warehouse.