RabbitMQ Operations - Street-Level Ops

Quick Diagnosis Commands

# Check node status
rabbitmqctl status

# List all queues with message counts and consumer counts
rabbitmqctl list_queues name messages consumers messages_ready messages_unacknowledged

# List queues in a specific vhost
rabbitmqctl list_queues -p /myvhost name messages consumers

# List all connections
rabbitmqctl list_connections name peer_host peer_port state

# List all channels
rabbitmqctl list_channels name number messages_unacknowledged

# List exchanges
rabbitmqctl list_exchanges name type durable auto_delete

# List bindings
rabbitmqctl list_bindings source_name source_kind destination_name destination_kind routing_key

# List consumers
rabbitmqctl list_consumers queue_name consumer_tag prefetch_count

# Check memory and disk alarms (-i because newer releases capitalize section names)
rabbitmqctl status | grep -i -A5 "alarms\|memory\|disk"

# Check node health
rabbitmq-diagnostics check_running
rabbitmq-diagnostics check_port_connectivity
rabbitmq-diagnostics memory_breakdown

# Show cluster status
rabbitmqctl cluster_status

# Show which nodes are disc nodes and which are RAM nodes
# (a cluster membership property, independent of queue mirroring)
rabbitmqctl cluster_status | grep -i -A2 "dis[ck]\|ram"

# Check queue health for a specific queue
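rabbitmqctl list_queues -p / name state messages consumers memory policy | grep "my-queue"

# Management plugin HTTP API (when management plugin is enabled)
# Management UI: http://localhost:15672
# The built-in default user is guest/guest and can only connect from localhost.
# The examples below assume an admin/admin user that you have created.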

# Queue details via API
curl -s -u admin:admin \
  http://localhost:15672/api/queues/%2F/my-queue | jq '{
    messages: .messages,
    consumers: .consumers,
    state: .state,
    memory: .memory,
    policy: .effective_policy_definition
  }'

# List all queues as JSON
curl -s -u admin:admin http://localhost:15672/api/queues | \
  jq '.[] | {name: .name, messages: .messages, consumers: .consumers}'

# Get cluster overview (object counts and message rates, not memory)
curl -s -u admin:admin http://localhost:15672/api/overview | \
  jq '{totals: .object_totals, rates: .message_stats}'
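
The `%2F` in the URLs above is the percent-encoded default vhost `/`. The following is a minimal Python sketch of the same two steps, building the URL and filtering the queue list. It assumes the JSON shape returned by `/api/queues` (fields `name`, `messages`, `consumers`). The helper names `queue_api_url` and `queues_without_consumers` and the sample data are hypothetical and are not part of any RabbitMQ client.

```python
from urllib.parse import quote


def queue_api_url(base_url, vhost, queue):
    """Build the management API URL for one queue.

    The vhost and queue name are percent-encoded, so "/" becomes "%2F".
    """
    base = base_url.rstrip("/")
    return f"{base}/api/queues/{quote(vhost, safe='')}/{quote(queue, safe='')}"


def queues_without_consumers(queues):
    """Return names of queues that hold messages but have no consumers.

    `queues` is the parsed JSON list from GET /api/queues.
    """
    return [
        q["name"]
        for q in queues
        if q.get("messages", 0) > 0 and q.get("consumers", 0) == 0
    ]


# Illustrative data only; not output from a real broker.
sample = [
    {"name": "orders", "messages": 120, "consumers": 2},
    {"name": "orders.dlq", "messages": 7, "consumers": 0},
    {"name": "idle", "messages": 0, "consumers": 0},
]

print(queue_api_url("http://localhost:15672", "/", "my-queue"))
print(queues_without_consumers(sample))
```

Feeding it the parsed output of the `curl .../api/queues` call is one way to spot backlogs that nothing is draining.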

Common Scenarios

Scenario 1: Queue Depth Growing — Consumers Not Keeping Up

Queue depth is climbing and messages_ready is large. Consumers are running but can't keep up.

# Check consumer count and prefetch
rabbitmqctl list_queues name messages consumers
rabbitmqctl list_consumers queue_name consumer_tag prefetch_count ack_required

# Check if consumers are busy (messages_unacknowledged high)
rabbitmqctl list_queues name messages_ready messages_unacknowledged consumers

# If prefetch_count is 0 (unlimited), consumers are getting all messages at once
# This prevents load distribution across multiple consumers
# Fix: set prefetch to a reasonable value (e.g., 10-50 per consumer)
# In your consumer code: channel.basic_qos(prefetch_count=10)

# Check message processing rates via management API
curl -s -u admin:admin \
  "http://localhost:15672/api/queues/%2F/my-queue" | \
  jq '.message_stats | {deliver_rate: .deliver_details.rate, ack_rate: .ack_details.rate}'

# Scale consumers if processing rate is the bottleneck
# (deploy more consumer instances)

Fix: Set prefetch_count to limit how many unacked messages each consumer holds. Scale consumer replicas. Consider queue sharding with consistent-hash exchange for very high throughput.
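
To judge whether scaling is needed, it can help to turn the rates from the management API into a rough drain time. The sketch below is back-of-the-envelope arithmetic only. It assumes steady publish and ack rates, and the function names and the 25% headroom factor are illustrative choices, not RabbitMQ settings.

```python
import math


def seconds_to_drain(depth, publish_rate, ack_rate):
    """Estimate seconds until the backlog is empty.

    Returns None when consumers ack no faster than publishers publish,
    because the queue will then never drain at the current rates.
    """
    net = ack_rate - publish_rate
    if net <= 0:
        return None
    return depth / net


def consumers_needed(publish_rate, per_consumer_rate, headroom=1.25):
    """Consumers required to keep up with publish_rate plus some headroom."""
    if per_consumer_rate <= 0:
        raise ValueError("per_consumer_rate must be positive")
    return math.ceil(publish_rate * headroom / per_consumer_rate)


# 6000 ready messages, 100 msg/s published, 150 msg/s acked
print(seconds_to_drain(6000, 100, 150))
# 100 msg/s published, each consumer handles about 20 msg/s
print(consumers_needed(100, 20))
```

A `None` result means adding prefetch alone will not help: either more consumers or faster processing is required.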

Scenario 2: Messages in Dead Letter Queue

DLQ is filling up. Need to understand what's failing.

# Check DLQ depth
rabbitmqctl list_queues name messages | grep "\.dlq\|dead"

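# Inspect a dead-lettered message (via management API)
# The field is "ackmode"; ack_requeue_true puts the message back on the queue
# after reading it (it will be flagged as redelivered)
curl -s -u admin:admin \
  -X POST http://localhost:15672/api/queues/%2F/my-queue.dlq/get \
  -H 'Content-Type: application/json' \
  -d '{"count": 1, "ackmode": "ack_requeue_true", "encoding": "auto"}' | \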
  jq '.[0] | {
    routing_key: .routing_key,
    payload: .payload,
    death: .properties.headers."x-death"
  }'

# The x-death header records why each message was dead-lettered.
# Reason values:
# - rejected: consumer called basic_reject or basic_nack with requeue=false
# - expired: message TTL expired before consumption
# - maxlen: queue reached max-length, oldest messages dropped to the DLX
# - delivery_limit: quorum queue redelivery limit (delivery-limit) was exceeded

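# Re-queue messages from DLQ back to original queue (via a dynamic shovel;
# requires the rabbitmq_shovel plugin). "src-delete-after": "queue-length"
# stops the shovel once the messages present at start have been moved, so
# messages that fail again are not replayed in a loop.
rabbitmqctl set_parameter shovel dlq-replay \
  '{"src-protocol": "amqp091", "src-uri": "amqp://", "src-queue": "my-queue.dlq",
    "src-delete-after": "queue-length",
    "dest-protocol": "amqp091", "dest-uri": "amqp://", "dest-queue": "my-queue"}'

# Remove the shovel if it is still defined after the replay is done
rabbitmqctl clear_parameter shovel dlq-replay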
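
When many messages need triage, the `x-death` header can be summarized programmatically. The following Python sketch assumes the message JSON shape returned by the management API `get` endpoint and the usual `x-death` entry keys (`queue`, `reason`, `count`). The function name `death_reasons` and the sample message are hypothetical.

```python
from collections import Counter


def death_reasons(message):
    """Count dead-lettering events per (queue, reason) for one message.

    `message` is one element of the JSON list returned by the
    /api/queues/<vhost>/<queue>/get endpoint.
    """
    props = message.get("properties")
    headers = props.get("headers") if isinstance(props, dict) else None
    entries = headers.get("x-death", []) if isinstance(headers, dict) else []

    totals = Counter()
    for entry in entries:
        # Each x-death entry carries its own count of repeated deaths.
        totals[(entry.get("queue"), entry.get("reason"))] += entry.get("count", 1)
    return dict(totals)


# Illustrative message only; not captured from a real broker.
sample_message = {
    "routing_key": "my-queue",
    "payload": "{}",
    "properties": {
        "headers": {
            "x-death": [
                {"queue": "my-queue", "reason": "rejected", "count": 3},
                {"queue": "my-queue", "reason": "expired", "count": 1},
            ]
        }
    },
}

print(death_reasons(sample_message))
```

A message dominated by `rejected` points at consumer errors, while `expired` or `maxlen` points at consumers being too slow or absent.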

Scenario 3: Memory Alarm Triggered — Broker Blocking Publishers

Under the hood: When the memory alarm fires, RabbitMQ blocks all publisher connections at the TCP level -- it stops reading from their sockets. Consumers continue to drain queues. This is by design: back-pressure on producers while consumers catch up. But if your producers have short TCP timeouts, they will see connection errors rather than blocking.

RabbitMQ hits the memory high watermark and blocks all publisher connections.

# Check current memory and disk alarm status
rabbitmq-diagnostics check_local_alarms
rabbitmqctl status | grep -i -A2 "alarm"

# See memory breakdown
rabbitmq-diagnostics memory_breakdown

# Check watermark setting (default: 40% of RAM on 3.x; newer releases may use a higher default)
rabbitmqctl status | grep -i "watermark"

# Temporarily raise watermark (buys time, doesn't fix root cause)
rabbitmqctl set_vm_memory_high_watermark 0.6

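# Check which queues are consuming most memory (-q suppresses the banner line)
rabbitmqctl -q list_queues name memory messages | sort -k2 -rn | head -10

# Force GC on all broker processes (may free memory temporarily)
# Note: `rabbitmqctl eval 'erlang:garbage_collect().'` only collects the
# short-lived process that runs the eval, not the broker as a whole.
rabbitmqctl force_gc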

# Purge a queue that's grown too large (destructive — messages deleted)
rabbitmqctl purge_queue my-queue -p /

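Fix: Increase consumer throughput to drain queues. Set max-length policy on queues to prevent unbounded growth. Add memory to the broker. On older 3.x releases, configure lazy queues to page messages to disk; classic queues on 3.12 and later already keep message bodies on disk in a similar way.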
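
If the memory ranking needs to feed a script rather than a terminal, the same `list_queues name memory messages` output can be parsed directly. This is a minimal sketch that assumes tab-separated rows, which is how `rabbitmqctl` prints its listings. The function name and the sample listing are hypothetical.

```python
def top_queues_by_memory(listing, limit=10):
    """Parse tab-separated `name memory messages` rows and rank by memory."""
    rows = []
    for line in listing.splitlines():
        parts = line.split("\t")
        if len(parts) != 3 or not parts[1].isdigit() or not parts[2].isdigit():
            continue  # skip banners, the header row, and blank lines
        rows.append((parts[0], int(parts[1]), int(parts[2])))
    # Largest memory footprint first.
    rows.sort(key=lambda row: row[1], reverse=True)
    return rows[:limit]


# Illustrative listing only; not captured from a real broker.
sample_listing = (
    "Listing queues for vhost / ...\n"
    "name\tmemory\tmessages\n"
    "orders\t1048576\t5000\n"
    "idle\t34000\t0\n"
    "events\t8388608\t120000\n"
)

for name, memory, messages in top_queues_by_memory(sample_listing, limit=2):
    print(f"{name}: {memory} bytes, {messages} messages")
```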

Scenario 4: Node Won't Rejoin Cluster After Restart

Gotcha: If all nodes in a cluster shut down, the LAST node to stop must be the FIRST to start. A node that was not the last to stop waits for the last-stopped peer to come back and fails to boot if that peer does not appear in time, because its own data may be stale. If you do not know which node stopped last, run rabbitmqctl force_boot on one stopped node before starting it -- but this risks losing messages that only existed on other nodes.

After a maintenance restart, a RabbitMQ node fails to rejoin the cluster.

# Check the error
rabbitmqctl cluster_status
journalctl -u rabbitmq-server --since "1 hour ago" | tail -50

# Common cause: node thinks it was partitioned and needs reset
# Check for network partition
rabbitmqctl status | grep -A5 "partitions"

# If partitioned, decide which side wins, then reset the other
# On the node to reset (WARNING: this discards its data):
rabbitmqctl stop_app
rabbitmqctl reset

# Rejoin the cluster while the app is still stopped
# (join_cluster fails if the app is running), then start it
rabbitmqctl join_cluster rabbit@other-node
rabbitmqctl start_app

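# Verify cluster membership
rabbitmqctl cluster_status | grep -i -A5 "running"

# Policies are cluster-wide and are synced to the rejoined node, so check first
rabbitmqctl list_policies

# Re-apply only if a policy is missing (ha-mode is classic mirroring, 3.x only)
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' --apply-to queues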

Key Patterns

Exchange Types and Routing

# Note: `declare` is a rabbitmqadmin command (the CLI that ships with the
# management plugin); rabbitmqctl has no declare subcommand.

# Direct exchange: routes by exact routing key match
rabbitmqadmin declare exchange name=orders type=direct durable=true

# Fanout exchange: routes to all bound queues (ignores routing key)
rabbitmqadmin declare exchange name=events type=fanout durable=true

# Topic exchange: routes by pattern matching (* = exactly one word, # = zero or more words)
# Pattern "orders.*.failed" matches "orders.payment.failed"
rabbitmqadmin declare exchange name=notifications type=topic durable=true

# Headers exchange: routes by message header attributes
rabbitmqadmin declare exchange name=priority type=headers durable=true
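
The topic wildcard rules are easy to misread, so here is a small Python sketch of the matching logic: words are separated by dots, `*` matches exactly one word, and `#` matches zero or more words. This is an illustration of the rules, not the broker's implementation, and the function name `topic_matches` is hypothetical.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i, j):
        if i == len(p):
            # Pattern exhausted: match only if the key is exhausted too.
            return j == len(k)
        if p[i] == "#":
            # "#" may absorb zero words (skip it) or one more word (stay on it).
            return match(i + 1, j) or (j < len(k) and match(i, j + 1))
        if j < len(k) and (p[i] == "*" or p[i] == k[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches("orders.*.failed", "orders.payment.failed"))
print(topic_matches("orders.*.failed", "orders.failed"))
print(topic_matches("orders.#", "orders"))
```

The recursive form is chosen for readability; it is fine for checking a handful of bindings by hand but is not tuned for speed.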

Queue Policies

# Apply HA policy (classic mirrored queues: deprecated in 3.x and removed in 4.0, prefer quorum)
rabbitmqctl set_policy ha-all "^" \
  '{"ha-mode":"all","ha-sync-mode":"automatic"}' \
  --apply-to queues --priority 0

# Quorum queues (recommended for durability, replaces mirrored queues)
# Create a quorum queue (declare is a rabbitmqadmin command, not rabbitmqctl):
rabbitmqadmin declare queue name=my-quorum-queue \
  durable=true arguments='{"x-queue-type": "quorum"}'

# Set TTL policy — messages expire after 60 seconds
rabbitmqctl set_policy msg-ttl "^transient\." \
  '{"message-ttl": 60000}' --apply-to queues

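# Set max-length policy to protect from unbounded growth
# Note: only ONE policy applies to a queue at a time (the highest-priority match).
# With pattern "^" and priority 1, this policy overrides the lower-priority
# policies in this section, so merge keys that must apply together into a
# single policy definition.
rabbitmqctl set_policy max-len "^" \
  '{"max-length": 100000, "overflow": "reject-publish"}' \
  --apply-to queues --priority 1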

# Dead-letter exchange policy
rabbitmqctl set_policy dlx "^orders" \
  '{"dead-letter-exchange": "orders.dlx"}' --apply-to queues

# List all policies
rabbitmqctl list_policies
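
Because only one policy is in effect for a given queue, it is worth checking which one wins before relying on several overlapping patterns. The sketch below models that selection in Python under simplifying assumptions: it ignores `--apply-to` and operator policies, and it breaks priority ties by list order, which the broker does not guarantee. The function name and the policy list (copied from the commands in this section) are for illustration.

```python
import re


def effective_policy(queue_name, policies):
    """Pick the single policy that applies to a queue.

    `policies` is a list of dicts with "name", "pattern", and "priority".
    The matching policy with the highest priority wins.
    """
    matching = [p for p in policies if re.search(p["pattern"], queue_name)]
    if not matching:
        return None
    # max() returns the first maximal element, so ties fall back to list order.
    return max(matching, key=lambda p: p["priority"])["name"]


policies = [
    {"name": "ha-all", "pattern": "^", "priority": 0},
    {"name": "msg-ttl", "pattern": r"^transient\.", "priority": 0},
    {"name": "max-len", "pattern": "^", "priority": 1},
    {"name": "dlx", "pattern": "^orders", "priority": 0},
]

print(effective_policy("orders.new", policies))
print(effective_policy("transient.cache", policies))
```

With these four policies, `max-len` wins for every queue, so the TTL and dead-letter settings never take effect unless they are folded into the same definition or given a higher priority.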

Vhost Isolation

# Create a vhost for isolation
rabbitmqctl add_vhost /production
rabbitmqctl add_vhost /staging

# Create user with access to specific vhost
rabbitmqctl add_user app_user strong_password
# Format: set_permissions -p <vhost> <user> <configure_regex> <write_regex> <read_regex>
rabbitmqctl set_permissions -p /production app_user ".*" ".*" ".*"

# List users and their vhost permissions
rabbitmqctl list_users
rabbitmqctl list_permissions -p /production

# Delete a vhost (WARNING: deletes all queues and messages in it)
rabbitmqctl delete_vhost /old-staging
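
The three permission regexes are matched against resource (queue and exchange) names, so `".*"` grants everything. As a rough sketch of how tighter patterns behave, the Python below checks one operation against a set of regexes. It assumes unanchored matching (so patterns should carry their own `^` and `$`), uses a simplified operation-to-permission mapping, and all names in it are hypothetical.

```python
import re

# Which permission each operation needs (a simplified, partial mapping).
REQUIRED = {
    "declare": "configure",
    "publish": "write",
    "consume": "read",
}


def is_allowed(permissions, operation, resource_name):
    """Check one operation on one resource against a user's permission regexes."""
    pattern = permissions[REQUIRED[operation]]
    return re.search(pattern, resource_name) is not None


# "^$" matches only the empty string, which effectively denies configure.
app_user = {"configure": "^$", "write": r"^orders\.", "read": r"^orders\."}

print(is_allowed(app_user, "publish", "orders.events"))
print(is_allowed(app_user, "declare", "orders.events"))
print(is_allowed(app_user, "consume", "payments"))
```

Trying candidate patterns this way before running `set_permissions` can catch a regex that is broader than intended.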

Prefetch Tuning

Default trap: With prefetch_count=0 (the default in most clients), RabbitMQ pushes ALL available messages to the consumer as fast as possible. If you have 3 consumers with prefetch 0, the first consumer to connect gets nearly all messages, starving the others. Always set an explicit prefetch.

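# In consumer code: critical for fairness under load
import pika


def process(body):
    # Placeholder for application logic; replace with real processing.
    print(f"processing {len(body)} bytes")


def callback(ch, method, properties, body):
    try:
        process(body)
    except Exception:
        # Reject without requeue so the message is dead-lettered (if a DLX
        # is configured) instead of being redelivered in a tight loop.
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return
    ch.basic_ack(delivery_tag=method.delivery_tag)


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Set prefetch: consumer gets at most 10 unacked messages at a time
channel.basic_qos(prefetch_count=10)

# auto_ack defaults to False, so acks are manual and the prefetch limit applies
channel.basic_consume(queue='my-queue', on_message_callback=callback)
channel.start_consuming()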

Shovel for Cross-Cluster Message Transfer

# Enable shovel plugin
rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management

# Set up a shovel (dynamic)
rabbitmqctl set_parameter shovel my-shovel \
  '{"src-protocol": "amqp091",
    "src-uri": "amqp://user:pass@source-host",
    "src-queue": "my-queue",
    "dest-protocol": "amqp091",
    "dest-uri": "amqp://user:pass@dest-host",
    "dest-queue": "my-queue"}'

# Check shovel status
rabbitmqctl shovel_status
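
Credentials containing characters such as `@` or `/` must be percent-encoded inside the shovel URIs, which is easy to get wrong by hand. The sketch below builds the JSON value for the `set_parameter shovel` command shown above. The helper names `amqp_uri` and `shovel_definition`, the hosts, and the credentials are placeholders for illustration.

```python
import json
from urllib.parse import quote


def amqp_uri(host, user, password, vhost="/"):
    """Build an AMQP URI with percent-encoded credentials and vhost."""
    return (
        f"amqp://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}/{quote(vhost, safe='')}"
    )


def shovel_definition(src_uri, dest_uri, queue):
    """Return the JSON value for `rabbitmqctl set_parameter shovel <name> ...`."""
    return json.dumps({
        "src-protocol": "amqp091",
        "src-uri": src_uri,
        "src-queue": queue,
        "dest-protocol": "amqp091",
        "dest-uri": dest_uri,
        "dest-queue": queue,
    })


src = amqp_uri("source-host", "user", "p@ss")
dest = amqp_uri("dest-host", "user", "p@ss")
print(shovel_definition(src, dest, "my-queue"))
```

The printed JSON can be passed, single-quoted, as the last argument to `rabbitmqctl set_parameter shovel my-shovel`.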