Log Pipelines - Street-Level Ops

Quick Diagnosis Commands

When logs stop showing up in your search tool:

# 1. Is the log agent running?
systemctl status fluent-bit
systemctl status fluentd
systemctl status vector

# 2. Check agent logs for errors
journalctl -u fluent-bit --since "10 minutes ago"
journalctl -u vector -n 100

# 3. Are log files being written to?
ls -la /var/log/app/
tail -1 /var/log/app/app.log

# 4. Is the destination reachable?
curl -s http://elasticsearch:9200/_cluster/health | python3 -m json.tool
curl -s 'http://elasticsearch:9200/_cat/indices?v' | head -20

# 5. Check pipeline buffer status (Fluentbit)
curl -s http://localhost:2020/api/v1/metrics | python3 -m json.tool

# 6. Check disk space (full disk = lost logs)
df -h /var/log /var/lib/fluentd /tmp

# 7. Check file descriptor usage (too many open log files)
ls /proc/$(pidof fluent-bit)/fd | wc -l

Pattern: Fluentbit Configuration for Kubernetes

The standard pattern: Fluentbit as a DaemonSet reading container logs.

# fluent-bit.conf
[SERVICE]
    Flush         5
    Log_Level     info
    Daemon        off
    Parsers_File  parsers.conf
    HTTP_Server   On
    HTTP_Listen   0.0.0.0
    HTTP_Port     2020

[INPUT]
    Name              tail
    Tag               kube.*
    Path              /var/log/containers/*.log
    Parser            cri
    DB                /var/log/flb_kube.db
    Mem_Buf_Limit     15MB
    Skip_Long_Lines   On
    Refresh_Interval  10

[FILTER]
    Name                kubernetes
    Match               kube.*
    Kube_URL            https://kubernetes.default.svc:443
    Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
    Merge_Log           On
    K8S-Logging.Parser  On
    K8S-Logging.Exclude On

[OUTPUT]
    Name            es
    Match           kube.*
    Host            ${ELASTICSEARCH_HOST}
    Port            9200
    # With Logstash_Format On, Index is ignored; the prefix names the dated indices
    Logstash_Format On
    Logstash_Prefix k8s-logs
    Retry_Limit     5

Key settings explained:

DB /var/log/flb_kube.db     Track file positions (survive restarts)
Mem_Buf_Limit 15MB          Per-input memory cap (backpressure trigger)
Skip_Long_Lines On          Do not choke on lines > Buffer_Max_Size
Merge_Log On                Parse JSON from container log message field
Retry_Limit 5               Retry failed sends 5 times before dropping
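
To make the "backpressure trigger" idea concrete, here is a toy model of an input that pauses at its memory cap and resumes after a flush. It is a simplified sketch, not Fluentbit's actual implementation; the class and method names (TailInput, ingest, flush) are hypothetical.

```python
class TailInput:
    """Toy model of an input that pauses when its memory buffer is full."""

    def __init__(self, mem_buf_limit):
        self.mem_buf_limit = mem_buf_limit  # bytes, like Mem_Buf_Limit
        self.buffered = 0                   # bytes waiting to be flushed
        self.paused = False

    def ingest(self, record):
        """Buffer a record; return False if the input is currently paused."""
        if self.paused:
            return False
        self.buffered += len(record)
        if self.buffered >= self.mem_buf_limit:
            self.paused = True  # backpressure: stop reading the file
        return True

    def flush(self, nbytes):
        """The output delivered nbytes; resume once below the limit."""
        self.buffered = max(0, self.buffered - nbytes)
        if self.buffered < self.mem_buf_limit:
            self.paused = False


if __name__ == "__main__":
    inp = TailInput(mem_buf_limit=10)
    print(inp.ingest(b"12345"))    # accepted
    print(inp.ingest(b"123456"))   # accepted, but the cap is now reached
    print(inp.ingest(b"x"))        # refused while paused
    inp.flush(5)
    print(inp.ingest(b"x"))        # accepted again after the flush
```

While a tail input is paused, unread data stays in the file, which is why position tracking (DB) matters: the agent picks up where it stopped instead of skipping ahead.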

Pattern: Debugging Blocked Pipelines

When the buffer is full and logs are being dropped:

# Check Fluentbit metrics endpoint
curl -s http://localhost:2020/api/v1/metrics/prometheus | grep -E 'dropped|retry|error'

# Check Fluentd buffer status
# Look for "buffer is full" in logs
journalctl -u fluentd | grep -i "buffer\|overflow\|retry"

# Check Vector pipeline health
curl -s http://localhost:8686/health
# Vector metrics (if a prometheus_exporter sink is configured; 9598 is its default port)
curl -s http://localhost:9598/metrics | grep -E 'received_events|sent_events|buffer'

# Common causes of blocked pipelines:
# 1. Destination is down or slow
curl -s -o /dev/null -w "%{http_code} %{time_total}s" http://elasticsearch:9200/
# 2. Disk buffer is full
df -h /var/lib/fluentd/buffer/
# 3. DNS resolution failure (cannot reach destination)
dig elasticsearch.internal
# 4. TLS certificate expired
openssl s_client -connect elasticsearch:9200 </dev/null 2>/dev/null | openssl x509 -noout -dates

Gotcha: Losing Logs on Agent Restart

If the agent does not track file offsets, restarting it causes either log duplication (re-reads from beginning) or loss (starts from end):

# Fluentbit: ALWAYS configure position tracking
# (classic-format config only allows comments on their own line)
[INPUT]
    Name    tail
    Path    /var/log/app/*.log
    # THIS IS CRITICAL: without DB, Fluentbit starts from the end of the
    # file on restart and you lose everything written while it was down
    DB      /var/log/flb_positions.db

# Fluentd: position file
<source>
  @type tail
  path /var/log/app/*.log
  pos_file /var/log/fluentd/pos/app.pos   # THIS IS CRITICAL
  <parse>
    @type json
  </parse>
</source>

# Vector: checkpoint path
[sources.app_logs]
type = "file"
include = ["/var/log/app/*.log"]
data_dir = "/var/lib/vector"    # stores file checkpoints here
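
What a position file buys you, as a minimal sketch: the reader persists a byte offset and resumes from it, so a restart neither re-reads old lines nor skips lines written while it was down. This is an illustration of the idea, not how any of the three agents store their checkpoints; read_new_lines and the JSON checkpoint format are hypothetical.

```python
import json
import os
import tempfile


def read_new_lines(log_path, checkpoint_path):
    """Return lines appended since the last call, persisting the byte offset."""
    offset = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            offset = json.load(f)["offset"]
    with open(log_path, "rb") as f:
        f.seek(offset)
        data = f.read()
    # Only consume complete lines; a partial trailing line waits for next time.
    end = data.rfind(b"\n") + 1
    lines = data[:end].decode("utf-8").splitlines()
    with open(checkpoint_path, "w") as f:
        json.dump({"offset": offset + end}, f)
    return lines


if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    log_path = os.path.join(workdir, "app.log")
    checkpoint_path = os.path.join(workdir, "app.pos")

    with open(log_path, "w") as f:
        f.write("line 1\nline 2\n")
    print(read_new_lines(log_path, checkpoint_path))  # first run: both lines

    # Simulate the agent being down while the app keeps writing.
    with open(log_path, "a") as f:
        f.write("line 3\n")
    print(read_new_lines(log_path, checkpoint_path))  # only the new line
```

Delete the checkpoint file between the two calls and the second call returns all three lines again, which is the duplication case described above.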

Pattern: Multiline Log Parsing

# Fluentbit: Java stack trace multiline
[MULTILINE_PARSER]
    name          java_multiline
    type          regex
    flush_timeout 2000
    rule          "start_state"  "/^\d{4}-\d{2}-\d{2}/"  "cont"
    rule          "cont"         "/^\s+/"                  "cont"

[INPUT]
    Name               tail
    Path               /var/log/app/app.log
    Tag                app.java
    multiline.parser   java_multiline
    DB                 /var/log/flb_app.db

# Vector: multiline with condition
[sources.app_logs]
type = "file"
include = ["/var/log/app/app.log"]
# continue_through: after a start line, keep appending lines that match condition_pattern
multiline.mode = "continue_through"
multiline.start_pattern = '^\d{4}-\d{2}-\d{2}'
multiline.condition_pattern = '^\s+'
multiline.timeout_ms = 2000

Test multiline parsing by tailing the agent's output, not the destination. This isolates parsing issues from network/destination issues.
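
The grouping rule behind both configs can be sketched in a few lines, which is handy for checking patterns against sample logs before touching the agent. This is an approximation of the start/continuation logic, not either agent's parser; group_multiline is a hypothetical helper and the flush timeout is not modeled.

```python
import re

START = re.compile(r"^\d{4}-\d{2}-\d{2}")  # first line of an event
CONT = re.compile(r"^\s+")                 # indented continuation line


def group_multiline(lines):
    """Group raw lines into events: a start line plus indented continuations."""
    events = []
    current = []
    for line in lines:
        if current and CONT.match(line):
            current.append(line)
            continue
        if current:
            events.append("\n".join(current))
        if START.match(line):
            current = [line]
        else:
            # Matches neither pattern: emit it as a standalone event.
            events.append(line)
            current = []
    if current:
        events.append("\n".join(current))
    return events


if __name__ == "__main__":
    sample = [
        "2024-01-01 12:00:00 ERROR boom",
        "    at Foo.bar(Foo.java:1)",
        "    at Main.main(Main.java:2)",
        "2024-01-01 12:00:01 INFO ok",
    ]
    for event in group_multiline(sample):
        print(repr(event))
```

Note that "Caused by:" lines in Java traces start at column 0, so a whitespace-only continuation pattern splits them into separate events; extend the continuation pattern if you need them grouped.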


Gotcha: Regex Parsing Performance

Regex-heavy parsing configs are the silent killer of log pipeline throughput:

# Rough relative throughput (illustrative; actual numbers depend on hardware and patterns):
# JSON parse:  ~50,000 events/sec  (fast)
# Regex parse: ~5,000 events/sec   (10x slower)
# Grok parse:  ~3,000 events/sec   (even worse)

# Check if parsing is the bottleneck:
# 1. Monitor CPU usage of the log agent
top -p $(pidof fluent-bit)

# 2. If CPU is high, check how many regex parsers you have
grep -c 'regex' /etc/fluent-bit/*.conf

# 3. Measure throughput with and without parsing
# Temporarily switch to raw forwarding and compare event rates

Fix: Move to structured logging (JSON at the source). If you must parse, use specific string splitting instead of regex where possible. Pre-compile regex patterns (most agents do this, but check plugin behavior).
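
A quick way to get a feel for the relative cost of each approach is a micro-benchmark. The sketch below compares JSON parsing, a named-group regex, and plain string splitting on one sample line; the sample lines and function names are made up for illustration. Python timings say nothing about absolute agent throughput (the agents are written in C, Ruby, and Rust), so read the output only as a relative comparison.

```python
import json
import re
import timeit

LINE_JSON = '{"ts": "2024-01-01T00:00:00Z", "level": "error", "msg": "disk full"}'
LINE_TEXT = "2024-01-01T00:00:00Z error disk full"
PATTERN = re.compile(r"^(?P<ts>\S+) (?P<level>\S+) (?P<msg>.*)$")


def parse_json(line):
    return json.loads(line)


def parse_regex(line):
    return PATTERN.match(line).groupdict()


def parse_split(line):
    # Split on the first two spaces only; the rest is the message.
    ts, level, msg = line.split(" ", 2)
    return {"ts": ts, "level": level, "msg": msg}


if __name__ == "__main__":
    n = 100_000
    cases = [
        ("json", parse_json, LINE_JSON),
        ("regex", parse_regex, LINE_TEXT),
        ("split", parse_split, LINE_TEXT),
    ]
    for name, fn, line in cases:
        seconds = timeit.timeit(lambda: fn(line), number=n)
        print(f"{name:6s} {n / seconds:,.0f} events/sec")
```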


Pattern: Output Buffering for Elasticsearch

Elasticsearch is the most common destination and the most common bottleneck:

# Fluentd: tuned ES output with file buffering
<match app.**>
  @type elasticsearch
  host elasticsearch
  port 9200
  index_name app-logs
  type_name _doc

  <buffer tag, time>
    @type file
    path /var/log/fluentd/buffer/es
    timekey 60                    # group events into 60-second chunks
    timekey_wait 10               # flush each chunk 10s after its window closes
    chunk_limit_size 8MB          # max size per chunk
    total_limit_size 4GB          # max total buffer on disk
    flush_thread_count 4          # parallel flush threads
    retry_type exponential_backoff
    retry_max_interval 60s
    retry_forever true            # keep retrying (do not drop)
    overflow_action block         # stop accepting if buffer full
  </buffer>
</match>

Key tuning knobs:

chunk_limit_size    → larger = fewer HTTP requests, more memory
flush_thread_count  → more threads = higher throughput to ES
total_limit_size    → how much disk you can use for buffering
retry_forever       → true for critical logs, false for debug
overflow_action     → 'block' (safe) vs 'drop_oldest_chunk' (lossy)
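
Two of these knobs are easier to reason about with numbers. The sketch below computes an exponential backoff schedule capped at retry_max_interval, and models a full buffer under 'block' versus 'drop_oldest_chunk'. It assumes a 1s initial wait doubling on each retry and ignores retry randomization; the function names are hypothetical and the buffer limit is counted in chunks rather than bytes for simplicity.

```python
from collections import deque


def retry_schedule(attempts, retry_wait=1.0, base=2.0, retry_max_interval=60.0):
    """Wait (seconds) before each retry: exponential growth, capped."""
    return [min(retry_wait * base ** n, retry_max_interval) for n in range(attempts)]


def enqueue(buffer, chunk, limit, overflow_action):
    """Try to add a chunk to a bounded buffer; return True if it was accepted."""
    if len(buffer) < limit:
        buffer.append(chunk)
        return True
    if overflow_action == "drop_oldest_chunk":
        buffer.popleft()       # lossy: the oldest chunk is discarded
        buffer.append(chunk)
        return True
    if overflow_action == "block":
        return False           # safe: the caller has to wait and retry
    raise ValueError(f"unknown overflow_action: {overflow_action}")


if __name__ == "__main__":
    print(retry_schedule(8))

    buf = deque()
    for chunk in ["c1", "c2", "c3"]:
        accepted = enqueue(buf, chunk, limit=2, overflow_action="block")
        print(chunk, "accepted" if accepted else "blocked", list(buf))
```

With retry_forever true and overflow_action block, an extended Elasticsearch outage pushes backpressure all the way to the inputs, so size total_limit_size for the longest outage you expect to ride out.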

Gotcha: Single-Threaded Bottlenecks

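Fluentd runs on Ruby, whose global interpreter lock keeps a single worker process from using much more than one CPU core for event processing. A busy aggregator (tens of thousands of events/sec, depending on how much parsing and filtering it does) will eventually hit that ceiling.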

# Check if Fluentd is CPU-bound on one core
# (Fluentd runs as a ruby process, so match on the full command line)
top -H -p "$(pgrep -d, -f fluentd)"
# If one thread is at 100%, you are bottlenecked

# Solutions:
# 1. Use Fluentd's multi-worker mode
<system>
  workers 4
</system>

# 2. Switch to Vector for the aggregation layer (Rust, multi-threaded natively)

# 3. Run multiple Fluentd instances behind a load balancer
# Fluentbit → haproxy → Fluentd-1, Fluentd-2, Fluentd-3

Pattern: Testing Pipeline Changes

Never deploy log pipeline changes directly to production. Use this pattern:

# 1. Test config syntax
fluent-bit -c /etc/fluent-bit/fluent-bit.conf --dry-run
fluentd --dry-run -c /etc/fluentd/fluent.conf
vector validate /etc/vector/vector.toml

# 2. Test with a sample file
echo '{"level":"error","msg":"test"}' > /tmp/test.log
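# read_from_head=true is needed, otherwise tail skips what is already in the file
fluent-bit -i tail -p path=/tmp/test.log -p read_from_head=true -o stdout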

# 3. Test with a debug output (add temporarily)
# Fluentbit:
# [OUTPUT]
#     Name   stdout
#     Match  *
# Vector:
# [sinks.debug]
# type = "console"
# inputs = ["parse_json"]
# encoding.codec = "json"

# 4. Check for dropped events
# Compare input rate vs output rate in metrics
# (the Prometheus endpoint prints one metric per line, so grep output stays readable)
curl -s http://localhost:2020/api/v1/metrics/prometheus | grep -E 'input|output'
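
The input-versus-output comparison in step 4 can be scripted against Prometheus-format text. The sketch below sums each metric across its labels and reports the gap. The sample payload is made up, and the metric names are what I understand Fluentbit to expose; check them against your own endpoint before relying on this.

```python
import re

# Made-up sample in Prometheus text format (not real output).
SAMPLE = """\
# TYPE fluentbit_input_records_total counter
fluentbit_input_records_total{name="tail.0"} 1200
fluentbit_output_proc_records_total{name="es.0"} 1150
fluentbit_output_dropped_records_total{name="es.0"} 50
"""

# metric name, optional {labels}, then the value (a trailing timestamp is ignored)
METRIC_LINE = re.compile(r"^(\w+)(?:\{[^}]*\})?\s+([0-9.eE+-]+)")


def sum_metrics(text):
    """Sum each metric across all label sets."""
    totals = {}
    for line in text.splitlines():
        if line.startswith("#"):
            continue
        match = METRIC_LINE.match(line)
        if match:
            name, value = match.group(1), float(match.group(2))
            totals[name] = totals.get(name, 0.0) + value
    return totals


def records_gap(totals):
    """Records read by inputs minus records processed by outputs."""
    return (totals.get("fluentbit_input_records_total", 0.0)
            - totals.get("fluentbit_output_proc_records_total", 0.0))


if __name__ == "__main__":
    totals = sum_metrics(SAMPLE)
    print("gap:", records_gap(totals))
    print("dropped:", totals.get("fluentbit_output_dropped_records_total", 0.0))
```

A gap that keeps growing between two scrapes points to buffering or drops. A small constant gap is usually just records in flight. With several outputs matching the same records, the output total counts each record once per output, so compare per output in that case.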

Gotcha: Log Rotation and the Tail Input

When log files are rotated (renamed/compressed), the tail input must detect the change:

# Fluentbit: handle log rotation
# (comments go on their own line in the classic config format)
[INPUT]
    Name             tail
    Path             /var/log/app/*.log
    DB               /var/log/flb_pos.db
    # Keep monitoring a rotated file for 30s before releasing it
    Rotate_Wait      30
    # Check for new files every 10s
    Refresh_Interval 10
    # Files found at startup with no DB offset: start from the end (not the beginning)
    Read_from_Head   false

If Rotate_Wait is too short, you lose the tail end of the rotated file. If Refresh_Interval is too long, new log files are picked up late.
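
How a tailer can notice rotation at all, as a rough sketch: compare the file's identity (device and inode) and size against what was last seen. A changed inode means rename-and-recreate; a size smaller than the saved offset means the file was truncated in place (copytruncate). The helper names are hypothetical and this is not any agent's actual code.

```python
import os
import tempfile


def file_identity(path):
    """Device and inode together identify a file regardless of its name."""
    st = os.stat(path)
    return (st.st_dev, st.st_ino)


def was_rotated(path, last_identity, last_offset):
    """True if path is missing, points to a different file, or was truncated."""
    try:
        st = os.stat(path)
    except FileNotFoundError:
        return True
    if (st.st_dev, st.st_ino) != last_identity:
        return True
    return st.st_size < last_offset


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "app.log")
    with open(path, "w") as f:
        f.write("one\ntwo\n")
    identity, offset = file_identity(path), os.path.getsize(path)
    print(was_rotated(path, identity, offset))  # nothing changed yet

    os.rename(path, path + ".1")                # what logrotate does
    with open(path, "w") as f:
        f.write("three\n")
    print(was_rotated(path, identity, offset))  # new file behind the same name
```

After detecting rotation, the old file still has to be read to its end before it is released, which is the window Rotate_Wait controls.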


Pattern: Routing by Log Level

Under the hood: Log routing splits the stream after parsing but before buffering. This means the routing decision is CPU-cheap (a field comparison), but each output has its own buffer. If you route to 5 destinations, you need 5x the buffer memory. Monitor per-output buffer usage, not just the aggregate.

Send errors to fast/expensive storage, debug to cheap/slow storage:

# Fluentbit: route by level
# The new tag must NOT match this filter's own Match pattern (app.*),
# otherwise the re-emitted record is rewritten again in a loop.
[FILTER]
    Name    rewrite_tag
    Match   app.*
    Rule    $level ^(error|fatal)$ errors.app false

[OUTPUT]
    Name    es
    Match   errors.app
    Host    es-hot-tier
    Index   app-errors

[OUTPUT]
    Name    s3
    Match   app.*
    region  us-east-1
    bucket  log-archive
    total_file_size 100M

# Vector: route by level
[transforms.route_by_level]
type = "route"
inputs = ["parse_json"]
route.errors = '.level == "error" || .level == "fatal"'
route.info = '.level == "info"'
# everything else is emitted on route_by_level._unmatched
reroute_unmatched = true

[sinks.error_es]
type = "elasticsearch"
inputs = ["route_by_level.errors"]
endpoints = ["http://es-hot:9200"]

[sinks.info_s3]
type = "aws_s3"
inputs = ["route_by_level.info"]
bucket = "log-archive"
region = "us-east-1"        # assumed; same region as the Fluentbit example
encoding.codec = "json"
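
The routing decision itself is just a field comparison, and each route fills its own buffer. The sketch below mirrors the Vector routes above in plain code so you can see where events land and how unevenly the per-output buffers fill. route_name and route_events are hypothetical names and the sample events are made up.

```python
from collections import defaultdict


def route_name(event):
    """Pick a route from the parsed level field (a cheap comparison)."""
    level = event.get("level")
    if level in ("error", "fatal"):
        return "errors"
    if level == "info":
        return "info"
    return "_unmatched"


def route_events(events):
    """Split a stream into one buffer per route."""
    buffers = defaultdict(list)
    for event in events:
        buffers[route_name(event)].append(event)
    return dict(buffers)


if __name__ == "__main__":
    stream = [
        {"level": "info", "msg": "started"},
        {"level": "error", "msg": "disk full"},
        {"level": "debug", "msg": "cache miss"},
        {"level": "fatal", "msg": "out of memory"},
        {"msg": "no level field"},
    ]
    for name, buffered in route_events(stream).items():
        print(name, len(buffered))
```

Anything without a recognized level falls into _unmatched, so wire that route to a sink (or drop it deliberately); otherwise debug logs and unparsed lines go nowhere.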