
Portal | Level: L2 | Domain: DevOps

Message Queues - Street-Level Ops

Day-to-day commands and patterns for engineers managing Kafka and RabbitMQ in production.


Quick Diagnosis Commands

Kafka: Consumer Group Lag

# List all consumer groups
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list

# Describe lag for a specific group (shows per-partition breakdown)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-consumer-group

# Watch lag in a loop (update every 10s)
watch -n 10 "kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-consumer-group 2>/dev/null | column -t"

# Describe lag for ALL groups (broad sweep during incident)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --all-groups 2>/dev/null | awk '$6 > 0'

Kafka: Topic Inspection

# List topics
kafka-topics.sh --bootstrap-server kafka:9092 --list

# Describe a topic (partition count, replicas, ISR)
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic orders

# Peek at messages from the beginning (first 10)
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic orders --from-beginning --max-messages 10

# Peek at latest messages
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic orders --offset latest --partition 0

# Read from a specific offset on partition 2
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic orders --partition 2 --offset 5000 --max-messages 5

# Show message with key and timestamp
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic orders --from-beginning --max-messages 5 \
  --property print.key=true --property print.timestamp=true

# Check log-end offset per partition (quick lag baseline)
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --bootstrap-server kafka:9092 --topic orders --time -1

Kafka: Broker and Topic Health

# Check under-replicated partitions (should be 0 in healthy cluster)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --under-replicated-partitions

# Check partitions with no leader (serious: consumers/producers stalled)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --unavailable-partitions

# Describe broker configs
kafka-configs.sh --bootstrap-server kafka:9092 \
  --describe --entity-type brokers --entity-name 0

# Get topic-level config (retention, compaction policy, etc.)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --describe --entity-type topics --entity-name orders

RabbitMQ: Queue and Broker Status

# List all queues with message count and consumer count
rabbitmqctl list_queues name messages consumers memory state

# Show only queues with messages (backlog check)
rabbitmqctl list_queues name messages | awk '$2 > 0'

# Detailed queue info (ready, unacked, total, throughput)
rabbitmqctl list_queues name messages_ready messages_unacknowledged \
  messages message_stats.publish_details.rate \
  message_stats.deliver_details.rate

# List exchanges
rabbitmqctl list_exchanges name type durable auto_delete

# List bindings
rabbitmqctl list_bindings

# List consumers with their queue and prefetch settings
rabbitmqctl list_consumers queue_name consumer_tag prefetch_count ack_required

# Check connections and channels
rabbitmqctl list_connections name state channels send_pend recv_cnt

# Node status (memory, disk alarms, file descriptors)
rabbitmqctl status

# Blocked/blocking connections indicate a memory or disk alarm is in effect
rabbitmqctl list_connections state | grep -E "blocked|blocking"

RabbitMQ: Management API (when rabbitmqctl is unavailable)

# Queue overview (requires management plugin)
curl -s -u guest:guest http://localhost:15672/api/queues | \
  python3 -m json.tool | grep -E '"name"|"messages"'

# Specific queue details
curl -s -u guest:guest \
  http://localhost:15672/api/queues/%2F/orders | python3 -m json.tool

# Node overview
curl -s -u guest:guest http://localhost:15672/api/nodes | python3 -m json.tool

Gotchas

Consumer Lag Spiral

What happens: Consumer falls behind. Lag grows. Reprocessing old messages causes more errors. More errors → more retries → more load → consumer falls further behind. Classic death spiral.

Symptoms:
- Consumer lag growing monotonically
- Consumer CPU at 100% or I/O saturated
- Error rate rising alongside lag

Diagnosis:

# Is lag growing or stable?
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-group | awk '{print $6}' | sort -n | tail

# How fast is production rate?
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --bootstrap-server kafka:9092 --topic orders --time -1

# Compare against 60s ago (run twice, diff)

Recovery:
1. Scale up consumers (add instances, up to partition count).
2. If processing is slow: identify and fix the bottleneck (DB query, external API call).
3. Temporarily increase consumer parallelism per instance.
4. If message volume is permanently higher: increase partition count and rebalance.
5. Do NOT reset offsets without understanding what messages you will skip or reprocess.
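The "run twice, diff" check in the diagnosis above can be scripted. A minimal sketch that parses two --describe snapshots taken ~60s apart and reports per-partition lag growth (column positions assume the standard GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG layout; group and topic names are illustrative):

```python
def parse_lag(describe_output: str) -> dict:
    """Map (topic, partition) -> lag from kafka-consumer-groups --describe output."""
    lag = {}
    for line in describe_output.splitlines():
        cols = line.split()
        # Expected columns: GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...
        # Skips the header row and rows where LAG is "-" (no committed offset yet)
        if len(cols) >= 6 and cols[2].isdigit() and cols[5].lstrip("-").isdigit():
            lag[(cols[1], int(cols[2]))] = int(cols[5])
    return lag

def lag_trend(before: dict, after: dict) -> dict:
    """Positive delta = lag growing (consumer falling behind)."""
    return {tp: after[tp] - before.get(tp, 0) for tp in after}
```

A positive delta on every partition points at an underpowered group; a delta on one partition points at skew (see Partition Skew below).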


Rebalance Storms

What happens: Consumer joins or leaves group → rebalance → all consumers stop for 10-30s → lag spike → another consumer times out → another rebalance. Cycle repeats.

Symptoms:
- Log messages: "Rebalancing... Assigned partitions: []" repeatedly
- Consumer lag spikes every few minutes
- Kafka coordinator logs show frequent group changes

Diagnosis:

# Watch for rebalance events in consumer logs
kubectl logs -f deploy/my-consumer | grep -i "rebalance\|partition\|revoked\|assigned"

# Check the broker's allowed session-timeout bounds
# (the effective session.timeout.ms is a consumer-side config)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --describe --all --entity-type brokers --entity-name 0 | grep session

Fix:

# Consumer config adjustments
session.timeout.ms=45000            # default 45s; increase if consumers are slow
heartbeat.interval.ms=15000         # rule of thumb: at most session.timeout.ms / 3
max.poll.interval.ms=600000         # max time between polls; increase for slow processing
max.poll.records=100                # reduce records per poll to stay within poll interval

# Use static group membership to avoid rebalance on restart
group.instance.id=consumer-pod-1   # stable identity per consumer instance

# Upgrade to incremental cooperative rebalancing (Kafka 2.4+)
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor


Unacked Message Buildup (RabbitMQ)

What happens: Consumers receive messages (broker marks them in-flight/unacked) but never ack or nack. Queue appears empty but messages are held hostage. Consumer restart dumps them back to ready state — but only after channel/connection closes.

Symptoms:
- messages_ready is low but messages_unacknowledged is high
- Queue seems stuck even though consumers are running
- Restarting consumers suddenly "releases" a flood of messages

Diagnosis:

# Show unacked vs ready split
rabbitmqctl list_queues name messages_ready messages_unacknowledged

# Find which consumers are holding unacked messages
rabbitmqctl list_consumers queue_name consumer_tag prefetch_count ack_required

# Check for channels holding many unacked messages (unacked counts live on channels)
rabbitmqctl list_channels connection consumer_count messages_unacknowledged

Fix:
- Set prefetch_count to bound how many unacked messages per consumer:
  channel.basic_qos(prefetch_count=10)
- Ensure consumers ack on success, nack (with requeue=False) on unrecoverable error.
- Add consumer heartbeat so the broker detects stalled consumers faster.
- Check for exception paths that skip the ack/nack call.


Partition Skew

What happens: One or more Kafka partitions receive disproportionately more messages than others because of a poorly chosen partition key or a hot key (e.g., all messages for one high-volume customer keyed on customer ID).

Symptoms:
- One consumer instance at 100% CPU/IO, others idle
- Per-partition lag: one partition has millions of messages, others have zero

Diagnosis:

# Check per-partition lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-group | sort -k6 -rn | head -20

# Check per-partition message count
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --bootstrap-server kafka:9092 --topic orders --time -1 | sort -t: -k3 -rn

Fix:
- Choose a higher-cardinality partition key (e.g., order_id instead of region).
- Add a random suffix to hot keys for skewed topics (accepts ordering loss for those keys).
- If key cardinality is fundamentally low, use round-robin partitioning (null key) and accept loss of ordering.
- Increase partition count to spread load (requires consumer group rebalance).
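The hot-key salting fix can be sketched as a pure keying function; the producer then sends with key=salted_key(k). HOT_KEYS and the bucket count are illustrative, and note the trade-off: one hot key now spreads over up to SALT_BUCKETS partitions, so ordering for that key is gone.

```python
import random

HOT_KEYS = {"customer-42"}      # known hot keys (illustrative)
SALT_BUCKETS = 8                # shards per hot key

def salted_key(key: str) -> bytes:
    """Return the partition key to use when producing.

    Hot keys get a random bucket suffix so their traffic spreads across
    several partitions; all other keys hash exactly as before.
    """
    if key in HOT_KEYS:
        key = f"{key}.{random.randrange(SALT_BUCKETS)}"
    return key.encode("utf-8")
```

Consumers that need to reassemble a hot key's stream must read all of its buckets, which is why this is a last resort after picking a better key.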


Patterns

DLQ Triage

When a DLQ accumulates messages, systematic triage beats ad-hoc inspection:

# RabbitMQ: inspect DLQ without consuming (output includes properties and the
# x-death header: original queue, death count, reason)
rabbitmqadmin get queue=orders.dlq count=10 ackmode=ack_requeue_true

# Find DLQ queues and their depth
rabbitmqctl list_queues name messages | grep dlq

# Kafka DLQ: inspect dead-letter topic
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic orders.dlq --from-beginning --max-messages 20 \
  --property print.headers=true \
  --property print.timestamp=true

# Count DLQ depth over time (is it growing?)
watch -n 30 "kafka-run-class.sh kafka.tools.GetOffsetShell \
  --bootstrap-server kafka:9092 --topic orders.dlq --time -1"

Triage workflow:
1. Sample 10-20 messages. Are they all the same error class?
2. If yes: fix the bug, then replay. If no: categorize before replaying.
3. Replay fixed messages back to original topic:

# Kafka: re-publish DLQ messages to original topic
# (runs until Ctrl-C; the console pipeline forwards values only: keys and
#  headers are dropped unless you configure parse.key/parse.headers explicitly)
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic orders.dlq --from-beginning | \
kafka-console-producer.sh --bootstrap-server kafka:9092 \
  --topic orders

4. Monitor original consumer group lag during replay to ensure no new failures.
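Step 1's sampling can be automated by piping the console-consumer dump into a small categorizer. A sketch, assuming the failing consumer wrote the error class into a dlq-reason header (the header name and the print.headers=true line format are assumptions; adjust the parsing to your actual dump):

```python
import sys
from collections import Counter

def categorize(lines):
    """Count DLQ messages per failure reason.

    Expects lines like the console consumer prints with print.headers=true,
    e.g. 'dlq-reason:TimeoutError,attempt:3\t{"order_id": 1}' (assumed format).
    """
    counts = Counter()
    for line in lines:
        reason = "unknown"
        for part in line.split(","):
            if part.startswith("dlq-reason:"):
                # Header value runs until the tab that precedes the payload
                reason = part.split(":", 1)[1].split("\t", 1)[0]
        counts[reason] += 1
    return counts

if __name__ == "__main__":
    for reason, n in categorize(sys.stdin).most_common():
        print(f"{n:6d}  {reason}")
```

One dominant error class means fix-and-replay; a long tail means categorize first, exactly as the workflow says.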


Idempotent Consumers

One-liner: At-least-once delivery guarantees you will see every message, but never guarantees you will see it only once. Idempotency is not optional; it is the contract your consumer owes the system.

Every at-least-once consumer must handle duplicate delivery. The pattern:

def process_message(msg):
    message_id = msg.headers.get("message-id") or msg.key

    # Check dedup store (Redis, DB, or idempotency table)
    if already_processed(message_id):
        logger.info("Skipping duplicate: %s", message_id)
        return

    with transaction():
        # Do the work
        apply_order(msg.payload)
        # Record processing in same transaction
        mark_processed(message_id)

Deduplication store options:
- Redis SET NX EX 3600: fast, non-durable. Good for idempotency within a time window.
- DB unique constraint on message_id: durable, transactional. Best for financial events.
- Conditional update (optimistic locking): update only if current state matches expected pre-state.
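The time-window option can be sketched with an in-memory stand-in for Redis SET NX EX (same semantics: the first caller inside the TTL window wins; use real Redis or a DB constraint in production, since this version dies with the process):

```python
import time

class TTLDedup:
    """In-memory stand-in for Redis `SET key 1 NX EX ttl` (illustration only)."""

    def __init__(self):
        self._expiry = {}

    def first_seen(self, group_id: str, message_id: str, ttl: float = 3600.0) -> bool:
        # Key scope = consumer group + message ID
        key = f"{group_id}:{message_id}"
        now = time.monotonic()
        if self._expiry.get(key, 0.0) > now:
            return False          # duplicate inside the window: skip processing
        self._expiry[key] = now + ttl
        return True               # first delivery: process it
```

With Redis the equivalent is a single SET with nx=True and ex=ttl, whose return value plays the role of first_seen's boolean.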

Rule: idempotency key scope = the consumer group + message ID. The same message processed by two different consumer groups is not a duplicate.


Backpressure Handling

When your consumer is overloaded, controlled slowdown prevents cascading failures:

# Pattern: pull-with-pause (Kafka, kafka-python style)
while True:
    # Check BEFORE polling so a fetched batch is never silently dropped
    if is_downstream_overloaded():
        consumer.pause(*consumer.assignment())   # stop fetching new records
        time.sleep(5)
        consumer.resume(*consumer.assignment())
        continue

    records = consumer.poll(timeout_ms=1000)     # dict: TopicPartition -> [records]
    for batch in records.values():
        for record in batch:
            process(record)
    consumer.commit()

# Pattern: bounded work queue with admission control (RabbitMQ)
channel.basic_qos(prefetch_count=WORKER_POOL_SIZE)
# The broker will not push more than WORKER_POOL_SIZE unacked messages.
# When the thread pool is full, workers stop acking → broker stops sending.

Key insight: backpressure only works if there is a feedback path from consumer to broker. RabbitMQ prefetch + thread pool creates this naturally. Kafka requires explicit pause/resume or careful max.poll.records tuning.
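The feedback path can be demonstrated in-process with a bounded queue (a stdlib analogy, not broker code: maxsize plays the role of the prefetch window, and a full queue is the cue to pause fetching or stop acking):

```python
import queue

work = queue.Queue(maxsize=10)   # bound ~ prefetch window (analogy)

def try_accept(msg, timeout: float = 0.0) -> bool:
    """Admission control: refuse instead of buffering unboundedly.

    A False return is the signal to pause the consumer (Kafka) or simply
    stop acking (RabbitMQ) until workers drain the queue.
    """
    try:
        work.put(msg, block=timeout > 0, timeout=timeout or None)
        return True
    except queue.Full:
        return False
```

Without the bound, the consumer happily accepts everything, the backlog moves from the broker (observable) into process memory (not observable), and the feedback loop is broken.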


Consumer Scaling

Kafka: scale by adding consumer instances to the group. Kafka automatically rebalances. Hard limit = partition count. To scale beyond current limits:

# Increase partition count (irreversible — ordering guarantees per key may break)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --alter --topic orders --partitions 24

# Verify rebalance completed
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group my-group | wc -l

RabbitMQ: add consumer instances. No hard limit analogous to partition count. Each consumer competes for messages from the same queue. Increase prefetch proportionally when adding consumers.

# Check consumer distribution across queue
rabbitmqctl list_consumers queue_name consumer_tag prefetch_count | \
  awk '{sum += $3; count++} END {print "avg prefetch:", sum/count, "consumers:", count}'

Scaling heuristics:
- If consumer CPU is the bottleneck: add instances.
- If consumer is I/O-bound (DB calls, network): add instances OR optimize the I/O.
- If message processing is inherently serial (ordering required): partition more finely, keep one consumer per partition for ordered processing.



Primer: Message Queues Primer

Footguns: Message Queues Footguns

Flashcard Decks: message-queues (30 cards)