Redis Caching Strategies: Complete Guide to High-Performance Caching
Master Redis caching patterns including cache-aside, write-through, write-behind, and cache invalidation strategies. Learn practical implementations with Python and real-world performance optimization techniques.
Introduction
Caching is the secret weapon of high-performance applications. At Dr. Dangs Lab, implementing Redis caching reduced our API response times from 800ms to under 50ms for frequently accessed data. Here's everything I've learned about Redis caching strategies.
Why Redis?
Redis is an in-memory data structure store that offers:
- Speed: Sub-millisecond latency for most operations
- Versatility: Supports strings, hashes, lists, sets, sorted sets
- Persistence: Optional disk persistence
- Replication: Master-replica support for high availability
- Pub/Sub: Real-time messaging capabilities
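As a quick illustration, here is a minimal sketch of a few of those data structures through redis-py (the key names are illustrative, and a local server on the default port is assumed):

```python
import redis

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Strings: simple key-value with an optional TTL
r.set("greeting", "hello", ex=60)

# Hashes: field-value maps, handy for objects
r.hset("user:1", mapping={"name": "Ada", "plan": "pro"})

# Lists: push/pop queues
r.lpush("jobs", "job-1", "job-2")

# Sets: unordered unique members
r.sadd("tags", "redis", "cache")

# Sorted sets: members ranked by score
r.zadd("scores", {"ada": 42})
```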
Caching Patterns
1. Cache-Aside (Lazy Loading)
The most common pattern: the application manages the cache itself, populating it lazily on misses.
```python
import redis
import json
from typing import Optional, Any

redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

class CacheAside:
    def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600):
        self.redis = redis_client
        self.default_ttl = default_ttl

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        cached = self.redis.get(key)
        if cached is not None:
            return json.loads(cached)
        return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set value in cache"""
        self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(value)
        )

    def delete(self, key: str) -> None:
        """Invalidate cache entry"""
        self.redis.delete(key)

# Usage example (assumes `db` is an async database client)
cache = CacheAside(redis_client)

async def get_user(user_id: int) -> dict:
    cache_key = f"user:{user_id}"
    # 1. Check cache first
    cached_user = cache.get(cache_key)
    if cached_user:
        return cached_user
    # 2. Cache miss - fetch from database
    user = await db.fetch_user(user_id)
    # 3. Store in cache for next time
    if user:
        cache.set(cache_key, user)
    return user
```
2. Write-Through
Every write goes to both the database and the cache before the call returns, keeping the two in sync.
```python
class WriteThrough:
    def __init__(self, redis_client: redis.Redis, db_client):
        self.redis = redis_client
        self.db = db_client

    async def write(self, key: str, value: dict, ttl: int = 3600) -> None:
        """Write to the database first, then update the cache"""
        # If the database write fails, the cache is never touched,
        # so the two cannot drift apart on failure
        await self.db.save(key, value)
        self.redis.setex(key, ttl, json.dumps(value))

    async def read(self, key: str) -> Optional[dict]:
        """Read from cache, fall back to the database"""
        cached = self.redis.get(key)
        if cached is not None:
            return json.loads(cached)
        # Cache miss - rare with write-through, but possible after eviction
        value = await self.db.get(key)
        if value:
            self.redis.setex(key, 3600, json.dumps(value))
        return value

# Usage
cache = WriteThrough(redis_client, db_client)

async def update_user(user_id: int, data: dict):
    key = f"user:{user_id}"
    await cache.write(key, data)
    return data
```
3. Write-Behind (Write-Back)
Writes are acknowledged as soon as they hit the cache, then batched and asynchronously persisted to the database.
```python
import asyncio

class WriteBehind:
    def __init__(self, redis_client: redis.Redis, db_client, flush_interval: int = 5):
        # Must be instantiated inside a running event loop,
        # since asyncio.create_task() requires one
        self.redis = redis_client
        self.db = db_client
        self.flush_interval = flush_interval
        self.pending_writes: dict[str, dict] = {}
        # Keep a reference so the background task isn't garbage collected
        self._flush_task = asyncio.create_task(self._flush_loop())

    async def _flush_loop(self):
        """Periodically flush pending writes to the database"""
        while True:
            await asyncio.sleep(self.flush_interval)
            await self._flush()

    async def _flush(self):
        """Flush all pending writes to the database"""
        if not self.pending_writes:
            return
        writes = dict(self.pending_writes)
        self.pending_writes.clear()
        try:
            await self.db.bulk_write(writes)
        except Exception:
            # Re-queue failed writes for retry, but don't clobber any
            # newer value written while the flush was in flight
            for key, value in writes.items():
                self.pending_writes.setdefault(key, value)
            # In production, log/alert here rather than re-raising,
            # so the exception doesn't kill the flush loop

    def write(self, key: str, value: dict, ttl: int = 3600) -> None:
        """Write to cache immediately, database later"""
        # Update cache immediately
        self.redis.setex(key, ttl, json.dumps(value))
        # Queue for database write
        self.pending_writes[key] = value

    def read(self, key: str) -> Optional[dict]:
        """Read from cache (always fresh, since writes land there first)"""
        cached = self.redis.get(key)
        return json.loads(cached) if cached else None
```
Cache Invalidation Strategies
Time-Based Expiration (TTL)
```python
# Simple TTL
redis_client.setex("session:123", 3600, "session_data")  # 1 hour

# Different TTLs for different data types
TTL_CONFIG = {
    "user_profile": 86400,  # 24 hours - rarely changes
    "user_session": 3600,   # 1 hour - security
    "product_list": 300,    # 5 minutes - moderate changes
    "stock_price": 5,       # 5 seconds - frequent changes
}

def cache_with_ttl(data_type: str, key: str, value: Any):
    ttl = TTL_CONFIG.get(data_type, 3600)
    redis_client.setex(f"{data_type}:{key}", ttl, json.dumps(value))
```
Event-Based Invalidation
```python
class EventBasedCache:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def invalidate_user_cache(self, user_id: int):
        """Invalidate all cache entries related to a user"""
        patterns = [
            f"user:{user_id}",
            f"user:{user_id}:*",
            f"orders:user:{user_id}",
        ]
        for pattern in patterns:
            # SCAN iterates incrementally; KEYS would block the server
            keys = list(self.redis.scan_iter(match=pattern))
            if keys:
                self.redis.delete(*keys)

    def invalidate_by_tags(self, tags: list[str]):
        """Invalidate cache entries by tags using sets"""
        for tag in tags:
            # Get all keys associated with this tag
            keys = self.redis.smembers(f"tag:{tag}")
            if keys:
                self.redis.delete(*keys)
            self.redis.delete(f"tag:{tag}")

    def set_with_tags(self, key: str, value: Any, tags: list[str], ttl: int = 3600):
        """Set cache with tag associations"""
        pipe = self.redis.pipeline()
        # Set the actual value
        pipe.setex(key, ttl, json.dumps(value))
        # Associate key with tags
        for tag in tags:
            pipe.sadd(f"tag:{tag}", key)
            pipe.expire(f"tag:{tag}", ttl)
        pipe.execute()

# Usage
cache = EventBasedCache(redis_client)

# Cache product with tags
cache.set_with_tags(
    "product:123",
    {"name": "Laptop", "price": 999},
    tags=["category:electronics", "brand:dell"],
    ttl=3600
)

# Invalidate all electronics when category updates
cache.invalidate_by_tags(["category:electronics"])
```
Version-Based Invalidation
```python
class VersionedCache:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def get_version(self, namespace: str) -> int:
        """Get current version for a namespace"""
        version = self.redis.get(f"version:{namespace}")
        # Default to 0 so the first INCR (to 1) actually invalidates
        return int(version) if version is not None else 0

    def increment_version(self, namespace: str) -> int:
        """Increment version to invalidate all keys in namespace"""
        return self.redis.incr(f"version:{namespace}")

    def make_key(self, namespace: str, key: str) -> str:
        """Create versioned cache key"""
        version = self.get_version(namespace)
        return f"{namespace}:v{version}:{key}"

    def get(self, namespace: str, key: str) -> Optional[Any]:
        versioned_key = self.make_key(namespace, key)
        cached = self.redis.get(versioned_key)
        return json.loads(cached) if cached else None

    def set(self, namespace: str, key: str, value: Any, ttl: int = 3600):
        versioned_key = self.make_key(namespace, key)
        self.redis.setex(versioned_key, ttl, json.dumps(value))

# Usage
cache = VersionedCache(redis_client)

# Normal operations
cache.set("products", "123", {"name": "Laptop"})
product = cache.get("products", "123")

# Invalidate ALL products at once by incrementing the version
cache.increment_version("products")
# Old keys become orphaned and expire naturally via their TTLs
```
Advanced Redis Data Structures
Using Hashes for Structured Data
```python
class HashCache:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def set_user(self, user_id: int, user_data: dict):
        """Store user as hash - memory efficient for objects"""
        # Note: hash values must be flat strings/numbers, not nested objects
        key = f"user:{user_id}"
        self.redis.hset(key, mapping=user_data)
        self.redis.expire(key, 3600)

    def get_user(self, user_id: int) -> Optional[dict]:
        """Get all user fields"""
        data = self.redis.hgetall(f"user:{user_id}")
        return data or None  # HGETALL returns {} for missing keys

    def get_user_field(self, user_id: int, field: str) -> Optional[str]:
        """Get single field - more efficient than fetching the full object"""
        return self.redis.hget(f"user:{user_id}", field)

    def update_user_field(self, user_id: int, field: str, value: str):
        """Update single field without fetching the entire object"""
        self.redis.hset(f"user:{user_id}", field, value)

# Advantages of hashes:
# - Update single fields without serialization
# - Memory efficient for small objects
# - Atomic field operations
```
Sorted Sets for Leaderboards
```python
class LeaderboardCache:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def add_score(self, leaderboard: str, user_id: str, score: float):
        """Add or update user score"""
        self.redis.zadd(f"leaderboard:{leaderboard}", {user_id: score})

    def get_top(self, leaderboard: str, count: int = 10) -> list[tuple]:
        """Get top N players with scores"""
        return self.redis.zrevrange(
            f"leaderboard:{leaderboard}",
            0, count - 1,
            withscores=True
        )

    def get_rank(self, leaderboard: str, user_id: str) -> Optional[int]:
        """Get user's rank (1-indexed; ZREVRANK itself is 0-indexed)"""
        rank = self.redis.zrevrank(f"leaderboard:{leaderboard}", user_id)
        return rank + 1 if rank is not None else None

    def get_around_user(self, leaderboard: str, user_id: str, count: int = 5):
        """Get users ranked around a specific user"""
        rank = self.redis.zrevrank(f"leaderboard:{leaderboard}", user_id)
        if rank is None:
            return []
        start = max(0, rank - count // 2)
        end = rank + count // 2
        return self.redis.zrevrange(
            f"leaderboard:{leaderboard}",
            start, end,
            withscores=True
        )
```
Cache Decorator Pattern
```python
from functools import wraps
import hashlib

def cached(ttl: int = 3600, prefix: str = "cache"):
    """Decorator for caching async function results"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Generate cache key from function name and arguments
            # (MD5 here is for key hashing, not security)
            key_data = f"{func.__name__}:{args}:{sorted(kwargs.items())}"
            cache_key = f"{prefix}:{hashlib.md5(key_data.encode()).hexdigest()}"
            # Try cache first
            cached_result = redis_client.get(cache_key)
            if cached_result is not None:
                return json.loads(cached_result)
            # Execute function
            result = await func(*args, **kwargs)
            # Cache result
            redis_client.setex(cache_key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator

# Usage
@cached(ttl=300, prefix="api")
async def get_user_orders(user_id: int, status: str = "all"):
    """This result will be cached for 5 minutes"""
    return await db.fetch_orders(user_id, status)
```
Performance Optimization Tips
Connection Pooling
```python
import redis
from redis import ConnectionPool

# Create connection pool
pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,
    decode_responses=True
)

# Reuse the pool across the application
redis_client = redis.Redis(connection_pool=pool)
```
Pipeline for Bulk Operations
```python
def get_multiple_users(user_ids: list[int]) -> dict:
    """Fetch multiple users in a single round-trip"""
    pipe = redis_client.pipeline()
    for user_id in user_ids:
        pipe.get(f"user:{user_id}")
    results = pipe.execute()
    return {
        user_id: json.loads(result) if result else None
        for user_id, result in zip(user_ids, results)
    }

# Without pipeline: N round-trips
# With pipeline: 1 round-trip
```
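To see the difference in practice, here is a rough micro-benchmark sketch (timings will vary with network latency and server load; the key layout follows the example above):

```python
import time

def time_fetch(user_ids: list[int]) -> tuple[float, float]:
    """Compare N individual GETs against one pipelined batch."""
    start = time.perf_counter()
    for user_id in user_ids:
        redis_client.get(f"user:{user_id}")  # one round-trip each
    individual = time.perf_counter() - start

    start = time.perf_counter()
    pipe = redis_client.pipeline()
    for user_id in user_ids:
        pipe.get(f"user:{user_id}")
    pipe.execute()  # single round-trip for the whole batch
    pipelined = time.perf_counter() - start

    return individual, pipelined
```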
Compression for Large Values
```python
import gzip
import json

class CompressedCache:
    # Note: this requires a client created with decode_responses=False,
    # since gzip output is raw bytes and cannot be decoded as UTF-8
    def __init__(self, redis_client: redis.Redis, threshold: int = 1024):
        self.redis = redis_client
        self.threshold = threshold

    def set(self, key: str, value: Any, ttl: int = 3600):
        serialized = json.dumps(value).encode()
        pipe = self.redis.pipeline()
        if len(serialized) > self.threshold:
            # Compress large values; drop any stale uncompressed copy
            pipe.delete(key)
            pipe.setex(f"{key}:gz", ttl, gzip.compress(serialized))
        else:
            # Drop any stale compressed copy
            pipe.delete(f"{key}:gz")
            pipe.setex(key, ttl, serialized)
        pipe.execute()

    def get(self, key: str) -> Optional[Any]:
        # Try the compressed variant first
        compressed = self.redis.get(f"{key}:gz")
        if compressed:
            return json.loads(gzip.decompress(compressed))
        # Fall back to the uncompressed variant
        value = self.redis.get(key)
        return json.loads(value) if value else None
```
Key Takeaways
1. Choose the right pattern - cache-aside for reads, write-through for consistency
2. Set appropriate TTLs - balance freshness against performance
3. Use proper data structures - hashes for objects, sorted sets for rankings
4. Implement connection pooling - essential for high-concurrency applications
5. Use pipelines - reduce network round-trips for bulk operations
6. Plan your invalidation strategy - tag-based or version-based for complex scenarios
7. Monitor cache hit rates - aim for 90%+ (a quick check is sketched below)
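Redis tracks hits and misses server-wide, so a simple hit-rate check can be built on the INFO stats section; here is a minimal sketch (the 90% threshold is the target from the takeaways, not a Redis default):

```python
def cache_hit_rate() -> float:
    """Compute the global keyspace hit rate from INFO stats."""
    stats = redis_client.info("stats")
    hits = stats.get("keyspace_hits", 0)
    misses = stats.get("keyspace_misses", 0)
    total = hits + misses
    return hits / total if total else 0.0

# Example: alert when the hit rate drops below 90%
if cache_hit_rate() < 0.9:
    print("Cache hit rate below target; review TTLs and key design")
```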
Conclusion
Effective caching can transform application performance. Start with cache-aside pattern, implement proper invalidation, and gradually adopt more advanced patterns as your needs grow. Remember: cache invalidation is one of the hardest problems in computer science - invest time in getting it right.
---
Need help optimizing your caching strategy? Connect on LinkedIn.