Log Pipelines - Street-Level Ops¶
Quick Diagnosis Commands¶
When logs stop showing up in your search tool:
# 1. Is the log agent running?
systemctl status fluent-bit
systemctl status fluentd
systemctl status vector
# 2. Check agent logs for errors
journalctl -u fluent-bit --since "10 minutes ago"
journalctl -u vector -n 100
# 3. Are log files being written to?
ls -la /var/log/app/
tail -1 /var/log/app/app.log
# 4. Is the destination reachable?
curl -s http://elasticsearch:9200/_cluster/health | python3 -m json.tool
curl -s http://elasticsearch:9200/_cat/indices?v | head -20
# 5. Check pipeline buffer status (Fluentbit)
curl -s http://localhost:2020/api/v1/metrics | python3 -m json.tool
# 6. Check disk space (full disk = lost logs)
df -h /var/log /var/lib/fluentd /tmp
# 7. Check file descriptor usage (too many open log files)
ls /proc/$(pidof fluent-bit)/fd | wc -l
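The disk checks in steps 6-7 can be wrapped into a small guard script. A hedged sketch: the 90% threshold and mount list are assumptions, and it relies on GNU df's `--output` flag.

```shell
#!/bin/sh
# Warn when a log-related mount crosses a usage threshold (assumes GNU df).
THRESHOLD=90
for mount in /var/log /tmp; do
  used=$(df --output=pcent "$mount" | tail -1 | tr -dc '0-9')
  if [ "$used" -ge "$THRESHOLD" ]; then
    echo "WARN: $mount at ${used}% used (full disk = lost logs)"
  else
    echo "OK: $mount at ${used}% used"
  fi
done
```

Run it from cron or a node-exporter textfile collector so a filling log partition pages you before the agent starts dropping.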
Pattern: Fluentbit Configuration for Kubernetes¶
The standard pattern: Fluentbit as a DaemonSet reading container logs.
# fluent-bit.conf
[SERVICE]
Flush 5
Log_Level info
Daemon off
Parsers_File parsers.conf
HTTP_Server On
HTTP_Listen 0.0.0.0
HTTP_Port 2020
[INPUT]
Name tail
Tag kube.*
Path /var/log/containers/*.log
Parser cri
DB /var/log/flb_kube.db
Mem_Buf_Limit 15MB
Skip_Long_Lines On
Refresh_Interval 10
[FILTER]
Name kubernetes
Match kube.*
Kube_URL https://kubernetes.default.svc:443
Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
Merge_Log On
K8S-Logging.Parser On
K8S-Logging.Exclude On
[OUTPUT]
Name es
Match kube.*
Host ${ELASTICSEARCH_HOST}
Port 9200
Logstash_Format On
Logstash_Prefix k8s-logs
Retry_Limit 5
Key settings explained:¶
DB /var/log/flb_kube.db → track file positions (survive restarts)
Mem_Buf_Limit 15MB → per-input memory cap (backpressure trigger)
Skip_Long_Lines On → do not choke on lines > Buffer_Max_Size
Merge_Log On → parse JSON from the container log message field
Retry_Limit 5 → retry failed sends 5 times before dropping the chunk
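With HTTP_Server enabled above, the /api/v1/metrics payload tells you whether the es output is actually dropping. A sketch of pulling the counters out; the sample JSON mirrors the endpoint's shape but the numbers are made up.

```shell
# In production, feed this from: curl -s http://localhost:2020/api/v1/metrics
cat > /tmp/flb_metrics.json <<'EOF'
{"output": {"es.0": {"proc_records": 1000, "errors": 3, "retries": 5, "retries_failed": 1}}}
EOF
python3 - <<'EOF'
import json
out = json.load(open("/tmp/flb_metrics.json"))["output"]["es.0"]
# retries_failed counts chunks given up on after exhausting Retry_Limit
print(f"sent={out['proc_records']} errors={out['errors']} dropped={out['retries_failed']}")
EOF
```

A nonzero `retries_failed` that keeps climbing means Retry_Limit is being hit and logs are gone.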
Pattern: Debugging Blocked Pipelines¶
When the buffer is full and logs are being dropped:
# Check Fluentbit metrics endpoint
curl -s http://localhost:2020/api/v1/metrics/prometheus | grep -E 'dropped|retry|error'
# Check Fluentd buffer status
# Look for "buffer is full" in logs
journalctl -u fluentd | grep -i "buffer\|overflow\|retry"
# Check Vector pipeline health
curl -s http://localhost:8686/health
# Vector metrics (if Prometheus sink configured)
curl -s http://localhost:9598/metrics | grep -E 'events_in|events_out|buffer'
# Common causes of blocked pipelines:
# 1. Destination is down or slow
curl -s -o /dev/null -w "%{http_code} %{time_total}s" http://elasticsearch:9200/
# 2. Disk buffer is full
df -h /var/lib/fluentd/buffer/
# 3. DNS resolution failure (cannot reach destination)
dig elasticsearch.internal
# 4. TLS certificate expired
openssl s_client -connect elasticsearch:9200 </dev/null 2>/dev/null | openssl x509 -noout -dates
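The curl probe from cause #1 can be turned into a verdict. `check_dest` is a hypothetical helper, and the 2s "slow" threshold is an assumption; tune it to your destination's normal latency.

```shell
# Classify a destination from curl's "%{http_code} %{time_total}" output.
check_dest() {
  code=$1; time=$2
  if [ "$code" = "000" ]; then
    echo "DOWN (connection failed)"
  elif [ "$code" -ge 500 ]; then
    echo "ERRORING ($code)"
  elif [ "${time%%.*}" -ge 2 ]; then
    echo "SLOW (${time}s)"
  else
    echo "OK ($code in ${time}s)"
  fi
}
check_dest 200 0.05   # OK (200 in 0.05s)
check_dest 000 0.00   # DOWN (connection failed)
check_dest 503 1.20   # ERRORING (503)
check_dest 200 3.40   # SLOW (3.40s)
```

Wire it up with: `check_dest $(curl -s -o /dev/null -w "%{http_code} %{time_total}" http://elasticsearch:9200/)`.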
Gotcha: Losing Logs on Agent Restart¶
If the agent does not track file offsets, restarting it causes either log duplication (re-reads from beginning) or loss (starts from end):
# Fluentbit: ALWAYS configure position tracking
[INPUT]
Name tail
Path /var/log/app/*.log
# DB is CRITICAL: without it, Fluentbit starts from the end of the
# file on restart and you lose everything written while it was down
# (classic config does not allow trailing comments, keep them on their own line)
DB /var/log/flb_positions.db
# Fluentd: position file
<source>
@type tail
path /var/log/app/*.log
pos_file /var/log/fluentd/pos/app.pos # THIS IS CRITICAL
<parse>
@type json
</parse>
</source>
# Vector: checkpoint path
[sources.app_logs]
type = "file"
include = ["/var/log/app/*.log"]
data_dir = "/var/lib/vector" # stores file checkpoints here
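What the position DB buys you can be shown with plain files. A sketch simulating checkpoint-and-resume (the /tmp paths are arbitrary):

```shell
LOG=/tmp/demo.log; POS=/tmp/demo.pos
printf 'line1\nline2\n' > "$LOG"
wc -c < "$LOG" > "$POS"                    # agent checkpoints the byte offset
printf 'line3\n' >> "$LOG"                 # written while the "agent" was down
tail -c +$(( $(cat "$POS") + 1 )) "$LOG"   # resume from checkpoint: prints only line3
```

Without the saved offset you must pick "from the end" (lose line3) or "from the beginning" (duplicate line1-2); the checkpoint is what lets the agent do neither.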
Pattern: Multiline Log Parsing¶
# Fluentbit: Java stack trace multiline
[MULTILINE_PARSER]
name java_multiline
type regex
flush_timeout 2000
rule "start_state" "/^\d{4}-\d{2}-\d{2}/" "cont"
rule "cont" "/^\s+/" "cont"
[INPUT]
Name tail
Path /var/log/app/app.log
Tag app.java
multiline.parser java_multiline
DB /var/log/flb_app.db
# Vector: multiline with condition
[sources.app_logs]
type = "file"
include = ["/var/log/app/app.log"]
multiline.mode = "continue_through"
multiline.start_pattern = '^\d{4}-\d{2}-\d{2}'
multiline.condition_pattern = '^\s+'
multiline.timeout_ms = 2000
Test multiline parsing by tailing the agent's output, not the destination. This isolates parsing issues from network/destination issues.
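One way to sanity-check the start/continuation rules before touching the agent: replay them against a sample file. The awk one-liner below only mimics the "timestamp starts a record" rule; it is not the agent's parser.

```shell
cat > /tmp/stack.log <<'EOF'
2024-01-15 10:00:01 ERROR boom
    at com.example.Foo(Foo.java:42)
    at com.example.Bar(Bar.java:7)
2024-01-15 10:00:02 INFO recovered
EOF
# Count records: a line starting with a timestamp opens a new record.
awk '/^[0-9][0-9][0-9][0-9]-/{n++} END{print n " records"}' /tmp/stack.log
# prints: 2 records
```

If the count does not match what you expect from eyeballing the file, fix the regex before blaming the agent.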
Gotcha: Regex Parsing Performance¶
Regex-heavy parsing configs are the silent killer of log pipeline throughput:
# Rough single-core parse throughput (illustrative; varies by config and hardware):
# JSON parse: ~50,000 events/sec (fast)
# Regex parse: ~5,000 events/sec (10x slower)
# Grok parse: ~3,000 events/sec (even worse)
# Check if parsing is the bottleneck:
# 1. Monitor CPU usage of the log agent
top -p $(pidof fluent-bit)
# 2. If CPU is high, check how many regex parsers you have
grep -c 'regex' /etc/fluent-bit/*.conf
# 3. Measure throughput with and without parsing
# Temporarily switch to raw forwarding and compare event rates
Fix: Move to structured logging (JSON at the source). If you must parse, use specific string splitting instead of regex where possible. Pre-compile regex patterns (most agents do this, but check plugin behavior).
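A quick local way to compare parse costs on your own log shape. Illustrative only: absolute numbers depend on the machine and bear only loose relation to agent throughput.

```shell
python3 - <<'EOF'
import json, re, timeit
line = '{"ts":"2024-01-15T10:00:01","level":"error","msg":"boom"}'
pat = re.compile(r'"ts":"([^"]+)".*"level":"([^"]+)"')
n = 20000
j = timeit.timeit(lambda: json.loads(line), number=n)     # structured parse
r = timeit.timeit(lambda: pat.search(line).groups(), number=n)  # regex extraction
print(f"json: {j:.3f}s  regex: {r:.3f}s  for {n} events")
EOF
```

Swap in one of your real log lines and your real patterns; the gap widens fast as regexes grow backtracking-prone alternations.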
Pattern: Output Buffering for Elasticsearch¶
Elasticsearch is the most common destination and the most common bottleneck:
# Fluentd: tuned ES output with file buffering
<match app.**>
@type elasticsearch
host elasticsearch
port 9200
index_name app-logs
type_name _doc
<buffer tag, time>
@type file
path /var/log/fluentd/buffer/es
timekey 60 # flush every 60 seconds
timekey_wait 10 # wait 10s after timekey expires
chunk_limit_size 8MB # max size per chunk
total_limit_size 4GB # max total buffer on disk
flush_thread_count 4 # parallel flush threads
retry_type exponential_backoff
retry_max_interval 60s
retry_forever true # keep retrying (do not drop)
overflow_action block # stop accepting if buffer full
</buffer>
</match>
Key tuning knobs:¶
chunk_limit_size → larger = fewer HTTP requests, more memory
flush_thread_count → more threads = higher throughput to ES
total_limit_size → how much disk you can use for buffering
retry_forever → true for critical logs, false for debug
overflow_action → 'block' (safe) vs 'drop_oldest_chunk' (lossy)
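These knobs translate into an outage budget. Back-of-envelope sketch; the 30 MB/min ingest rate is an assumption, measure yours from the agent's metrics.

```shell
TOTAL_LIMIT_MB=4096      # total_limit_size 4GB from the config above
INGEST_MB_PER_MIN=30     # assumed average ingest rate (measure yours)
echo "Disk buffer absorbs ~$(( TOTAL_LIMIT_MB / INGEST_MB_PER_MIN )) minutes of ES outage"
# prints: Disk buffer absorbs ~136 minutes of ES outage
```

If the answer is shorter than your worst realistic ES outage, grow total_limit_size or accept loss explicitly with overflow_action.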
Gotcha: Single-Threaded Bottlenecks¶
Fluentd's Ruby runtime is effectively single-threaded for most work (the interpreter lock serializes it). If your aggregator needs to push 50,000 events/sec through one process, you will hit that ceiling.
# Check if Fluentd is CPU-bound on one core
top -p $(pidof fluentd) -H
# If one thread is at 100%, you are bottlenecked
# Solutions:
# 1. Use Fluentd's multi-worker mode
<system>
workers 4
</system>
# 2. Switch to Vector for the aggregation layer (Rust, multi-threaded natively)
# 3. Run multiple Fluentd instances behind a load balancer
# Fluentbit → haproxy → Fluentd-1, Fluentd-2, Fluentd-3
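To confirm the single-core ceiling, look at per-thread CPU, not process CPU. A sketch (falls back to inspecting the current shell when no fluentd is running, so it can be tried anywhere):

```shell
pid=$(pgrep -x fluentd | head -1)
pid=${pid:-$$}   # demo fallback: inspect this shell instead
# One row per thread; a single thread pinned near 100% is your bottleneck.
ps -L -o tid,pcpu,comm -p "$pid" --sort=-pcpu | head -5
```

If the top thread is saturated while the process total reads 110% on an 8-core box, multi-worker mode or a multi-threaded aggregator is the fix, not a bigger machine.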
Pattern: Testing Pipeline Changes¶
Never deploy log pipeline changes directly to production. Use this pattern:
# 1. Test config syntax
fluent-bit -c /etc/fluent-bit/fluent-bit.conf --dry-run
fluentd --dry-run -c /etc/fluentd/fluent.conf
vector validate /etc/vector/vector.toml
# 2. Test with a sample file
echo '{"level":"error","msg":"test"}' > /tmp/test.log
fluent-bit -i tail -p path=/tmp/test.log -o stdout
# 3. Test with a debug output (add temporarily)
# Fluentbit:
# [OUTPUT]
# Name stdout
# Match *
# Vector:
# [sinks.debug]
# type = "console"
# inputs = ["parse_json"]
# encoding.codec = "json"
# 4. Check for dropped events
# Compare input rate vs output rate in metrics
curl -s http://localhost:2020/api/v1/metrics | grep -E 'input|output'
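To check for drops end-to-end, push a known event count through and compare. A sketch generating the test volume; path and count are arbitrary.

```shell
# 100 well-formed JSON events with a greppable marker
seq 1 100 | sed 's/.*/{"msg":"test-&"}/' > /tmp/test_events.log
wc -l < /tmp/test_events.log   # 100 in, so expect 100 at the output side
# Point the tail input at /tmp/test_events.log with a stdout output,
# then count emitted events before trusting the change.
```

Known-volume tests catch silent drops that rate metrics smooth over.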
Gotcha: Log Rotation and the Tail Input¶
When log files are rotated (renamed/compressed), the tail input must detect the change:
# Fluentbit: handle log rotation
[INPUT]
Name tail
Path /var/log/app/*.log
DB /var/log/flb_pos.db
# wait 30s before releasing a rotated file
Rotate_Wait 30
# check for new files every 10s
Refresh_Interval 10
# new files: start from the end (not the beginning)
Read_from_Head false
If Rotate_Wait is too short, you lose the tail end of the rotated file. If Refresh_Interval is too long, new log files are picked up late.
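Rename-style rotation is easy to see with plain files. A sketch showing why the agent must track inodes, not names:

```shell
LOG=/tmp/rot_demo.log
echo "old line" > "$LOG"
mv "$LOG" "$LOG.1"        # rotation renames: same inode, new name
echo "new line" > "$LOG"  # logger reopens: new inode under the old name
ls -i "$LOG" "$LOG.1"     # two different inode numbers
```

The agent's open file descriptor follows the old inode into `$LOG.1`; Rotate_Wait is how long it keeps draining that inode before Refresh_Interval's scan picks up the new one.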
Pattern: Routing by Log Level¶
Send errors to fast/expensive storage, debug to cheap/slow storage:
Under the hood: Log routing splits the stream after parsing but before buffering. This means the routing decision is CPU-cheap (a field comparison), but each output has its own buffer. If you route to 5 destinations, you need 5x the buffer memory. Monitor per-output buffer usage, not just the aggregate.
# Fluentbit: route by level
[FILTER]
Name rewrite_tag
Match app.*
Rule $level ^(error|fatal)$ app.errors false
[OUTPUT]
Name es
Match app.errors
Host es-hot-tier
Index app-errors
[OUTPUT]
Name s3
Match app.*
region us-east-1
bucket log-archive
total_file_size 100M
# Vector: route by level
[transforms.route_by_level]
type = "route"
inputs = ["parse_json"]
route.errors = '.level == "error" || .level == "fatal"'
route.info = '.level == "info"'
reroute_unmatched = true  # leftovers exit via route_by_level._unmatched
[sinks.error_es]
type = "elasticsearch"
inputs = ["route_by_level.errors"]
endpoints = ["http://es-hot:9200"]
[sinks.info_s3]
type = "aws_s3"
inputs = ["route_by_level.info"]
bucket = "log-archive"
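The routing predicate itself can be smoke-tested on sample events before deploying. A crude grep stand-in for the level check, not the agent's matcher:

```shell
printf '%s\n' \
  '{"level":"error","msg":"a"}' \
  '{"level":"info","msg":"b"}' \
  '{"level":"fatal","msg":"c"}' \
  | grep -cE '"level":"(error|fatal)"'   # prints 2
```

If the count disagrees with your expectation on real sample lines (mixed case, nested fields), fix the pattern before errors start landing in the cheap S3 tier.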