Portal | Level: L2 | Domain: DevOps
Message Queues - Street-Level Ops¶
Day-to-day commands and patterns for engineers managing Kafka and RabbitMQ in production.
Quick Diagnosis Commands¶
Kafka: Consumer Group Lag¶
# List all consumer groups
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list
# Describe lag for a specific group (shows per-partition breakdown)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-consumer-group
# Watch lag in a loop (update every 10s)
watch -n 10 "kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-consumer-group 2>/dev/null | column -t"
# Describe lag for ALL groups (broad sweep during incident)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --all-groups 2>/dev/null | awk '$6 > 0'
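During an incident it can help to total lag per group instead of eyeballing partitions. A minimal sketch, assuming the standard GROUP/TOPIC/PARTITION/.../LAG column layout of the describe output (`total_lag` is a hypothetical helper, not a Kafka tool):

```python
def total_lag(describe_output: str) -> dict:
    """Sum the LAG column (6th field) per consumer group."""
    totals: dict = {}
    for line in describe_output.splitlines():
        fields = line.split()
        if len(fields) < 6 or fields[0] == "GROUP":
            continue  # skip header, blanks, and partial lines
        group, lag = fields[0], fields[5]
        if lag.isdigit():  # LAG prints "-" while offsets are unknown
            totals[group] = totals.get(group, 0) + int(lag)
    return totals

sample = """GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-group orders 0 100 150 50 c1 /10.0.0.1 client-1
my-group orders 1 200 200 0 c2 /10.0.0.2 client-2"""
print(total_lag(sample))  # {'my-group': 50}
```

Feed it the output of the `--describe --all-groups` sweep above to rank groups by total backlog.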
Kafka: Topic Inspection¶
# List topics
kafka-topics.sh --bootstrap-server kafka:9092 --list
# Describe a topic (partition count, replicas, ISR)
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic orders
# Peek at messages from the beginning (first 10)
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic orders --from-beginning --max-messages 10
# Peek at latest messages
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic orders --offset latest --partition 0
# Read from a specific offset on partition 2
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic orders --partition 2 --offset 5000 --max-messages 5
# Show message with key and timestamp
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic orders --from-beginning --max-messages 5 \
--property print.key=true --property print.timestamp=true
# Check log-end offset per partition (quick lag baseline)
# (Kafka 3.x ships this as kafka-get-offsets.sh; older releases may need --broker-list)
kafka-run-class.sh kafka.tools.GetOffsetShell \
--bootstrap-server kafka:9092 --topic orders --time -1
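GetOffsetShell prints one `topic:partition:offset` line per partition. A small sketch to turn that into a dict you can diff between two runs (names are illustrative):

```python
def parse_offsets(output: str) -> dict:
    """Return {partition: log_end_offset} from 'topic:partition:offset' lines."""
    offsets = {}
    for line in output.splitlines():
        parts = line.strip().split(":")
        if len(parts) != 3:
            continue  # ignore anything that is not topic:partition:offset
        _topic, partition, offset = parts
        offsets[int(partition)] = int(offset)
    return offsets

sample = "orders:0:15000\norders:1:14980\norders:2:31000"
ends = parse_offsets(sample)
print(max(ends, key=ends.get))  # partition holding the most data: 2
```

Run it twice 60s apart and subtract per partition to get the production rate mentioned in the Consumer Lag Spiral diagnosis below.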
Kafka: Broker and Topic Health¶
# Check under-replicated partitions (should be 0 in healthy cluster)
kafka-topics.sh --bootstrap-server kafka:9092 \
--describe --under-replicated-partitions
# Check partitions with no leader (serious: consumers/producers stalled)
kafka-topics.sh --bootstrap-server kafka:9092 \
--describe --unavailable-partitions
# Describe broker configs
kafka-configs.sh --bootstrap-server kafka:9092 \
--describe --entity-type brokers --entity-name 0
# Get topic-level config (retention, compaction policy, etc.)
kafka-configs.sh --bootstrap-server kafka:9092 \
--describe --entity-type topics --entity-name orders
RabbitMQ: Queue and Broker Status¶
# List all queues with message count and consumer count
rabbitmqctl list_queues name messages consumers memory state
# Show only queues with messages (backlog check)
rabbitmqctl list_queues name messages | awk '$2 > 0'
# Detailed queue info (ready vs unacked vs total)
rabbitmqctl list_queues name messages_ready messages_unacknowledged \
messages consumers
# Note: publish/deliver rates come from the management API, not rabbitmqctl
# List exchanges
rabbitmqctl list_exchanges name type durable auto_delete
# List bindings
rabbitmqctl list_bindings
# List consumers with their queue and prefetch settings
rabbitmqctl list_consumers queue_name consumer_tag prefetch_count ack_required
# Check connections and channels
rabbitmqctl list_connections name state channels send_pend recv_cnt
# Node status (memory, disk alarms, file descriptors)
rabbitmqctl status
# Connections blocked or blocking due to memory/disk alarms (should be empty)
rabbitmqctl list_connections state | grep -E "blocked|blocking"
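The tab-separated `rabbitmqctl list_queues` output is easy to post-process. A sketch that surfaces backlogged queues, assuming the two-column `name messages` form shown above (`backlogged` is a hypothetical helper):

```python
def backlogged(list_queues_output: str, threshold: int = 0) -> list:
    """Return (queue, depth) pairs above threshold, deepest first."""
    result = []
    for line in list_queues_output.splitlines():
        parts = line.split("\t")
        if len(parts) != 2 or not parts[1].isdigit():
            continue  # skip the "Listing queues ..." banner and header row
        name, depth = parts[0], int(parts[1])
        if depth > threshold:
            result.append((name, depth))
    return sorted(result, key=lambda x: -x[1])

sample = ("Listing queues for vhost / ...\n"
          "name\tmessages\n"
          "orders\t1200\norders.dlq\t3\nemails\t0")
print(backlogged(sample))  # [('orders', 1200), ('orders.dlq', 3)]
```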
RabbitMQ: Management API (when rabbitmqctl is unavailable)¶
# Queue overview (requires management plugin)
curl -s -u guest:guest http://localhost:15672/api/queues | \
python3 -m json.tool | grep -E '"name"|"messages"'
# Specific queue details
curl -s -u guest:guest \
http://localhost:15672/api/queues/%2F/orders | python3 -m json.tool
# Node overview
curl -s -u guest:guest http://localhost:15672/api/nodes | python3 -m json.tool
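The management API returns a large JSON document per queue. A sketch that condenses it to the fields you actually triage on; the sample payload mirrors the `/api/queues` response shape (fetch it with the curl command above rather than hardcoding it):

```python
import json

def queue_summary(queues_json: str) -> list:
    """Condense /api/queues JSON to (name, ready, unacked), deepest first."""
    queues = json.loads(queues_json)
    rows = [(q["name"],
             q.get("messages_ready", 0),
             q.get("messages_unacknowledged", 0)) for q in queues]
    return sorted(rows, key=lambda r: -(r[1] + r[2]))

# Illustrative payload in the management API's shape
sample = json.dumps([
    {"name": "orders", "messages_ready": 40, "messages_unacknowledged": 200},
    {"name": "emails", "messages_ready": 5, "messages_unacknowledged": 0},
])
print(queue_summary(sample))  # [('orders', 40, 200), ('emails', 5, 0)]
```

A high unacked count at the top of this list is the RabbitMQ gotcha covered below.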
Gotchas¶
Consumer Lag Spiral¶
What happens: Consumer falls behind. Lag grows. Reprocessing old messages causes more errors. More errors → more retries → more load → consumer falls further behind. Classic death spiral.
Symptoms:
- Consumer lag growing monotonically
- Consumer CPU at 100% or I/O saturated
- Error rate rising alongside lag
Diagnosis:
# Is lag growing or stable?
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-group | awk '{print $6}' | sort -n | tail
# How fast is production rate?
kafka-run-class.sh kafka.tools.GetOffsetShell \
--bootstrap-server kafka:9092 --topic orders --time -1
# Compare against 60s ago (run twice, diff)
Recovery:
1. Scale up consumers (add instances, up to the partition count).
2. If processing is slow: identify and fix the bottleneck (DB query, external API call).
3. Temporarily increase consumer parallelism per instance.
4. If message volume is permanently higher: increase partition count and rebalance.
5. Do NOT reset offsets without understanding which messages you will skip or reprocess.
Rebalance Storms¶
What happens: Consumer joins or leaves group → rebalance → all consumers stop for 10-30s → lag spike → another consumer times out → another rebalance. Cycle repeats.
Symptoms:
- Log messages: Rebalancing... Assigned partitions: [] repeatedly
- Consumer lag spikes every few minutes
- Kafka coordinator logs show frequent group changes
Diagnosis:
# Watch for rebalance events in consumer logs
kubectl logs -f deploy/my-consumer | grep -i "rebalance\|partition\|revoked\|assigned"
# session.timeout.ms and max.poll.interval.ms are client-side configs;
# check them in the consumer application, not on the broker
# Broker-side bounds, if overridden (group.min/max.session.timeout.ms):
kafka-configs.sh --bootstrap-server kafka:9092 \
--describe --entity-type brokers --entity-name 0 | grep session
Fix:
# Consumer config adjustments
session.timeout.ms=60000 # raise above the 45s default (Kafka 3.0+) if heartbeats get delayed under load
heartbeat.interval.ms=20000 # keep at or below session.timeout.ms / 3
max.poll.interval.ms=600000 # max time between polls; raise for slow processing
max.poll.records=100 # fewer records per poll keeps each cycle within the poll interval
# Use static group membership to avoid rebalance on restart (Kafka 2.3+)
group.instance.id=consumer-pod-1 # stable identity per consumer instance
# Upgrade to incremental cooperative rebalancing (Kafka 2.4+)
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
Unacked Message Buildup (RabbitMQ)¶
What happens: Consumers receive messages (broker marks them in-flight/unacked) but never ack or nack. Queue appears empty but messages are held hostage. Consumer restart dumps them back to ready state — but only after channel/connection closes.
Symptoms:
- messages_ready is low but messages_unacknowledged is high
- Queue seems stuck even though consumers are running
- Restarting consumers suddenly "releases" a flood of messages
Diagnosis:
# Show unacked vs ready split
rabbitmqctl list_queues name messages_ready messages_unacknowledged
# Find which consumers are holding unacked messages
rabbitmqctl list_consumers queue_name consumer_tag prefetch_count ack_required
# Find channels holding unacked messages (unacked counts live on the channel)
rabbitmqctl list_channels connection name messages_unacknowledged | awk '$NF > 0'
Fix:
- Set prefetch_count to bound how many unacked messages each consumer can hold.
- Ack or nack on every code path; nack without requeue (basic_nack(..., requeue=False)) on unrecoverable errors so messages dead-letter instead of looping.
- Add consumer heartbeat so the broker detects stalled consumers faster.
- Check for exception paths that skip the ack/nack call.
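The ack-on-every-path rule can be sketched as a pika-style callback. This assumes pika's `basic_ack`/`basic_nack` semantics; `RecoverableError` and `handler` are hypothetical application code, and the `handler` argument is added here for testability (pika's own callback signature is `(channel, method, properties, body)`):

```python
class RecoverableError(Exception):
    """Transient failure worth retrying (e.g. DB deadlock)."""

def on_message(channel, method, properties, body, handler):
    """Every code path acks or nacks the delivery exactly once."""
    try:
        handler(body)  # business logic
    except RecoverableError:
        channel.basic_nack(method.delivery_tag, requeue=True)   # retry later
    except Exception:
        channel.basic_nack(method.delivery_tag, requeue=False)  # dead-letter
    else:
        channel.basic_ack(method.delivery_tag)
```

The `else` clause (rather than acking inside `try`) guarantees a failed handler can never be acked.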
Partition Skew¶
What happens: One or more Kafka partitions receive disproportionately more messages than others because of a poorly chosen partition key or a hot key (e.g., all messages for one high-volume customer keyed on customer ID).
Symptoms:
- One consumer instance at 100% CPU/IO, others idle
- Per-partition lag: one partition has millions of messages, others have zero
Diagnosis:
# Check per-partition lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-group | sort -k6 -rn | head -20
# Check per-partition message count
kafka-run-class.sh kafka.tools.GetOffsetShell \
--bootstrap-server kafka:9092 --topic orders --time -1 | sort -t: -k3 -rn
Fix:
- Choose a higher-cardinality partition key (e.g., order_id instead of region).
- Add a random suffix to hot keys for skewed topics (accepts ordering loss for those keys).
- If key cardinality is fundamentally low, use round-robin partitioning (null key) and accept loss of ordering.
- Increase partition count to spread load (requires consumer group rebalance).
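The hot-key salting fix above, as a producer-side sketch. `HOT_KEYS` and `SALT_BUCKETS` are illustrative assumptions, and per-key ordering is lost for any salted key:

```python
import random

HOT_KEYS = {"customer-42"}  # keys known to dominate traffic (assumed)
SALT_BUCKETS = 8            # each hot key fans out to this many sub-keys

def partition_key(key: str, rng=random) -> str:
    """Append a random suffix to hot keys so they spread across partitions."""
    if key in HOT_KEYS:
        return f"{key}-{rng.randint(0, SALT_BUCKETS - 1)}"
    return key

rng = random.Random(0)  # seeded for reproducibility
keys = {partition_key("customer-42", rng) for _ in range(100)}
print(sorted(keys))  # several distinct sub-keys instead of one hot partition
```

Consumers must strip or ignore the suffix when interpreting the key; aggregations keyed on the raw key need a re-keying step downstream.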
Patterns¶
DLQ Triage¶
When a DLQ accumulates messages, systematic triage beats ad-hoc inspection:
# RabbitMQ: inspect DLQ without consuming
# RabbitMQ: inspect DLQ without consuming (the properties column includes
# the original routing key and x-death headers: count, reason, source queue)
rabbitmqadmin get queue=orders.dlq count=10 ackmode=ack_requeue_true
# Find all DLQs and their depth
rabbitmqctl list_queues name messages | grep dlq
# Kafka DLQ: inspect dead-letter topic
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic orders.dlq --from-beginning --max-messages 20 \
--property print.headers=true \
--property print.timestamp=true
# Count DLQ depth over time (is it growing?)
watch -n 30 "kafka-run-class.sh kafka.tools.GetOffsetShell \
--bootstrap-server kafka:9092 --topic orders.dlq --time -1"
Triage workflow:
1. Sample 10-20 messages. Are they all the same error class?
2. If yes: fix the bug, then replay. If no: categorize before replaying.
3. Replay fixed messages back to the original topic:
# Kafka: re-publish DLQ messages to original topic
# (bound the run with --max-messages; note this pipe replays values only,
# keys and headers are dropped unless you configure print/parse properties)
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic orders.dlq --from-beginning --max-messages 1000 | \
kafka-console-producer.sh --bootstrap-server kafka:9092 \
--topic orders
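The "categorize before replaying" step can be scripted. A sketch that buckets sampled DLQ messages by an `error.class` header; that header name is a convention your producer or framework would stamp on dead-lettered records, not a broker builtin:

```python
from collections import Counter

def categorize(samples: list) -> Counter:
    """Count sampled (headers, payload) pairs by error class header."""
    counts = Counter()
    for headers, _payload in samples:
        counts[headers.get("error.class", "<no header>")] += 1
    return counts

# Illustrative sample: what a 4-message DLQ pull might look like
sample = [
    ({"error.class": "DeserializationError"}, b"..."),
    ({"error.class": "DeserializationError"}, b"..."),
    ({"error.class": "TimeoutError"}, b"..."),
    ({}, b"..."),
]
print(categorize(sample).most_common())
```

One dominant bucket means step 2's "fix the bug, then replay" path; a flat distribution means manual categorization first.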
Idempotent Consumers¶
One-liner: At-least-once delivery guarantees you will see every message, but never guarantees you will see it only once. Idempotency is not optional -- it is the contract your consumer owes the system.
Every at-least-once consumer must handle duplicate delivery. The pattern:
def process_message(msg):
    message_id = msg.headers.get("message-id") or msg.key
    # Check dedup store (Redis, DB, or idempotency table)
    if already_processed(message_id):
        logger.info("Skipping duplicate: %s", message_id)
        return
    with transaction():
        # Do the work
        apply_order(msg.payload)
        # Record processing in same transaction
        mark_processed(message_id)
Deduplication store options:
- Redis SET NX EX 3600: fast, non-durable. Good for idempotency within a time window.
- DB unique constraint on message_id: durable, transactional. Best for financial events.
- Conditional update (optimistic locking): update only if current state matches expected pre-state.
Rule: idempotency key scope = the consumer group + message ID. The same message processed by two different consumer groups is not a duplicate.
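The DB-unique-constraint option, combined with the group-scoped key rule, in a runnable sketch. An in-memory SQLite database stands in for the real store; table and column names are illustrative:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Composite primary key = consumer group + message ID (the scope rule above)
conn.execute("CREATE TABLE processed (consumer_group TEXT, message_id TEXT, "
             "PRIMARY KEY (consumer_group, message_id))")

def process_once(group: str, message_id: str, work) -> bool:
    """Run work() at most once per (group, message_id). True if it ran."""
    try:
        with conn:  # the dedup insert and the work commit atomically
            conn.execute("INSERT INTO processed VALUES (?, ?)",
                         (group, message_id))
            work()
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate delivery: already processed, skip silently

applied = []
process_once("billing", "msg-1", lambda: applied.append("msg-1"))
process_once("billing", "msg-1", lambda: applied.append("msg-1"))  # duplicate
print(applied)  # ['msg-1']
```

If `work()` raises, the transaction rolls the insert back too, so a failed attempt can be retried; that is the same property the `with transaction()` block in the pattern above relies on.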
Backpressure Handling¶
When your consumer is overloaded, controlled slowdown prevents cascading failures:
# Pattern: pull-with-pause (Kafka)
while True:
    records = consumer.poll(timeout_ms=1000)
    if is_downstream_overloaded():
        consumer.pause(consumer.assignment())  # stop fetching new records
        time.sleep(5)
        consumer.resume(consumer.assignment())
        continue
    for record in records:
        process(record)
    consumer.commit()
# Pattern: bounded work queue with admission control (RabbitMQ)
channel.basic_qos(prefetch_count=WORKER_POOL_SIZE)
# The broker will not push more than WORKER_POOL_SIZE unacked messages.
# When the thread pool is full, workers stop acking → broker stops sending.
Key insight: backpressure only works if there is a feedback path from consumer to broker. RabbitMQ prefetch + thread pool creates this naturally. Kafka requires explicit pause/resume or careful max.poll.records tuning.
Consumer Scaling¶
Kafka: scale by adding consumer instances to the group. Kafka automatically rebalances. Hard limit = partition count. To scale beyond current limits:
# Increase partition count (irreversible — ordering guarantees per key may break)
kafka-topics.sh --bootstrap-server kafka:9092 \
--alter --topic orders --partitions 24
# Verify rebalance completed
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-group | wc -l
RabbitMQ: add consumer instances. No hard limit analogous to partition count. Each consumer competes for messages from the same queue. Watch prefetch when adding consumers: a large prefetch lets one consumer hoard messages while the rest sit idle.
# Check consumer distribution across queue
rabbitmqctl list_consumers queue_name consumer_tag prefetch_count | \
awk '{sum += $3; count++} END {print "avg prefetch:", sum/count, "consumers:", count}'
Scaling heuristics:
- If consumer CPU is the bottleneck: add instances.
- If consumer is I/O-bound (DB calls, network): add instances OR optimize the I/O.
- If message processing is inherently serial (ordering required): partition more finely, keep one consumer per partition for ordered processing.
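A back-of-envelope sizing sketch for the heuristics above. The headroom factor and the numbers are illustrative; the Kafka-specific part is that useful instances are capped at the partition count:

```python
import math

def consumers_needed(msgs_per_sec: float, sec_per_msg: float,
                     partitions: int, headroom: float = 1.5) -> int:
    """Instances needed to keep up, capped at the partition count (Kafka)."""
    required = math.ceil(msgs_per_sec * sec_per_msg * headroom)
    return min(required, partitions)  # past the cap, add partitions instead

# 500 msg/s at 10 ms each with 1.5x headroom, 12 partitions available
print(consumers_needed(500, 0.010, 12))  # 8
```

When the function returns the partition count itself, scaling out requires the `--alter --partitions` step shown above, not more instances.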
Related Resources¶
- Primer: Message Queues Primer
- Footguns: Message Queues Footguns
- Flashcard Decks: message-queues (30 cards)