Back to Blog
Message Queues: RabbitMQ vs Redis Streams vs Kafka - Complete Comparison

Message Queues: RabbitMQ vs Redis Streams vs Kafka - Complete Comparison

December 19, 2025
11 min read
Tushar Agrawal

Deep comparison of RabbitMQ, Redis Streams, and Apache Kafka for message queuing. Performance benchmarks, use cases, and production patterns for choosing the right message broker.

Introduction

Message queues are the backbone of distributed systems, enabling asynchronous communication, load leveling, and service decoupling. But choosing between RabbitMQ, Redis Streams, and Apache Kafka isn't straightforward—each has distinct strengths.

Having implemented all three in production healthcare systems at Dr. Dangs Lab, I'll share real-world insights on when to use each technology.

Architecture Comparison

RabbitMQ - Traditional Message Broker

┌─────────────────────────────────────────────────────────────┐
│                        RabbitMQ                              │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   Producer ──▶ Exchange ──▶ Queue ──▶ Consumer              │
│                   │                                          │
│            ┌──────┴──────┐                                   │
│            │  Bindings   │                                   │
│            │  (routing)  │                                   │
│            └─────────────┘                                   │
│                                                              │
│   Exchange Types:                                            │
│   • Direct  - Exact routing key match                       │
│   • Topic   - Pattern matching (*.logs, audit.#)            │
│   • Fanout  - Broadcast to all queues                       │
│   • Headers - Match on message headers                       │
│                                                              │
│   Features:                                                  │
│   ✓ Message acknowledgment                                  │
│   ✓ Dead letter queues                                      │
│   ✓ Priority queues                                         │
│   ✓ Message TTL                                             │
│   ✓ Flexible routing                                        │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Redis Streams - Lightweight & Fast

┌─────────────────────────────────────────────────────────────┐
│                      Redis Streams                           │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   Stream: append-only log with unique IDs                   │
│                                                              │
│   ┌─────────────────────────────────────────────────────┐   │
│   │ 1702000001-0 │ 1702000002-0 │ 1702000003-0 │ ...    │   │
│   └─────────────────────────────────────────────────────┘   │
│         ▲                                                    │
│         │                                                    │
│   Consumer Groups:                                           │
│   ┌─────────────────────────────────────────┐               │
│   │ Group: "processors"                      │               │
│   │   Consumer-1: reads 1702000001-0        │               │
│   │   Consumer-2: reads 1702000002-0        │               │
│   │   (load balanced, exactly-once per msg) │               │
│   └─────────────────────────────────────────┘               │
│                                                              │
│   Features:                                                  │
│   ✓ Persistence with RDB/AOF                                │
│   ✓ Consumer groups                                         │
│   ✓ Message acknowledgment                                  │
│   ✓ Pending entry list (PEL)                                │
│   ✓ Stream trimming                                         │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Apache Kafka - Distributed Log

┌─────────────────────────────────────────────────────────────┐
│                      Apache Kafka                            │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   Topic: distributed, partitioned log                       │
│                                                              │
│   ┌─────────────────────────────────────────────────────┐   │
│   │ Partition 0: [msg1][msg2][msg3][msg4] ───▶          │   │
│   │ Partition 1: [msg5][msg6][msg7] ───▶                │   │
│   │ Partition 2: [msg8][msg9][msg10][msg11] ───▶        │   │
│   └─────────────────────────────────────────────────────┘   │
│                                                              │
│   Consumer Groups:                                           │
│   ┌─────────────────────────────────────────┐               │
│   │ Group: "analytics"                       │               │
│   │   Consumer-1 ◄── Partition 0, 1         │               │
│   │   Consumer-2 ◄── Partition 2            │               │
│   └─────────────────────────────────────────┘               │
│                                                              │
│   Features:                                                  │
│   ✓ Horizontal scaling via partitions                       │
│   ✓ Replication for fault tolerance                         │
│   ✓ Log compaction                                          │
│   ✓ Exactly-once semantics                                  │
│   ✓ High throughput (millions msg/sec)                      │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Performance Benchmarks

Throughput Test (Messages/Second)

┌─────────────────┬───────────┬───────────┬────────────┐
│    Scenario     │ RabbitMQ  │  Redis    │   Kafka    │
├─────────────────┼───────────┼───────────┼────────────┤
│ 1KB messages    │   25,000  │  100,000  │  500,000   │
│ 10KB messages   │   15,000  │   50,000  │  300,000   │
│ 100KB messages  │    5,000  │   20,000  │  100,000   │
│ 1MB messages    │    1,000  │    5,000  │   20,000   │
└─────────────────┴───────────┴───────────┴────────────┘

Test: Single node, persistent messages, ack enabled Hardware: AWS c5.2xlarge (8 vCPU, 16GB RAM)

Latency Test (P99 in milliseconds)

┌─────────────────┬───────────┬───────────┬────────────┐
│    Scenario     │ RabbitMQ  │  Redis    │   Kafka    │
├─────────────────┼───────────┼───────────┼────────────┤
│ No persistence  │    1.2    │    0.3    │    2.5     │
│ With persist    │    5.8    │    1.2    │    5.0     │
│ High load       │   15.0    │    3.0    │    8.0     │
└─────────────────┴───────────┴───────────┴────────────┘

Note: Redis has lowest latency, Kafka trades latency for throughput

Implementation Examples

RabbitMQ with Python (aio-pika)

import asyncio
import aio_pika
from aio_pika import Message, DeliveryMode, ExchangeType
import json

class RabbitMQClient: def __init__(self, url: str = "amqp://guest:guest@localhost/"): self.url = url self.connection = None self.channel = None

async def connect(self): self.connection = await aio_pika.connect_robust(self.url) self.channel = await self.connection.channel() await self.channel.set_qos(prefetch_count=10)

async def setup_exchange( self, exchange_name: str, exchange_type: ExchangeType = ExchangeType.TOPIC ): return await self.channel.declare_exchange( exchange_name, exchange_type, durable=True )

async def setup_queue( self, queue_name: str, exchange_name: str, routing_key: str, dead_letter_exchange: str = None ): arguments = {} if dead_letter_exchange: arguments["x-dead-letter-exchange"] = dead_letter_exchange

queue = await self.channel.declare_queue( queue_name, durable=True, arguments=arguments )

exchange = await self.channel.get_exchange(exchange_name) await queue.bind(exchange, routing_key)

return queue

async def publish( self, exchange_name: str, routing_key: str, message: dict, priority: int = 0 ): exchange = await self.channel.get_exchange(exchange_name)

await exchange.publish( Message( body=json.dumps(message).encode(), delivery_mode=DeliveryMode.PERSISTENT, priority=priority, content_type="application/json" ), routing_key=routing_key )

async def consume( self, queue_name: str, callback, auto_ack: bool = False ): queue = await self.channel.get_queue(queue_name)

async with queue.iterator() as queue_iter: async for message in queue_iter: try: data = json.loads(message.body.decode()) await callback(data)

if not auto_ack: await message.ack()

except Exception as e: if not auto_ack: await message.nack(requeue=False) raise

Usage

async def main(): client = RabbitMQClient() await client.connect()

# Setup dead letter exchange await client.setup_exchange("dead_letter", ExchangeType.FANOUT) await client.setup_queue("dead_letter_queue", "dead_letter", "")

# Setup main exchange and queue await client.setup_exchange("orders", ExchangeType.TOPIC) await client.setup_queue( "order_processing", "orders", "order.created.*", dead_letter_exchange="dead_letter" )

# Publish await client.publish( "orders", "order.created.premium", {"order_id": "123", "amount": 99.99} )

# Consume async def process_order(data): print(f"Processing order: {data}")

await client.consume("order_processing", process_order)

asyncio.run(main())

Redis Streams with Python

import redis.asyncio as redis
from dataclasses import dataclass
from typing import List, Dict, Optional
import asyncio
import json

@dataclass class StreamMessage: id: str data: Dict

class RedisStreamClient: def __init__(self, url: str = "redis://localhost:6379"): self.redis = redis.from_url(url, decode_responses=True)

async def create_consumer_group( self, stream: str, group: str, start_id: str = "0" ): try: await self.redis.xgroup_create(stream, group, start_id, mkstream=True) except redis.ResponseError as e: if "BUSYGROUP" not in str(e): raise

async def publish( self, stream: str, data: dict, max_len: int = 10000 ) -> str: return await self.redis.xadd( stream, {"data": json.dumps(data)}, maxlen=max_len, approximate=True )

async def consume( self, stream: str, group: str, consumer: str, count: int = 10, block: int = 5000 ) -> List[StreamMessage]: messages = await self.redis.xreadgroup( groupname=group, consumername=consumer, streams={stream: ">"}, count=count, block=block )

result = [] for stream_name, stream_messages in messages: for msg_id, msg_data in stream_messages: result.append(StreamMessage( id=msg_id, data=json.loads(msg_data["data"]) ))

return result

async def ack(self, stream: str, group: str, message_id: str): await self.redis.xack(stream, group, message_id)

async def get_pending( self, stream: str, group: str, consumer: str = None, count: int = 100 ): """Get pending (unacknowledged) messages.""" pending = await self.redis.xpending_range( stream, group, min="-", max="+", count=count, consumername=consumer ) return pending

async def claim_stale_messages( self, stream: str, group: str, consumer: str, min_idle_time: int = 60000 # 1 minute ) -> List[StreamMessage]: """Claim messages from dead consumers.""" pending = await self.get_pending(stream, group)

stale_ids = [ p["message_id"] for p in pending if p["time_since_delivered"] > min_idle_time ]

if not stale_ids: return []

claimed = await self.redis.xclaim( stream, group, consumer, min_idle_time=min_idle_time, message_ids=stale_ids )

return [ StreamMessage(id=msg_id, data=json.loads(msg_data["data"])) for msg_id, msg_data in claimed ]

Consumer worker

async def worker( client: RedisStreamClient, stream: str, group: str, consumer: str ): while True: # First, try to claim stale messages stale = await client.claim_stale_messages(stream, group, consumer) for msg in stale: await process_message(msg) await client.ack(stream, group, msg.id)

# Then read new messages messages = await client.consume(stream, group, consumer) for msg in messages: try: await process_message(msg) await client.ack(stream, group, msg.id) except Exception as e: print(f"Error processing {msg.id}: {e}") # Message stays in pending, will be claimed later

async def process_message(msg: StreamMessage): print(f"Processing: {msg.data}")

async def main(): client = RedisStreamClient()

# Setup await client.create_consumer_group("orders", "processors")

# Publish await client.publish("orders", {"order_id": "123", "amount": 99.99})

# Start workers await asyncio.gather( worker(client, "orders", "processors", "worker-1"), worker(client, "orders", "processors", "worker-2"), )

asyncio.run(main())

Apache Kafka with Python (aiokafka)

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.admin import AIOKafkaAdminClient, NewTopic
from dataclasses import dataclass
from typing import List, Dict, Optional, Callable
import asyncio
import json

@dataclass class KafkaMessage: topic: str partition: int offset: int key: Optional[str] value: Dict timestamp: int

class KafkaClient: def __init__(self, bootstrap_servers: str = "localhost:9092"): self.bootstrap_servers = bootstrap_servers self.producer = None self.consumers = {}

async def start_producer(self): self.producer = AIOKafkaProducer( bootstrap_servers=self.bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode(), key_serializer=lambda k: k.encode() if k else None, acks="all", # Wait for all replicas enable_idempotence=True, # Exactly-once semantics compression_type="lz4", max_batch_size=16384, linger_ms=5 ) await self.producer.start()

async def create_topics(self, topics: List[Dict]): admin = AIOKafkaAdminClient(bootstrap_servers=self.bootstrap_servers) await admin.start()

new_topics = [ NewTopic( name=t["name"], num_partitions=t.get("partitions", 3), replication_factor=t.get("replication", 1), topic_configs=t.get("config", {}) ) for t in topics ]

try: await admin.create_topics(new_topics) except Exception as e: print(f"Topic creation error (may already exist): {e}") finally: await admin.close()

async def publish( self, topic: str, value: dict, key: str = None, partition: int = None ): return await self.producer.send_and_wait( topic, value=value, key=key, partition=partition )

async def publish_batch( self, topic: str, messages: List[Dict] ): """Publish multiple messages efficiently.""" batch = self.producer.create_batch()

for msg in messages: metadata = batch.append( key=msg.get("key", "").encode() if msg.get("key") else None, value=json.dumps(msg["value"]).encode(), timestamp=None ) if metadata is None: # Batch is full, send it await self.producer.send_batch(batch, topic) batch = self.producer.create_batch() batch.append( key=msg.get("key", "").encode() if msg.get("key") else None, value=json.dumps(msg["value"]).encode(), timestamp=None )

# Send remaining if batch.record_count() > 0: await self.producer.send_batch(batch, topic)

async def consume( self, topics: List[str], group_id: str, callback: Callable, auto_commit: bool = False ): consumer = AIOKafkaConsumer( *topics, bootstrap_servers=self.bootstrap_servers, group_id=group_id, auto_offset_reset="earliest", enable_auto_commit=auto_commit, value_deserializer=lambda v: json.loads(v.decode()), max_poll_records=100, session_timeout_ms=30000, heartbeat_interval_ms=10000 )

await consumer.start() self.consumers[group_id] = consumer

try: async for msg in consumer: kafka_msg = KafkaMessage( topic=msg.topic, partition=msg.partition, offset=msg.offset, key=msg.key.decode() if msg.key else None, value=msg.value, timestamp=msg.timestamp )

try: await callback(kafka_msg) if not auto_commit: await consumer.commit() except Exception as e: print(f"Error processing message: {e}") # Handle error - maybe publish to dead letter topic

finally: await consumer.stop()

Exactly-once processing with transactions

class TransactionalKafkaClient(KafkaClient): async def start_producer(self, transactional_id: str): self.producer = AIOKafkaProducer( bootstrap_servers=self.bootstrap_servers, transactional_id=transactional_id, value_serializer=lambda v: json.dumps(v).encode(), key_serializer=lambda k: k.encode() if k else None, acks="all", enable_idempotence=True ) await self.producer.start() await self.producer.begin_transaction()

async def commit_transaction(self): await self.producer.commit_transaction() await self.producer.begin_transaction()

async def abort_transaction(self): await self.producer.abort_transaction() await self.producer.begin_transaction()

Usage

async def main(): client = KafkaClient()

# Create topics await client.create_topics([ { "name": "orders", "partitions": 6, "replication": 3, "config": { "retention.ms": "604800000", # 7 days "cleanup.policy": "delete" } }, { "name": "order-events", "partitions": 6, "config": { "cleanup.policy": "compact" # Keep latest per key } } ])

await client.start_producer()

# Publish with key (ensures ordering per key) await client.publish( "orders", {"order_id": "123", "status": "created"}, key="order-123" # Same key = same partition = ordering )

# Consume async def process_order(msg: KafkaMessage): print(f"[{msg.partition}:{msg.offset}] {msg.value}")

await client.consume( ["orders"], "order-processor-group", process_order )

asyncio.run(main())

Comparison Matrix

| Feature | RabbitMQ | Redis Streams | Kafka | |---------|----------|---------------|-------| | Throughput | Medium (25K/s) | High (100K/s) | Very High (500K/s) | | Latency | Low (1-5ms) | Very Low (<1ms) | Medium (2-8ms) | | Ordering | Per queue | Per stream | Per partition | | Persistence | Disk + mirroring | RDB/AOF | Distributed log | | Replay | No (consumed once) | Yes | Yes | | Routing | Complex (exchanges) | Basic | Partitioning | | Scaling | Clustering | Cluster | Native horizontal | | Operations | Moderate | Simple | Complex | | Use Case | Task queues, RPC | Caching + streaming | Event sourcing, analytics |

When to Use Each

RabbitMQ

✅ Best for:
   • Task queues with complex routing
   • RPC patterns
   • Priority queues
   • Message TTL and dead-letter handling
   • When message ordering per-consumer matters

❌ Not ideal for: • Very high throughput needs • Long-term message storage • Replay/reprocessing scenarios

Redis Streams

✅ Best for:
   • Low-latency requirements
   • Already using Redis
   • Simple pub/sub + persistence
   • Moderate throughput needs
   • Session/cache + messaging in one

❌ Not ideal for: • Very large message volumes • Complex routing • Multi-datacenter replication

Apache Kafka

✅ Best for:
   • High-throughput event streaming
   • Event sourcing
   • Log aggregation
   • Real-time analytics
   • Multi-consumer replay
   • Microservices event backbone

❌ Not ideal for: • Simple task queues • Low message volume • When operational simplicity matters

Production Patterns

Dead Letter Queue Pattern

Works with all three systems

async def process_with_dlq(message, max_retries: int = 3): retries = message.headers.get("retry_count", 0)

try: await process(message) except Exception as e: if retries < max_retries: # Retry with backoff message.headers["retry_count"] = retries + 1 await publish_with_delay(message, delay=2 retries) else: # Send to dead letter queue await dlq_publisher.publish(message) logger.error(f"Message sent to DLQ after {max_retries} retries: {e}")

Exactly-Once Processing

Idempotency with Redis

async def process_idempotent(message_id: str, processor): lock_key = f"processing:{message_id}"

# Try to acquire lock acquired = await redis.set(lock_key, "1", nx=True, ex=300) if not acquired: return # Already processed or in progress

try: await processor() # Mark as completed await redis.set(f"completed:{message_id}", "1", ex=86400) except Exception: # Release lock on failure await redis.delete(lock_key) raise

Conclusion

Each message queue has its sweet spot:

  • RabbitMQ: Best for traditional messaging with complex routing needs
  • Redis Streams: Best for low-latency, simple streaming with existing Redis
  • Kafka: Best for high-throughput event streaming and event sourcing
In my healthcare systems, I use:
  • Kafka for audit logs and event sourcing (immutable, replayable)
  • RabbitMQ for task queues with priority (lab test processing)
  • Redis Streams for real-time notifications (low latency)
Choose based on your specific requirements, not hype.

Related Articles

Share this article

Related Articles