Redis Caching Strategies: Complete Guide to High-Performance Caching

December 16, 2024
8 min read
Tushar Agrawal

Master Redis caching patterns including cache-aside, write-through, write-behind, and cache invalidation strategies. Learn practical implementations with Python and real-world performance optimization techniques.

Introduction

Caching is the secret weapon of high-performance applications. At Dr. Dangs Lab, implementing Redis caching reduced our API response times from 800ms to under 50ms for frequently accessed data. Here's everything I've learned about Redis caching strategies.

Why Redis?

Redis is an in-memory data structure store that offers:

  • Speed: Sub-millisecond latency for most operations
  • Versatility: Supports strings, hashes, lists, sets, and sorted sets (see the sketch after this list)
  • Persistence: Optional disk persistence
  • Replication: Master-replica support for high availability
  • Pub/Sub: Real-time messaging capabilities
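
A quick sketch of those structures through redis-py, assuming a local Redis on the default port:

import redis

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Strings: simple key-value
r.set("greeting", "hello")

# Hashes: field-value maps, handy for flat objects
r.hset("user:1", mapping={"name": "Ada", "role": "admin"})

# Lists: ordered, push/pop from either end
r.rpush("queue:emails", "welcome", "receipt")

# Sets: unordered collections of unique members
r.sadd("tags:post:1", "redis", "caching")

# Sorted sets: members ranked by score
r.zadd("scores", {"alice": 120, "bob": 95})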

Caching Patterns

1. Cache-Aside (Lazy Loading)

The most common pattern - the application manages the cache itself: check the cache first, and on a miss fetch from the database and populate the cache.

import redis
import json
from typing import Optional, Any
from functools import wraps

redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

class CacheAside:
    def __init__(self, redis_client: redis.Redis, default_ttl: int = 3600):
        self.redis = redis_client
        self.default_ttl = default_ttl

    def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)
        return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set value in cache"""
        self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(value)
        )

    def delete(self, key: str) -> None:
        """Invalidate cache entry"""
        self.redis.delete(key)

# Usage example
cache = CacheAside(redis_client)

async def get_user(user_id: int) -> Optional[dict]:
    cache_key = f"user:{user_id}"

    # 1. Check cache first
    cached_user = cache.get(cache_key)
    if cached_user:
        return cached_user

    # 2. Cache miss - fetch from database
    user = await db.fetch_user(user_id)

    # 3. Store in cache for next time
    if user:
        cache.set(cache_key, user)

    return user

2. Write-Through

Data is written to both the cache and the database on every write, keeping the two in sync at the cost of slower writes.

class WriteThrough:
    def __init__(self, redis_client: redis.Redis, db_client):
        self.redis = redis_client
        self.db = db_client

    async def write(self, key: str, value: dict, ttl: int = 3600) -> None:
        """Write to the database first, then to the cache"""
        try:
            # Write to database first - it is the source of truth
            await self.db.save(key, value)

            # Then update cache
            self.redis.setex(key, ttl, json.dumps(value))
        except Exception:
            # If the database write fails, the cache is left untouched
            raise

    def read(self, key: str) -> Optional[dict]:
        """Read from cache, fallback to database"""
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)

        # Cache miss - this shouldn't happen often with write-through
        value = self.db.get(key)
        if value:
            self.redis.setex(key, 3600, json.dumps(value))
        return value

# Usage
cache = WriteThrough(redis_client, db_client)

async def update_user(user_id: int, data: dict):
    key = f"user:{user_id}"
    await cache.write(key, data)
    return data

3. Write-Behind (Write-Back)

Writes go to the cache immediately and are batched to the database asynchronously. This gives very fast writes, but any writes not yet flushed are lost if the process crashes.

import asyncio

class WriteBehind:
    def __init__(self, redis_client: redis.Redis, db_client, flush_interval: int = 5):
        self.redis = redis_client
        self.db = db_client
        self.flush_interval = flush_interval
        self.pending_writes: dict[str, dict] = {}
        self._start_background_flush()

    def _start_background_flush(self):
        """Start background task to flush pending writes"""
        # create_task needs a running event loop, so instantiate
        # this class from inside async code
        asyncio.create_task(self._flush_loop())

    async def _flush_loop(self):
        """Periodically flush pending writes to database"""
        while True:
            await asyncio.sleep(self.flush_interval)
            try:
                await self._flush()
            except Exception:
                # Keep the loop alive; failed writes stay queued for retry
                pass

    async def _flush(self):
        """Flush all pending writes to database"""
        if not self.pending_writes:
            return

        writes = dict(self.pending_writes)
        self.pending_writes.clear()

        try:
            await self.db.bulk_write(writes)
        except Exception:
            # Re-queue failed writes for retry, without clobbering
            # newer values written while the flush was in flight
            for key, value in writes.items():
                self.pending_writes.setdefault(key, value)
            raise

    def write(self, key: str, value: dict, ttl: int = 3600) -> None:
        """Write to cache immediately, database later"""
        # Update cache immediately
        self.redis.setex(key, ttl, json.dumps(value))

        # Queue for database write
        self.pending_writes[key] = value

    def read(self, key: str) -> Optional[dict]:
        """Read from cache (always fresh due to write-behind)"""
        cached = self.redis.get(key)
        return json.loads(cached) if cached else None
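
Because the flush task is started with asyncio.create_task, the class must be instantiated inside a running event loop. A minimal usage sketch (db_client and its bulk_write method are assumed to exist):

# Usage - must run inside an event loop
async def main():
    cache = WriteBehind(redis_client, db_client, flush_interval=5)

    # Cache is updated immediately; the database write is deferred
    cache.write("user:42", {"name": "Ada", "plan": "pro"})
    print(cache.read("user:42"))

    # Give the background task a chance to flush
    await asyncio.sleep(6)

asyncio.run(main())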

Cache Invalidation Strategies

Time-Based Expiration (TTL)

# Simple TTL
redis_client.setex("session:123", 3600, "session_data")  # 1 hour

# Different TTLs for different data types
TTL_CONFIG = {
    "user_profile": 86400,      # 24 hours - rarely changes
    "user_session": 3600,       # 1 hour - security
    "product_list": 300,        # 5 minutes - moderate changes
    "stock_price": 5,           # 5 seconds - frequent changes
}

def cache_with_ttl(data_type: str, key: str, value: Any):
    ttl = TTL_CONFIG.get(data_type, 3600)
    redis_client.setex(f"{data_type}:{key}", ttl, json.dumps(value))

Event-Based Invalidation

class EventBasedCache:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def invalidate_user_cache(self, user_id: int):
        """Invalidate all cache entries related to a user"""
        patterns = [
            f"user:{user_id}",
            f"user:{user_id}:*",
            f"orders:user:{user_id}",
        ]

        for pattern in patterns:
            # SCAN instead of KEYS - KEYS blocks the server on large keyspaces
            keys = list(self.redis.scan_iter(match=pattern))
            if keys:
                self.redis.delete(*keys)

    def invalidate_by_tags(self, tags: list[str]):
        """Invalidate cache entries by tags using sets"""
        for tag in tags:
            # Get all keys associated with this tag
            keys = self.redis.smembers(f"tag:{tag}")
            if keys:
                self.redis.delete(*keys)
                self.redis.delete(f"tag:{tag}")

    def set_with_tags(self, key: str, value: Any, tags: list[str], ttl: int = 3600):
        """Set cache with tag associations"""
        pipe = self.redis.pipeline()

        # Set the actual value
        pipe.setex(key, ttl, json.dumps(value))

        # Associate key with tags
        for tag in tags:
            pipe.sadd(f"tag:{tag}", key)
            pipe.expire(f"tag:{tag}", ttl)

        pipe.execute()

# Usage
cache = EventBasedCache(redis_client)

# Cache product with tags
cache.set_with_tags(
    "product:123",
    {"name": "Laptop", "price": 999},
    tags=["category:electronics", "brand:dell"],
    ttl=3600
)

# Invalidate all electronics when category updates
cache.invalidate_by_tags(["category:electronics"])

Version-Based Invalidation

class VersionedCache:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def get_version(self, namespace: str) -> int:
        """Get current version for a namespace"""
        version = self.redis.get(f"version:{namespace}")
        return int(version) if version else 1

    def increment_version(self, namespace: str) -> int:
        """Increment version to invalidate all keys in namespace"""
        return self.redis.incr(f"version:{namespace}")

    def make_key(self, namespace: str, key: str) -> str:
        """Create versioned cache key"""
        version = self.get_version(namespace)
        return f"{namespace}:v{version}:{key}"

    def get(self, namespace: str, key: str) -> Optional[Any]:
        versioned_key = self.make_key(namespace, key)
        cached = self.redis.get(versioned_key)
        return json.loads(cached) if cached else None

    def set(self, namespace: str, key: str, value: Any, ttl: int = 3600):
        versioned_key = self.make_key(namespace, key)
        self.redis.setex(versioned_key, ttl, json.dumps(value))

# Usage
cache = VersionedCache(redis_client)

# Normal operations
cache.set("products", "123", {"name": "Laptop"})
product = cache.get("products", "123")

# Invalidate ALL products at once by incrementing version
cache.increment_version("products")
# Old keys become orphaned and expire naturally

Advanced Redis Data Structures

Using Hashes for Structured Data

class HashCache:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def set_user(self, user_id: int, user_data: dict):
        """Store user as hash - memory efficient for objects"""
        # Hash field values must be flat strings or numbers (no nesting)
        key = f"user:{user_id}"
        self.redis.hset(key, mapping=user_data)
        self.redis.expire(key, 3600)

    def get_user(self, user_id: int) -> Optional[dict]:
        """Get all user fields (None if the user isn't cached)"""
        data = self.redis.hgetall(f"user:{user_id}")
        return data or None

    def get_user_field(self, user_id: int, field: str) -> Optional[str]:
        """Get single field - more efficient than full object"""
        return self.redis.hget(f"user:{user_id}", field)

    def update_user_field(self, user_id: int, field: str, value: str):
        """Update single field without fetching entire object"""
        self.redis.hset(f"user:{user_id}", field, value)

# Advantages of hashes:
# - Update single fields without serialization
# - Memory efficient for small objects
# - Atomic field operations
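
A usage sketch with hypothetical field values:

# Usage
cache = HashCache(redis_client)

cache.set_user(42, {"name": "Ada", "email": "ada@example.com", "plan": "pro"})

# Fetch one field without pulling the whole object
print(cache.get_user_field(42, "email"))

# Update one field in place, no read-modify-write cycle
cache.update_user_field(42, "plan", "enterprise")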

Sorted Sets for Leaderboards

class LeaderboardCache:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def add_score(self, leaderboard: str, user_id: str, score: float):
        """Add or update user score"""
        self.redis.zadd(f"leaderboard:{leaderboard}", {user_id: score})

    def get_top(self, leaderboard: str, count: int = 10) -> list[tuple]:
        """Get top N players with scores"""
        return self.redis.zrevrange(
            f"leaderboard:{leaderboard}",
            0, count - 1,
            withscores=True
        )

    def get_rank(self, leaderboard: str, user_id: str) -> Optional[int]:
        """Get user's rank (1-indexed, highest score first)"""
        rank = self.redis.zrevrank(f"leaderboard:{leaderboard}", user_id)
        return rank + 1 if rank is not None else None

    def get_around_user(self, leaderboard: str, user_id: str, count: int = 5):
        """Get users around a specific user"""
        rank = self.redis.zrevrank(f"leaderboard:{leaderboard}", user_id)
        if rank is None:
            return []

        start = max(0, rank - count // 2)
        end = rank + count // 2

        return self.redis.zrevrange(
            f"leaderboard:{leaderboard}",
            start, end,
            withscores=True
        )
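
A usage sketch with made-up players:

# Usage
leaderboard = LeaderboardCache(redis_client)

leaderboard.add_score("weekly", "alice", 1520)
leaderboard.add_score("weekly", "bob", 1340)
leaderboard.add_score("weekly", "carol", 1610)

print(leaderboard.get_top("weekly", count=3))   # [('carol', 1610.0), ...]
print(leaderboard.get_rank("weekly", "alice"))  # 2
print(leaderboard.get_around_user("weekly", "bob"))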

Cache Decorator Pattern

from functools import wraps
import hashlib

def cached(ttl: int = 3600, prefix: str = "cache"):
    """Decorator for caching function results"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Generate cache key from function name and arguments
            key_data = f"{func.__name__}:{args}:{sorted(kwargs.items())}"
            cache_key = f"{prefix}:{hashlib.md5(key_data.encode()).hexdigest()}"

            # Try cache first
            cached_result = redis_client.get(cache_key)
            if cached_result:
                return json.loads(cached_result)

            # Execute function
            result = await func(*args, **kwargs)

            # Cache result
            redis_client.setex(cache_key, ttl, json.dumps(result))

            return result
        return wrapper
    return decorator

# Usage
@cached(ttl=300, prefix="api")
async def get_user_orders(user_id: int, status: str = "all"):
    """This result will be cached for 5 minutes"""
    return await db.fetch_orders(user_id, status)

Performance Optimization Tips

Connection Pooling

import redis
from redis import ConnectionPool

# Create connection pool
pool = ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,
    decode_responses=True
)

# Reuse pool across application
redis_client = redis.Redis(connection_pool=pool)

Pipeline for Bulk Operations

def get_multiple_users(user_ids: list[int]) -> dict:
    """Fetch multiple users in single round-trip"""
    pipe = redis_client.pipeline()

    for user_id in user_ids:
        pipe.get(f"user:{user_id}")

    results = pipe.execute()

    return {
        user_id: json.loads(result) if result else None
        for user_id, result in zip(user_ids, results)
    }

# Without pipeline: N round-trips
# With pipeline: 1 round-trip

Compression for Large Values

import gzip
import json

class CompressedCache:
    def __init__(self, redis_client: redis.Redis, threshold: int = 1024):
        # Requires a binary-safe client (decode_responses=False),
        # since gzip output is raw bytes
        self.redis = redis_client
        self.threshold = threshold

    def set(self, key: str, value: Any, ttl: int = 3600):
        serialized = json.dumps(value).encode()
        pipe = self.redis.pipeline()

        if len(serialized) > self.threshold:
            # Compress large values; drop any stale uncompressed copy
            pipe.setex(f"{key}:gz", ttl, gzip.compress(serialized))
            pipe.delete(key)
        else:
            # Store small values as-is; drop any stale compressed copy
            pipe.setex(key, ttl, serialized)
            pipe.delete(f"{key}:gz")

        pipe.execute()

    def get(self, key: str) -> Optional[Any]:
        # Try compressed first
        compressed = self.redis.get(f"{key}:gz")
        if compressed:
            decompressed = gzip.decompress(compressed)
            return json.loads(decompressed)

        # Try uncompressed
        value = self.redis.get(key)
        return json.loads(value) if value else None
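
A quick usage sketch - note the client must be created with decode_responses=False so the gzip bytes survive the round-trip:

# Usage - requires a binary-safe client
binary_client = redis.Redis(host='localhost', port=6379, db=0)
cache = CompressedCache(binary_client, threshold=1024)

big_value = {"items": list(range(10000))}
cache.set("report:2024", big_value)            # stored compressed
print(len(cache.get("report:2024")["items"]))  # 10000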

Key Takeaways

1. Choose the right pattern - Cache-aside for reads, write-through for consistency
2. Set appropriate TTLs - Balance freshness vs performance
3. Use proper data structures - Hashes for objects, sorted sets for rankings
4. Implement connection pooling - Essential for high-concurrency applications
5. Use pipelines - Reduce network round-trips for bulk operations
6. Plan invalidation strategy - Tag-based or version-based for complex scenarios
7. Monitor cache hit rates - Aim for a 90%+ hit rate (see the sketch below)
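
For the last point, a minimal monitoring sketch: Redis tracks server-wide keyspace_hits and keyspace_misses counters, exposed via INFO stats (these are global, not per-key):

def cache_hit_rate(client: redis.Redis) -> float:
    """Server-wide cache hit rate from Redis INFO stats"""
    stats = client.info("stats")
    hits = stats.get("keyspace_hits", 0)
    misses = stats.get("keyspace_misses", 0)
    total = hits + misses
    return hits / total if total else 0.0

print(f"Hit rate: {cache_hit_rate(redis_client):.1%}")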

Conclusion

Effective caching can transform application performance. Start with cache-aside pattern, implement proper invalidation, and gradually adopt more advanced patterns as your needs grow. Remember: cache invalidation is one of the hardest problems in computer science - invest time in getting it right.

---

Need help optimizing your caching strategy? Connect on LinkedIn.
