Rate Limiting & API Gateway Patterns: Production Implementation Guide

December 19, 2025
12 min read
Tushar Agrawal

Master API rate limiting with token bucket, sliding window, and distributed algorithms. Implement Kong, Nginx, and custom rate limiters with Redis for high-traffic production systems.

Introduction

When your API goes viral or faces a DDoS attack, rate limiting is your first line of defense. It protects backend services from overload, ensures fair usage, and maintains system stability under pressure.

Having built rate limiting systems for healthcare APIs handling millions of requests, I've learned that the algorithm choice and implementation details matter enormously. This guide covers everything from basic concepts to distributed rate limiting at scale.

Rate Limiting Algorithms

1. Token Bucket Algorithm

The token bucket is the most versatile algorithm, allowing bursts while maintaining average rate limits.

┌─────────────────────────────────────────────────────┐
│                   TOKEN BUCKET                       │
├─────────────────────────────────────────────────────┤
│                                                      │
│    Tokens added at fixed rate (e.g., 10/second)     │
│                    ↓                                 │
│              ┌─────────────┐                        │
│              │  ○ ○ ○ ○ ○  │  Bucket (capacity: 100)│
│              │  ○ ○ ○ ○ ○  │                        │
│              └─────────────┘                        │
│                    ↓                                 │
│         Request consumes 1 token                     │
│                    ↓                                 │
│    ┌─────────────────────────────────┐              │
│    │ Tokens available? → Allow       │              │
│    │ No tokens? → Reject (429)       │              │
│    └─────────────────────────────────┘              │
│                                                      │
└─────────────────────────────────────────────────────┘

import time
from dataclasses import dataclass
from typing import Optional, Tuple
import asyncio
import aioredis

@dataclass
class TokenBucketConfig:
    capacity: int  # Maximum tokens in bucket
    refill_rate: float  # Tokens added per second
    initial_tokens: Optional[int] = None

    def __post_init__(self):
        if self.initial_tokens is None:
            self.initial_tokens = self.capacity


class TokenBucket:
    """In-memory token bucket rate limiter."""

    def __init__(self, config: TokenBucketConfig):
        self.config = config
        self.tokens = config.initial_tokens
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> Tuple[bool, dict]:
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill

            # Refill tokens
            self.tokens = min(
                self.config.capacity,
                self.tokens + elapsed * self.config.refill_rate
            )
            self.last_refill = now

            # Check if we have enough tokens
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True, {
                    'remaining': int(self.tokens),
                    'reset_at': now + (self.config.capacity - self.tokens) / self.config.refill_rate
                }
            else:
                wait_time = (tokens - self.tokens) / self.config.refill_rate
                return False, {
                    'remaining': 0,
                    'retry_after': wait_time
                }
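
A quick sanity check: with capacity 5 and a refill rate of 1 token/second, the first five back-to-back requests pass and the rest are rejected until tokens refill.

async def demo():
    bucket = TokenBucket(TokenBucketConfig(capacity=5, refill_rate=1.0))
    for i in range(7):
        allowed, info = await bucket.acquire()
        print(f"request {i}: {'allowed' if allowed else 'rejected'} ({info})")

asyncio.run(demo())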


class DistributedTokenBucket:
    """Redis-based distributed token bucket."""

    def __init__(self, redis: aioredis.Redis, config: TokenBucketConfig):
        self.redis = redis
        self.config = config

    async def acquire(self, key: str, tokens: int = 1) -> Tuple[bool, dict]:
        lua_script = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local requested = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
        local current_tokens = tonumber(bucket[1]) or capacity
        local last_refill = tonumber(bucket[2]) or now

        -- Calculate tokens to add
        local elapsed = now - last_refill
        local new_tokens = math.min(capacity, current_tokens + (elapsed * refill_rate))

        -- Check if request can be fulfilled
        if new_tokens >= requested then
            new_tokens = new_tokens - requested
            redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill', now)
            redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
            return {1, new_tokens}
        else
            redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill', now)
            redis.call('EXPIRE', key, math.ceil(capacity / refill_rate) + 1)
            return {0, new_tokens}
        end
        """

        result = await self.redis.eval(
            lua_script,
            1,
            key,
            self.config.capacity,
            self.config.refill_rate,
            tokens,
            time.time()
        )

        allowed = bool(result[0])
        remaining = int(result[1])

        if allowed:
            return True, {'remaining': remaining}
        else:
            wait_time = (tokens - remaining) / self.config.refill_rate
            return False, {'remaining': 0, 'retry_after': wait_time}
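
A minimal wiring sketch (the Redis URL and key are illustrative):

async def main():
    redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)
    bucket = DistributedTokenBucket(
        redis,
        TokenBucketConfig(capacity=100, refill_rate=10.0)
    )

    # Every app instance sharing this Redis sees the same bucket per key
    allowed, info = await bucket.acquire("user:42")
    print(allowed, info)

asyncio.run(main())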

2. Sliding Window Algorithm

More accurate than fixed windows, the sliding window avoids the "boundary problem": with a fixed 100-per-minute window, a client can send 100 requests at 0:59 and 100 more at 1:01, squeezing 200 requests into two seconds while never exceeding either window's limit.

import uuid

class SlidingWindowRateLimiter:
    """Sliding window rate limiter using Redis sorted sets."""

    def __init__(
        self,
        redis: aioredis.Redis,
        limit: int,
        window_seconds: int
    ):
        self.redis = redis
        self.limit = limit
        self.window = window_seconds

    async def is_allowed(self, key: str) -> Tuple[bool, dict]:
        now = time.time()
        window_start = now - self.window

        pipe = self.redis.pipeline()

        # Remove old entries
        pipe.zremrangebyscore(key, 0, window_start)

        # Count current entries
        pipe.zcard(key)

        # Add new entry optimistically; a unique member avoids collisions
        # when two requests land on the same timestamp
        member = f"{now}:{uuid.uuid4().hex}"
        pipe.zadd(key, {member: now})

        # Set expiry
        pipe.expire(key, self.window + 1)

        results = await pipe.execute()
        current_count = results[1]

        if current_count < self.limit:
            return True, {
                'remaining': self.limit - current_count - 1,
                'reset_at': now + self.window
            }
        else:
            # Remove the optimistically added entry
            await self.redis.zrem(key, member)
            return False, {
                'remaining': 0,
                'retry_after': self.window
            }
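
The sorted-set approach is exact but stores one entry per request, so memory per key grows with the limit: a 10,000-requests-per-minute limit can keep up to 10,000 timestamps per client. The counter variant below approximates the window with just two integers per key.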


class SlidingWindowCounter:
    """
    Sliding window counter - memory efficient approximation.
    Uses weighted average of current and previous window.
    """

    def __init__(
        self,
        redis: aioredis.Redis,
        limit: int,
        window_seconds: int
    ):
        self.redis = redis
        self.limit = limit
        self.window = window_seconds

    async def is_allowed(self, key: str) -> Tuple[bool, dict]:
        now = time.time()
        current_window = int(now // self.window)
        previous_window = current_window - 1
        window_elapsed = (now % self.window) / self.window

        # Get counts for current and previous windows
        current_key = f"{key}:{current_window}"
        previous_key = f"{key}:{previous_window}"

        pipe = self.redis.pipeline()
        pipe.get(current_key)
        pipe.get(previous_key)
        results = await pipe.execute()

        current_count = int(results[0] or 0)
        previous_count = int(results[1] or 0)

        # Weighted count
        weighted_count = (previous_count * (1 - window_elapsed)) + current_count

        if weighted_count < self.limit:
            # Increment current window
            await self.redis.incr(current_key)
            await self.redis.expire(current_key, self.window * 2)

            return True, {
                'remaining': int(self.limit - weighted_count - 1),
                'reset_at': (current_window + 1) * self.window
            }
        else:
            return False, {
                'remaining': 0,
                'retry_after': self.window - (now % self.window)
            }
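
To see the weighting in action: with a limit of 100 over a 60 s window, 15 s into the current window (window_elapsed = 0.25), previous_count = 80 and current_count = 30 give a weighted count of 80 × 0.75 + 30 = 90, so the request is allowed with roughly 9 remaining.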

3. Leaky Bucket Algorithm

The leaky bucket smooths bursty traffic into a steady outflow, processing queued requests at a fixed rate regardless of how they arrive.

import asyncio
from collections import deque
from dataclasses import dataclass

@dataclass
class LeakyBucketConfig:
    capacity: int  # Queue size
    leak_rate: float  # Requests processed per second


class LeakyBucket:
    """Leaky bucket for traffic shaping."""

    def __init__(self, config: LeakyBucketConfig):
        self.config = config
        self.queue = deque(maxlen=config.capacity)
        self._processing = False

    async def submit(self, request_id: str) -> bool:
        """Submit request to the bucket."""
        if len(self.queue) >= self.config.capacity:
            return False  # Bucket full, reject

        self.queue.append(request_id)

        if not self._processing:
            asyncio.create_task(self._process_queue())

        return True

    async def _process_queue(self):
        """Process requests at a fixed rate (leak)."""
        self._processing = True
        interval = 1.0 / self.config.leak_rate

        while self.queue:
            request_id = self.queue.popleft()
            await self._handle_request(request_id)
            await asyncio.sleep(interval)

        self._processing = False

    async def _handle_request(self, request_id: str):
        """Override to process request."""
        pass
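
In practice you subclass LeakyBucket and override _handle_request. A small demo (the handler is just a print placeholder; capacity 10, draining at 2 requests/second):

class PrintingLeakyBucket(LeakyBucket):
    async def _handle_request(self, request_id: str):
        print(f"processing {request_id}")

async def demo():
    bucket = PrintingLeakyBucket(LeakyBucketConfig(capacity=10, leak_rate=2.0))
    for i in range(12):
        if not await bucket.submit(f"req-{i}"):
            print(f"req-{i} rejected: bucket full")
    await asyncio.sleep(6)  # give the queue time to drain

asyncio.run(demo())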

FastAPI Rate Limiting Middleware

import time

from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import aioredis

app = FastAPI()

# Redis connection
redis_pool = None

async def get_redis():
    global redis_pool
    if redis_pool is None:
        # aioredis 2.x from_url() is synchronous; connections are created lazily
        redis_pool = aioredis.from_url(
            "redis://localhost:6379",
            encoding="utf-8",
            decode_responses=True
        )
    return redis_pool


class RateLimitMiddleware(BaseHTTPMiddleware):
    """Multi-tier rate limiting middleware."""

    def __init__(self, app, tiers: dict):
        super().__init__(app)
        self.tiers = tiers  # {tier_name: (limit, window)}

    async def dispatch(self, request: Request, call_next):
        redis = await get_redis()

        # Identify client and tier
        client_id = self._get_client_id(request)
        tier = self._get_tier(request)
        limit, window = self.tiers.get(tier, (100, 60))

        # Check rate limit
        limiter = SlidingWindowCounter(redis, limit, window)
        key = f"ratelimit:{tier}:{client_id}"

        allowed, info = await limiter.is_allowed(key)

        if not allowed:
            return JSONResponse(
                status_code=429,
                content={
                    "error": "Rate limit exceeded",
                    "retry_after": info['retry_after']
                },
                headers={
                    "X-RateLimit-Limit": str(limit),
                    "X-RateLimit-Remaining": "0",
                    "X-RateLimit-Reset": str(int(info.get('reset_at', 0))),
                    "Retry-After": str(int(info['retry_after']))
                }
            )

        response = await call_next(request)

        # Add rate limit headers
        response.headers["X-RateLimit-Limit"] = str(limit)
        response.headers["X-RateLimit-Remaining"] = str(info['remaining'])

        return response

    def _get_client_id(self, request: Request) -> str:
        # Priority: API key > User ID > IP
        api_key = request.headers.get("X-API-Key")
        if api_key:
            return f"key:{api_key}"

        user = getattr(request.state, "user", None)
        if user:
            return f"user:{user.id}"

        forwarded = request.headers.get("X-Forwarded-For")
        if forwarded:
            return f"ip:{forwarded.split(',')[0].strip()}"

        return f"ip:{request.client.host}"

    def _get_tier(self, request: Request) -> str:
        # Determine tier from API key or user
        api_key = request.headers.get("X-API-Key")
        if api_key:
            # Look up tier from database/cache
            return "premium"  # Example

        return "free"


# Configure tiers
rate_limit_tiers = {
    "free": (100, 60),      # 100 requests per minute
    "basic": (1000, 60),    # 1000 requests per minute
    "premium": (10000, 60), # 10000 requests per minute
    "enterprise": (100000, 60)
}

app.add_middleware(RateLimitMiddleware, tiers=rate_limit_tiers)


# Per-endpoint rate limiting
def rate_limit(limit: int, window: int = 60):
    """Decorator for endpoint-specific rate limits."""

    async def dependency(request: Request):
        redis = await get_redis()
        client_id = request.headers.get("X-API-Key") or request.client.host
        endpoint = request.url.path

        key = f"ratelimit:endpoint:{endpoint}:{client_id}"
        limiter = SlidingWindowCounter(redis, limit, window)

        allowed, info = await limiter.is_allowed(key)
        if not allowed:
            raise HTTPException(
                status_code=429,
                detail={
                    "error": "Rate limit exceeded",
                    "retry_after": info['retry_after']
                }
            )

        return info

    return Depends(dependency)


@app.get("/api/expensive-operation")
async def expensive_operation(rate_info: dict = rate_limit(10, 60)):
    """This endpoint has a stricter rate limit."""
    return {"status": "success", "rate_limit_remaining": rate_info['remaining']}

API Gateway Configuration

Kong Rate Limiting

# kong.yml
_format_version: "3.0"

services:
  - name: user-service
    url: http://user-service:8000
    routes:
      - name: user-routes
        paths:
          - /api/users
        strip_path: false

plugins:
  # Global rate limiting
  - name: rate-limiting
    config:
      minute: 1000
      policy: redis
      redis_host: redis
      redis_port: 6379
      redis_database: 0
      hide_client_headers: false
      fault_tolerant: true

  # Per-consumer rate limiting
  - name: rate-limiting-advanced
    service: user-service
    config:
      identifier: consumer
      sync_rate: 10
      strategy: redis
      window_type: sliding
      limits:
        - 100 # per second
        - 5000 # per minute
      window_size:
        - 1
        - 60
      redis:
        host: redis
        port: 6379
        database: 1

consumers:
  - username: free-tier
    plugins:
      - name: rate-limiting
        config:
          minute: 100
          policy: local

  - username: premium-tier
    plugins:
      - name: rate-limiting
        config:
          minute: 10000
          policy: redis
          redis_host: redis

  - username: enterprise-tier
    plugins:
      - name: rate-limiting
        config:
          minute: 100000
          policy: redis
          redis_host: redis
          redis_timeout: 2000
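
One operational note on the global plugin: fault_tolerant: true tells Kong to let requests through when Redis is unreachable, trading strict enforcement for availability. Set it to false if the limit must hold even during a Redis outage.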

Nginx Rate Limiting

# nginx.conf
http {
    # Define rate limit zones
    limit_req_zone $binary_remote_addr zone=ip_limit:10m rate=10r/s;
    limit_req_zone $http_x_api_key zone=api_key_limit:10m rate=100r/s;
    limit_req_zone $server_name zone=global_limit:10m rate=10000r/s;

    # Anonymous traffic only: this key is empty when X-API-Key is present,
    # and nginx does not count requests whose zone key is empty
    map $http_x_api_key $anon_ip {
        ""      $binary_remote_addr;
        default "";
    }
    limit_req_zone $anon_ip zone=anon_ip_limit:10m rate=10r/s;

    # Connection limits
    limit_conn_zone $binary_remote_addr zone=conn_limit:10m;

    # Rate limit status codes
    limit_req_status 429;
    limit_conn_status 429;

    upstream api_backend {
        server api1:8000 weight=5;
        server api2:8000 weight=5;
        keepalive 100;
    }

    server {
        listen 80;
        server_name api.example.com;

        # Global connection limit
        limit_conn conn_limit 100;

        # Default rate limiting by IP
        location /api/ {
            limit_req zone=ip_limit burst=20 nodelay;
            limit_req zone=global_limit burst=1000;

            proxy_pass http://api_backend;
            proxy_http_version 1.1;
            proxy_set_header Connection "";

            # Rate limit headers
            add_header X-RateLimit-Limit 10;
            add_header X-RateLimit-Burst 20;
        }

        # Premium endpoints with higher limits. limit_req cannot select a
        # zone via a variable, so both zones are applied: keyed requests are
        # counted only in api_key_limit (their $anon_ip key is empty), while
        # anonymous requests are counted only in anon_ip_limit.
        location /api/premium/ {
            limit_req zone=api_key_limit burst=50 nodelay;
            limit_req zone=anon_ip_limit burst=50 nodelay;

            proxy_pass http://api_backend;
        }

        # Strict limit for auth endpoints
        location /api/auth/ {
            limit_req zone=ip_limit burst=5 nodelay;

            proxy_pass http://api_backend;
        }

        # Error page for rate limiting
        error_page 429 @rate_limited;

        location @rate_limited {
            default_type application/json;
            return 429 '{"error": "Rate limit exceeded", "retry_after": 1}';
        }
    }
}

Distributed Rate Limiting Patterns

Cluster-Wide Rate Limiting

import hashlib
from typing import List

class ConsistentHashRing:
    """Distribute rate limiting across Redis cluster."""

    def __init__(self, nodes: List[str], replicas: int = 100):
        self.replicas = replicas
        self.ring = {}
        self.sorted_keys = []

        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str):
        for i in range(self.replicas):
            key = self._hash(f"{node}:{i}")
            self.ring[key] = node
            self.sorted_keys.append(key)
        self.sorted_keys.sort()

    def get_node(self, key: str) -> str:
        if not self.ring:
            return None

        hash_key = self._hash(key)
        for ring_key in self.sorted_keys:
            if ring_key >= hash_key:
                return self.ring[ring_key]
        return self.ring[self.sorted_keys[0]]

    def _hash(self, key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
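
The consistent hash ring earns its keep when the cluster changes: adding or removing a node remaps only about 1/N of the keys instead of reshuffling every client's counters, and the virtual replicas keep keys spread evenly across the physical nodes.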


class ClusterRateLimiter:
    """Rate limiter for Redis cluster."""

    def __init__(self, redis_nodes: List[aioredis.Redis]):
        self.nodes = redis_nodes
        self.ring = ConsistentHashRing([str(i) for i in range(len(redis_nodes))])

    def _get_redis(self, key: str) -> aioredis.Redis:
        node_idx = int(self.ring.get_node(key))
        return self.nodes[node_idx]

    async def is_allowed(
        self,
        key: str,
        limit: int,
        window: int
    ) -> Tuple[bool, dict]:
        redis = self._get_redis(key)
        limiter = SlidingWindowCounter(redis, limit, window)
        return await limiter.is_allowed(key)
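
A wiring sketch, assuming three Redis instances (the hostnames are illustrative):

async def main():
    nodes = [
        aioredis.from_url(f"redis://redis-{i}:6379", decode_responses=True)
        for i in range(3)
    ]
    limiter = ClusterRateLimiter(nodes)

    # Each key hashes to exactly one node, so its counter stays consistent
    allowed, info = await limiter.is_allowed("user:42", limit=100, window=60)
    print(allowed, info)

asyncio.run(main())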

Circuit Breaker with Rate Limiting

from enum import Enum
from dataclasses import dataclass
import time

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(Exception):
    """Raised when the circuit is open and calls are short-circuited."""


class RateLimitError(Exception):
    """Raised when the integrated rate limit is exceeded."""


@dataclass
class CircuitConfig:
    failure_threshold: int = 5
    success_threshold: int = 3
    timeout: float = 30.0
    rate_limit: int = 100
    rate_window: int = 60


class CircuitBreaker:
    """Circuit breaker with integrated rate limiting."""

    def __init__(self, redis: aioredis.Redis, config: CircuitConfig):
        self.redis = redis
        self.config = config
        self.rate_limiter = SlidingWindowCounter(
            redis,
            config.rate_limit,
            config.rate_window
        )

    async def execute(self, key: str, func, *args, **kwargs):
        # Check circuit state
        state = await self._get_state(key)

        if state == CircuitState.OPEN:
            if await self._should_attempt(key):
                await self._set_state(key, CircuitState.HALF_OPEN)
            else:
                raise CircuitOpenError("Circuit is open")

        # Check rate limit
        allowed, info = await self.rate_limiter.is_allowed(f"rate:{key}")
        if not allowed:
            raise RateLimitError(f"Rate limit exceeded, retry after {info['retry_after']}s")

        try:
            result = await func(*args, **kwargs)
            await self._record_success(key)
            return result
        except Exception:
            await self._record_failure(key)
            raise

    async def _get_state(self, key: str) -> CircuitState:
        state = await self.redis.get(f"circuit:{key}:state")
        return CircuitState(state) if state else CircuitState.CLOSED

    async def _set_state(self, key: str, state: CircuitState):
        await self.redis.set(f"circuit:{key}:state", state.value)

    async def _record_success(self, key: str):
        state = await self._get_state(key)

        if state == CircuitState.HALF_OPEN:
            successes = await self.redis.incr(f"circuit:{key}:successes")
            if successes >= self.config.success_threshold:
                await self._set_state(key, CircuitState.CLOSED)
                await self.redis.delete(f"circuit:{key}:successes")

    async def _record_failure(self, key: str):
        failures = await self.redis.incr(f"circuit:{key}:failures")
        await self.redis.expire(f"circuit:{key}:failures", int(self.config.timeout))

        if failures >= self.config.failure_threshold:
            await self._set_state(key, CircuitState.OPEN)
            await self.redis.set(
                f"circuit:{key}:open_until",
                time.time() + self.config.timeout
            )

    async def _should_attempt(self, key: str) -> bool:
        open_until = await self.redis.get(f"circuit:{key}:open_until")
        if open_until and float(open_until) > time.time():
            return False
        return True
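
Putting it together: a sketch with a hypothetical call_payment_api coroutine standing in for the real downstream call (the key and Redis URL are illustrative):

async def call_payment_api(amount: float) -> dict:
    # Placeholder for the real downstream request
    return {"charged": amount}

async def main():
    redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)
    breaker = CircuitBreaker(redis, CircuitConfig(failure_threshold=5))

    try:
        result = await breaker.execute("payment-service", call_payment_api, 9.99)
    except CircuitOpenError:
        result = {"error": "payment service unavailable"}  # degrade gracefully
    except RateLimitError as exc:
        result = {"error": str(exc)}
    print(result)

asyncio.run(main())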

Monitoring Rate Limits

from prometheus_client import Counter, Histogram, Gauge

# Metrics
RATE_LIMIT_REQUESTS = Counter(
    'rate_limit_requests_total',
    'Total rate limit checks',
    ['tier', 'endpoint', 'result']
)

RATE_LIMIT_LATENCY = Histogram(
    'rate_limit_check_seconds',
    'Rate limit check latency',
    ['tier'],
    buckets=[.001, .005, .01, .025, .05, .1]
)

RATE_LIMIT_REMAINING = Gauge(
    'rate_limit_remaining',
    'Remaining requests in current window',
    ['client_id', 'tier']
)


class InstrumentedRateLimiter:
    """Rate limiter with Prometheus metrics."""

    def __init__(self, limiter: SlidingWindowCounter, tier: str):
        self.limiter = limiter
        self.tier = tier

    async def is_allowed(self, key: str, endpoint: str = "") -> Tuple[bool, dict]:
        with RATE_LIMIT_LATENCY.labels(tier=self.tier).time():
            allowed, info = await self.limiter.is_allowed(key)

        result = "allowed" if allowed else "rejected"
        RATE_LIMIT_REQUESTS.labels(
            tier=self.tier,
            endpoint=endpoint,
            result=result
        ).inc()

        if 'remaining' in info:
            RATE_LIMIT_REMAINING.labels(
                client_id=key,
                tier=self.tier
            ).set(info['remaining'])

        return allowed, info
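
To expose these metrics, prometheus_client ships a built-in HTTP server; a minimal wiring sketch (port 9090 and the Redis URL are illustrative):

from prometheus_client import start_http_server

async def main():
    start_http_server(9090)  # Prometheus scrapes /metrics on this port
    redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)
    limiter = InstrumentedRateLimiter(
        SlidingWindowCounter(redis, limit=100, window_seconds=60),
        tier="premium"
    )
    allowed, info = await limiter.is_allowed("user:42", endpoint="/api/users")

One caveat: the client_id label on the remaining-requests gauge can explode metric cardinality with many clients; consider restricting it to a sampled or aggregated subset.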

Conclusion

Effective rate limiting protects your APIs from abuse while ensuring fair resource allocation:

  • Token Bucket for APIs needing burst tolerance
  • Sliding Window for accurate counting without boundary issues
  • Leaky Bucket for traffic shaping and smoothing
  • Distributed implementations with Redis for multi-instance deployments
  • Tiered limits for different customer segments

Remember to:
  • Always return proper 429 responses with Retry-After headers
  • Monitor rate limit metrics to tune thresholds
  • Implement graceful degradation with circuit breakers
  • Use consistent hashing for distributed rate limiting
