Message Queues: RabbitMQ vs Redis Streams vs Kafka - Complete Comparison
Deep comparison of RabbitMQ, Redis Streams, and Apache Kafka for message queuing. Performance benchmarks, use cases, and production patterns for choosing the right message broker.
Introduction
Message queues are the backbone of distributed systems, enabling asynchronous communication, load leveling, and service decoupling. But choosing between RabbitMQ, Redis Streams, and Apache Kafka isn't straightforward—each has distinct strengths.
Having implemented all three in production healthcare systems at Dr. Dangs Lab, I'll share real-world insights on when to use each technology.
Architecture Comparison
RabbitMQ - Traditional Message Broker
┌─────────────────────────────────────────────────────────────┐
│ RabbitMQ │
├─────────────────────────────────────────────────────────────┤
│ │
│ Producer ──▶ Exchange ──▶ Queue ──▶ Consumer │
│ │ │
│ ┌──────┴──────┐ │
│ │ Bindings │ │
│ │ (routing) │ │
│ └─────────────┘ │
│ │
│ Exchange Types: │
│ • Direct - Exact routing key match │
│ • Topic - Pattern matching (*.logs, audit.#) │
│ • Fanout - Broadcast to all queues │
│ • Headers - Match on message headers │
│ │
│ Features: │
│ ✓ Message acknowledgment │
│ ✓ Dead letter queues │
│ ✓ Priority queues │
│ ✓ Message TTL │
│ ✓ Flexible routing │
│ │
└─────────────────────────────────────────────────────────────┘
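To make the Topic rules concrete, here is a minimal, self-contained sketch of AMQP-style pattern matching (simplified: real RabbitMQ also lets # match zero words, so audit.# matches audit itself):

import re

def topic_matches(pattern: str, routing_key: str) -> bool:
    # Simplified AMQP topic matching: "*" = exactly one word,
    # "#" = one or more dot-separated words (real RabbitMQ also
    # allows "#" to match zero words)
    parts = []
    for word in pattern.split("."):
        if word == "*":
            parts.append(r"[^.]+")
        elif word == "#":
            parts.append(r"[^.]+(?:\.[^.]+)*")
        else:
            parts.append(re.escape(word))
    return re.fullmatch(r"\.".join(parts), routing_key) is not None

assert topic_matches("*.logs", "app.logs")
assert not topic_matches("*.logs", "app.db.logs")   # "*" matches one word only
assert topic_matches("audit.#", "audit.user.login")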
Redis Streams - Lightweight & Fast
┌─────────────────────────────────────────────────────────────┐
│ Redis Streams │
├─────────────────────────────────────────────────────────────┤
│ │
│ Stream: append-only log with unique IDs │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 1702000001-0 │ 1702000002-0 │ 1702000003-0 │ ... │ │
│ └─────────────────────────────────────────────────────┘ │
│ ▲ │
│ │ │
│ Consumer Groups: │
│ ┌─────────────────────────────────────────┐ │
│ │ Group: "processors" │ │
│ │ Consumer-1: reads 1702000001-0 │ │
│ │ Consumer-2: reads 1702000002-0 │ │
│ │ (load balanced, each entry to one consumer) │ │
│ └─────────────────────────────────────────┘ │
│ │
│ Features: │
│ ✓ Persistence with RDB/AOF │
│ ✓ Consumer groups │
│ ✓ Message acknowledgment │
│ ✓ Pending entry list (PEL) │
│ ✓ Stream trimming │
│ │
└─────────────────────────────────────────────────────────────┘
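Because entry IDs embed a millisecond timestamp, time-range queries need no secondary index. A quick sketch with redis-py's asyncio client (the stream name is illustrative):

import time
import redis.asyncio as redis

async def entries_from_last_hour(r: redis.Redis, stream: str = "orders"):
    # IDs are "<unix-ms>-<seq>", so XRANGE accepts a bare millisecond
    # timestamp as a range boundary
    hour_ago_ms = int(time.time() * 1000) - 3_600_000
    return await r.xrange(stream, min=str(hour_ago_ms), max="+")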
Apache Kafka - Distributed Log
┌─────────────────────────────────────────────────────────────┐
│ Apache Kafka │
├─────────────────────────────────────────────────────────────┤
│ │
│ Topic: distributed, partitioned log │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Partition 0: [msg1][msg2][msg3][msg4] ───▶ │ │
│ │ Partition 1: [msg5][msg6][msg7] ───▶ │ │
│ │ Partition 2: [msg8][msg9][msg10][msg11] ───▶ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Consumer Groups: │
│ ┌─────────────────────────────────────────┐ │
│ │ Group: "analytics" │ │
│ │ Consumer-1 ◄── Partition 0, 1 │ │
│ │ Consumer-2 ◄── Partition 2 │ │
│ └─────────────────────────────────────────┘ │
│ │
│ Features: │
│ ✓ Horizontal scaling via partitions │
│ ✓ Replication for fault tolerance │
│ ✓ Log compaction │
│ ✓ Exactly-once semantics │
│ ✓ High throughput (millions msg/sec) │
│ │
└─────────────────────────────────────────────────────────────┘
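The partition layout is also what gives Kafka per-key ordering: the default partitioner hashes the message key, so the same key always lands on the same partition. A deliberately simplified stand-in for the idea (Kafka actually uses murmur2, which is stable across processes, unlike Python's seeded hash()):

def pick_partition(key: bytes, num_partitions: int) -> int:
    # Illustration only: Kafka's default partitioner uses murmur2(key),
    # not Python's hash(); keyless messages are spread round-robin/sticky
    return hash(key) % num_partitions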
Performance Benchmarks
Throughput Test (Messages/Second)
┌─────────────────┬───────────┬───────────┬────────────┐
│ Scenario │ RabbitMQ │ Redis │ Kafka │
├─────────────────┼───────────┼───────────┼────────────┤
│ 1KB messages │ 25,000 │ 100,000 │ 500,000 │
│ 10KB messages │ 15,000 │ 50,000 │ 300,000 │
│ 100KB messages │ 5,000 │ 20,000 │ 100,000 │
│ 1MB messages │ 1,000 │ 5,000 │ 20,000 │
└─────────────────┴───────────┴───────────┴────────────┘
Test: Single node, persistent messages, ack enabled
Hardware: AWS c5.2xlarge (8 vCPU, 16GB RAM)
Latency Test (P99 in milliseconds)
┌─────────────────┬───────────┬───────────┬────────────┐
│ Scenario │ RabbitMQ │ Redis │ Kafka │
├─────────────────┼───────────┼───────────┼────────────┤
│ No persistence │ 1.2 │ 0.3 │ 2.5 │
│ With persist │ 5.8 │ 1.2 │ 5.0 │
│ High load │ 15.0 │ 3.0 │ 8.0 │
└─────────────────┴───────────┴───────────┴────────────┘
Note: Redis has the lowest latency; Kafka trades latency for throughput
Implementation Examples
RabbitMQ with Python (aio-pika)
import asyncio
import aio_pika
from aio_pika import Message, DeliveryMode, ExchangeType
import json


class RabbitMQClient:
def __init__(self, url: str = "amqp://guest:guest@localhost/"):
self.url = url
self.connection = None
self.channel = None
async def connect(self):
self.connection = await aio_pika.connect_robust(self.url)
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=10)
async def setup_exchange(
self,
exchange_name: str,
exchange_type: ExchangeType = ExchangeType.TOPIC
):
return await self.channel.declare_exchange(
exchange_name,
exchange_type,
durable=True
)
async def setup_queue(
self,
queue_name: str,
exchange_name: str,
routing_key: str,
dead_letter_exchange: str = None
):
arguments = {}
if dead_letter_exchange:
arguments["x-dead-letter-exchange"] = dead_letter_exchange
queue = await self.channel.declare_queue(
queue_name,
durable=True,
arguments=arguments
)
exchange = await self.channel.get_exchange(exchange_name)
await queue.bind(exchange, routing_key)
return queue
async def publish(
self,
exchange_name: str,
routing_key: str,
message: dict,
priority: int = 0
):
exchange = await self.channel.get_exchange(exchange_name)
await exchange.publish(
Message(
body=json.dumps(message).encode(),
delivery_mode=DeliveryMode.PERSISTENT,
priority=priority,
content_type="application/json"
),
routing_key=routing_key
)
async def consume(
self,
queue_name: str,
callback,
auto_ack: bool = False
):
queue = await self.channel.get_queue(queue_name)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
try:
data = json.loads(message.body.decode())
await callback(data)
if not auto_ack:
await message.ack()
                except Exception as e:
                    if not auto_ack:
                        # Reject without requeue so a dead-letter exchange can catch it
                        await message.nack(requeue=False)
                    print(f"Rejected message: {e}")
Usage
async def main():
client = RabbitMQClient()
    await client.connect()

    # Setup dead letter exchange
await client.setup_exchange("dead_letter", ExchangeType.FANOUT)
await client.setup_queue("dead_letter_queue", "dead_letter", "")
# Setup main exchange and queue
await client.setup_exchange("orders", ExchangeType.TOPIC)
await client.setup_queue(
"order_processing",
"orders",
"order.created.*",
dead_letter_exchange="dead_letter"
)
# Publish
await client.publish(
"orders",
"order.created.premium",
{"order_id": "123", "amount": 99.99}
)
# Consume
async def process_order(data):
print(f"Processing order: {data}")
await client.consume("order_processing", process_order)
asyncio.run(main())
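One gotcha worth flagging: publish() above sets a per-message priority, but RabbitMQ silently ignores it unless the queue was declared with x-max-priority. A minimal sketch using the same client (call after client.connect(); the queue name is illustrative):

async def setup_priority_queue(client: RabbitMQClient, queue_name: str = "lab_tests"):
    # Without "x-max-priority", the priority field on messages has no effect
    return await client.channel.declare_queue(
        queue_name,
        durable=True,
        arguments={"x-max-priority": 10}  # priority levels 0-10
    )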
Redis Streams with Python
import redis.asyncio as redis
from dataclasses import dataclass
from typing import List, Dict, Optional
import asyncio
import json


@dataclass
class StreamMessage:
id: str
data: Dict
class RedisStreamClient:
def __init__(self, url: str = "redis://localhost:6379"):
self.redis = redis.from_url(url, decode_responses=True)
async def create_consumer_group(
self,
stream: str,
group: str,
start_id: str = "0"
):
try:
await self.redis.xgroup_create(stream, group, start_id, mkstream=True)
except redis.ResponseError as e:
if "BUSYGROUP" not in str(e):
raise
async def publish(
self,
stream: str,
data: dict,
max_len: int = 10000
) -> str:
return await self.redis.xadd(
stream,
{"data": json.dumps(data)},
maxlen=max_len,
approximate=True
)
async def consume(
self,
stream: str,
group: str,
consumer: str,
count: int = 10,
block: int = 5000
) -> List[StreamMessage]:
messages = await self.redis.xreadgroup(
groupname=group,
consumername=consumer,
streams={stream: ">"},
count=count,
block=block
)
result = []
for stream_name, stream_messages in messages:
for msg_id, msg_data in stream_messages:
result.append(StreamMessage(
id=msg_id,
data=json.loads(msg_data["data"])
))
return result
async def ack(self, stream: str, group: str, message_id: str):
await self.redis.xack(stream, group, message_id)
async def get_pending(
self,
stream: str,
group: str,
consumer: str = None,
count: int = 100
):
"""Get pending (unacknowledged) messages."""
pending = await self.redis.xpending_range(
stream, group,
min="-",
max="+",
count=count,
consumername=consumer
)
return pending
async def claim_stale_messages(
self,
stream: str,
group: str,
consumer: str,
min_idle_time: int = 60000 # 1 minute
) -> List[StreamMessage]:
"""Claim messages from dead consumers."""
pending = await self.get_pending(stream, group)
stale_ids = [
p["message_id"]
for p in pending
if p["time_since_delivered"] > min_idle_time
]
if not stale_ids:
return []
claimed = await self.redis.xclaim(
stream, group, consumer,
min_idle_time=min_idle_time,
message_ids=stale_ids
)
return [
StreamMessage(id=msg_id, data=json.loads(msg_data["data"]))
for msg_id, msg_data in claimed
]
Consumer worker
async def worker(
client: RedisStreamClient,
stream: str,
group: str,
consumer: str
):
while True:
# First, try to claim stale messages
stale = await client.claim_stale_messages(stream, group, consumer)
for msg in stale:
await process_message(msg)
            await client.ack(stream, group, msg.id)

        # Then read new messages
messages = await client.consume(stream, group, consumer)
for msg in messages:
try:
await process_message(msg)
await client.ack(stream, group, msg.id)
except Exception as e:
print(f"Error processing {msg.id}: {e}")
# Message stays in pending, will be claimed later
async def process_message(msg: StreamMessage):
print(f"Processing: {msg.data}")
async def main():
client = RedisStreamClient()
# Setup
await client.create_consumer_group("orders", "processors")
# Publish
await client.publish("orders", {"order_id": "123", "amount": 99.99})
# Start workers
await asyncio.gather(
worker(client, "orders", "processors", "worker-1"),
worker(client, "orders", "processors", "worker-2"),
)
asyncio.run(main())
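On Redis 6.2+, the XPENDING-then-XCLAIM dance in claim_stale_messages collapses into a single XAUTOCLAIM call. A sketch assuming redis-py 4.x or newer (the reply gained a third element, deleted IDs, in Redis 7.0, so index rather than unpack):

async def claim_stale_autoclaim(
    client: RedisStreamClient,
    stream: str,
    group: str,
    consumer: str,
    min_idle_time: int = 60000
) -> List[StreamMessage]:
    # XAUTOCLAIM scans the PEL and claims stale entries in one round trip
    resp = await client.redis.xautoclaim(
        stream, group, consumer,
        min_idle_time=min_idle_time,
        start_id="0-0",
        count=100
    )
    claimed = resp[1]  # resp[0] is the cursor for the next scan
    return [
        StreamMessage(id=msg_id, data=json.loads(fields["data"]))
        for msg_id, fields in claimed
    ]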
Apache Kafka with Python (aiokafka)
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.admin import AIOKafkaAdminClient, NewTopic
from dataclasses import dataclass
from typing import List, Dict, Optional, Callable
import asyncio
import json


@dataclass
class KafkaMessage:
topic: str
partition: int
offset: int
key: Optional[str]
value: Dict
timestamp: int
class KafkaClient:
def __init__(self, bootstrap_servers: str = "localhost:9092"):
self.bootstrap_servers = bootstrap_servers
self.producer = None
self.consumers = {}
async def start_producer(self):
self.producer = AIOKafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode(),
key_serializer=lambda k: k.encode() if k else None,
acks="all", # Wait for all replicas
enable_idempotence=True, # Exactly-once semantics
compression_type="lz4",
max_batch_size=16384,
linger_ms=5
)
await self.producer.start()
async def create_topics(self, topics: List[Dict]):
admin = AIOKafkaAdminClient(bootstrap_servers=self.bootstrap_servers)
await admin.start()
new_topics = [
NewTopic(
name=t["name"],
num_partitions=t.get("partitions", 3),
replication_factor=t.get("replication", 1),
topic_configs=t.get("config", {})
)
for t in topics
]
try:
await admin.create_topics(new_topics)
except Exception as e:
print(f"Topic creation error (may already exist): {e}")
finally:
await admin.close()
async def publish(
self,
topic: str,
value: dict,
key: str = None,
partition: int = None
):
return await self.producer.send_and_wait(
topic,
value=value,
key=key,
partition=partition
)
async def publish_batch(
self,
topic: str,
messages: List[Dict]
):
"""Publish multiple messages efficiently."""
batch = self.producer.create_batch()
for msg in messages:
metadata = batch.append(
key=msg.get("key", "").encode() if msg.get("key") else None,
value=json.dumps(msg["value"]).encode(),
timestamp=None
)
if metadata is None:
# Batch is full, send it
                await self.producer.send_batch(batch, topic, partition=partition)
batch = self.producer.create_batch()
batch.append(
key=msg.get("key", "").encode() if msg.get("key") else None,
value=json.dumps(msg["value"]).encode(),
timestamp=None
)
# Send remaining
if batch.record_count() > 0:
            await self.producer.send_batch(batch, topic, partition=partition)
async def consume(
self,
topics: List[str],
group_id: str,
callback: Callable,
auto_commit: bool = False
):
consumer = AIOKafkaConsumer(
*topics,
bootstrap_servers=self.bootstrap_servers,
group_id=group_id,
auto_offset_reset="earliest",
enable_auto_commit=auto_commit,
value_deserializer=lambda v: json.loads(v.decode()),
max_poll_records=100,
session_timeout_ms=30000,
heartbeat_interval_ms=10000
)
await consumer.start()
self.consumers[group_id] = consumer
try:
async for msg in consumer:
kafka_msg = KafkaMessage(
topic=msg.topic,
partition=msg.partition,
offset=msg.offset,
key=msg.key.decode() if msg.key else None,
value=msg.value,
timestamp=msg.timestamp
)
try:
await callback(kafka_msg)
if not auto_commit:
await consumer.commit()
except Exception as e:
print(f"Error processing message: {e}")
# Handle error - maybe publish to dead letter topic
finally:
await consumer.stop()
Exactly-once processing with transactions
class TransactionalKafkaClient(KafkaClient):
async def start_producer(self, transactional_id: str):
self.producer = AIOKafkaProducer(
bootstrap_servers=self.bootstrap_servers,
transactional_id=transactional_id,
value_serializer=lambda v: json.dumps(v).encode(),
key_serializer=lambda k: k.encode() if k else None,
acks="all",
enable_idempotence=True
)
await self.producer.start()
        await self.producer.begin_transaction()

    async def commit_transaction(self):
await self.producer.commit_transaction()
await self.producer.begin_transaction()
async def abort_transaction(self):
await self.producer.abort_transaction()
await self.producer.begin_transaction()
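Where transactions really pay off is consume-transform-produce pipelines: committing the consumer's offsets inside the producer's transaction means a crash can never produce output without advancing the input, or vice versa. A sketch following aiokafka's documented pattern (topic and group names are illustrative):

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

async def transform_loop():
    consumer = AIOKafkaConsumer(
        "orders",
        bootstrap_servers="localhost:9092",
        group_id="order-transformer",
        enable_auto_commit=False,
        isolation_level="read_committed"  # skip aborted transactions
    )
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        transactional_id="order-transformer-1"
    )
    await consumer.start()
    await producer.start()
    try:
        while True:
            batch = await consumer.getmany(timeout_ms=1000)
            if not batch:
                continue
            async with producer.transaction():
                offsets = {}
                for tp, msgs in batch.items():
                    for msg in msgs:
                        await producer.send("order-events", msg.value)
                    offsets[tp] = msgs[-1].offset + 1
                # Offsets commit atomically with the produced messages
                await producer.send_offsets_to_transaction(
                    offsets, "order-transformer"
                )
    finally:
        await producer.stop()
        await consumer.stop()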
Usage
async def main():
    client = KafkaClient()

    # Create topics
await client.create_topics([
{
"name": "orders",
"partitions": 6,
"replication": 3,
"config": {
"retention.ms": "604800000", # 7 days
"cleanup.policy": "delete"
}
},
{
"name": "order-events",
"partitions": 6,
"config": {
"cleanup.policy": "compact" # Keep latest per key
}
}
])
await client.start_producer()
# Publish with key (ensures ordering per key)
await client.publish(
"orders",
{"order_id": "123", "status": "created"},
key="order-123" # Same key = same partition = ordering
)
# Consume
async def process_order(msg: KafkaMessage):
print(f"[{msg.partition}:{msg.offset}] {msg.value}")
await client.consume(
["orders"],
"order-processor-group",
process_order
)
asyncio.run(main())
Comparison Matrix
| Feature | RabbitMQ | Redis Streams | Kafka |
|---------|----------|---------------|-------|
| Throughput | Medium (25K/s) | High (100K/s) | Very High (500K/s) |
| Latency | Low (1-5ms) | Very Low (<1ms) | Medium (2-8ms) |
| Ordering | Per queue | Per stream | Per partition |
| Persistence | Disk + mirroring | RDB/AOF | Distributed log |
| Replay | No (consumed once) | Yes | Yes |
| Routing | Complex (exchanges) | Basic | Partitioning |
| Scaling | Clustering | Cluster | Native horizontal |
| Operations | Moderate | Simple | Complex |
| Use Case | Task queues, RPC | Caching + streaming | Event sourcing, analytics |
When to Use Each
RabbitMQ
✅ Best for:
• Task queues with complex routing
• RPC patterns
• Priority queues
• Message TTL and dead-letter handling
• When message ordering per-consumer matters

❌ Not ideal for:
• Very high throughput needs
• Long-term message storage
• Replay/reprocessing scenarios
Redis Streams
✅ Best for:
• Low-latency requirements
• Already using Redis
• Simple pub/sub + persistence
• Moderate throughput needs
• Session/cache + messaging in one

❌ Not ideal for:
• Very large message volumes
• Complex routing
• Multi-datacenter replication
Apache Kafka
✅ Best for:
• High-throughput event streaming
• Event sourcing
• Log aggregation
• Real-time analytics
• Multi-consumer replay
• Microservices event backbone

❌ Not ideal for:
• Simple task queues
• Low message volume
• When operational simplicity matters
Production Patterns
Dead Letter Queue Pattern
This pattern works with all three systems:
async def process_with_dlq(message, max_retries: int = 3):
    retries = message.headers.get("retry_count", 0)
    try:
await process(message)
except Exception as e:
if retries < max_retries:
            # Retry with exponential backoff
            message.headers["retry_count"] = retries + 1
            await publish_with_delay(message, delay=2 ** retries)
else:
# Send to dead letter queue
await dlq_publisher.publish(message)
logger.error(f"Message sent to DLQ after {max_retries} retries: {e}")
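publish_with_delay is left abstract above; one common RabbitMQ realization parks the retry in a TTL'd queue whose dead-letter exchange routes back to the main exchange once the TTL expires (exchange and routing-key names here are illustrative):

import aio_pika

async def publish_with_delay(channel, body: bytes, delay: float):
    # Park the message in a per-delay retry queue; when its TTL expires,
    # RabbitMQ dead-letters it back to the main exchange for another attempt
    retry_queue = await channel.declare_queue(
        f"retry.{int(delay)}s",
        durable=True,
        arguments={
            "x-message-ttl": int(delay * 1000),
            "x-dead-letter-exchange": "orders",                  # illustrative
            "x-dead-letter-routing-key": "order.created.retry"   # illustrative
        }
    )
    await channel.default_exchange.publish(
        aio_pika.Message(body=body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT),
        routing_key=retry_queue.name
    )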
Exactly-Once Processing
Idempotency with Redis
async def process_idempotent(message_id: str, processor):
    # Skip if a previous run already completed this message
    if await redis.get(f"completed:{message_id}"):
        return
    lock_key = f"processing:{message_id}"
    # Try to acquire lock
    acquired = await redis.set(lock_key, "1", nx=True, ex=300)
    if not acquired:
        return  # Already processed or in progress
try:
await processor()
# Mark as completed
await redis.set(f"completed:{message_id}", "1", ex=86400)
except Exception:
# Release lock on failure
await redis.delete(lock_key)
raise
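Wired into the Redis Streams worker from earlier, the stream entry ID makes a natural idempotency key, since redelivered pending entries keep the same ID (apply_side_effects is a hypothetical handler):

async def handle(msg: StreamMessage):
    # Retries of the same entry are deduplicated by its immutable ID
    await process_idempotent(msg.id, lambda: apply_side_effects(msg.data))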
Conclusion
Each message queue has its sweet spot:
- RabbitMQ: Best for traditional messaging with complex routing needs
- Redis Streams: Best for low-latency, simple streaming with existing Redis
- Kafka: Best for high-throughput event streaming and event sourcing
In our healthcare systems at Dr. Dangs Lab, we run all three side by side:
- Kafka for audit logs and event sourcing (immutable, replayable)
- RabbitMQ for task queues with priority (lab test processing)
- Redis Streams for real-time notifications (low latency)
Related Articles
- Apache Kafka Event Streaming Deep Dive - Kafka internals
- Event-Driven Architecture with Kafka - Event patterns
- Redis Caching Strategies - Redis patterns
- Building Scalable Microservices - Service architecture
- System Design Interview Guide - Architecture patterns