Microservices Security: Zero Trust Architecture Implementation Guide
Complete guide to implementing Zero Trust security in microservices. mTLS, service mesh security, API authentication, secrets management, and OWASP best practices for production systems.
Introduction
In a microservices architecture, the attack surface expands dramatically. The old "castle and moat" security model fails when services communicate across networks, containers, and clouds. Zero Trust assumes no implicit trust—every request must be authenticated, authorized, and encrypted.
Building HIPAA-compliant healthcare systems at Dr. Dangs Lab taught me that security isn't optional—it's foundational. This guide covers production-tested patterns for securing microservices.
Zero Trust Principles
┌─────────────────────────────────────────────────────────────┐
│ ZERO TRUST PRINCIPLES │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. Never Trust, Always Verify │
│ • Authenticate every request │
│ • Validate at every layer │
│ │
│ 2. Least Privilege Access │
│ • Minimal permissions by default │
│ • Just-in-time access │
│ │
│ 3. Assume Breach │
│ • Segment networks │
│ • Limit blast radius │
│ • Encrypt everything │
│ │
│ 4. Verify Explicitly │
│ • User identity + Device + Location + Behavior │
│ • Context-aware access decisions │
│ │
└─────────────────────────────────────────────────────────────┘
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│ ZERO TRUST ARCHITECTURE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Internet │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ API Gateway │ │
│ │ • Rate limiting • WAF • DDoS protection │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Identity Provider │ │
│ │ • OAuth2/OIDC • JWT validation • MFA │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Service Mesh (Istio) │ │
│ │ ┌─────────┐ mTLS ┌─────────┐ mTLS ┌─────────┐ │ │
│ │ │ Service │◄────────►│ Service │◄────────►│ Service │ │ │
│ │ │ A │ │ B │ │ C │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Secrets Management │ │
│ │ • HashiCorp Vault • Dynamic secrets • Rotation │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Service-to-Service Authentication with mTLS
Certificate Management with cert-manager
cert-manager issuer for internal CA
apiVersion: cert-manager.io/v1
kind: ClusterIssuer
metadata:
name: internal-ca-issuer
spec:
ca:
secretName: internal-ca-secret---
Certificate for a service
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
name: user-service-cert
namespace: production
spec:
secretName: user-service-tls
duration: 720h # 30 days
renewBefore: 168h # 7 days
subject:
organizations:
- "My Company"
commonName: user-service.production.svc.cluster.local
dnsNames:
- user-service
- user-service.production
- user-service.production.svc
- user-service.production.svc.cluster.local
issuerRef:
name: internal-ca-issuer
kind: ClusterIssuer
Python mTLS Client
import ssl
import httpx
from pathlib import Pathclass MTLSClient:
"""HTTP client with mutual TLS authentication."""
def __init__(
self,
cert_path: str,
key_path: str,
ca_path: str,
verify_hostname: bool = True
):
self.ssl_context = ssl.create_default_context(
ssl.Purpose.SERVER_AUTH,
cafile=ca_path
)
self.ssl_context.load_cert_chain(cert_path, key_path)
self.ssl_context.check_hostname = verify_hostname
self.ssl_context.verify_mode = ssl.CERT_REQUIRED
async def get(self, url: str, kwargs) -> httpx.Response:
async with httpx.AsyncClient(verify=self.ssl_context) as client:
return await client.get(url, kwargs)
async def post(self, url: str, kwargs) -> httpx.Response:
async with httpx.AsyncClient(verify=self.ssl_context) as client:
return await client.post(url, kwargs)
FastAPI server with mTLS
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddlewareapp = FastAPI()
class MTLSMiddleware(BaseHTTPMiddleware):
"""Verify client certificate in mTLS."""
ALLOWED_SERVICES = {
"user-service.production.svc.cluster.local",
"order-service.production.svc.cluster.local",
"payment-service.production.svc.cluster.local",
}
async def dispatch(self, request: Request, call_next):
# Get client certificate from request
client_cert = request.scope.get("transport", {}).get("client_cert")
if not client_cert:
raise HTTPException(status_code=401, detail="Client certificate required")
# Validate certificate common name
cn = self._extract_cn(client_cert)
if cn not in self.ALLOWED_SERVICES:
raise HTTPException(status_code=403, detail=f"Service {cn} not authorized")
# Add service identity to request state
request.state.caller_service = cn
return await call_next(request)
def _extract_cn(self, cert) -> str:
for rdn in cert.subject:
if rdn[0][0] == "commonName":
return rdn[0][1]
return ""
app.add_middleware(MTLSMiddleware)
Run with mTLS
if __name__ == "__main__":
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
ssl_keyfile="/certs/server.key",
ssl_certfile="/certs/server.crt",
ssl_ca_certs="/certs/ca.crt",
ssl_cert_reqs=ssl.CERT_REQUIRED
)
JWT Authentication & Authorization
Token Service Implementation
from datetime import datetime, timedelta
from typing import Optional, List, Dict
import jwt
from pydantic import BaseModel
from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentialsclass TokenPayload(BaseModel):
sub: str # Subject (user ID)
exp: datetime
iat: datetime
jti: str # JWT ID for revocation
roles: List[str]
permissions: List[str]
tenant_id: Optional[str] = None
service_account: bool = False
class JWTService:
"""JWT token management with rotation and revocation."""
def __init__(
self,
private_key: str,
public_key: str,
algorithm: str = "RS256",
access_token_ttl: int = 900, # 15 minutes
refresh_token_ttl: int = 86400 # 24 hours
):
self.private_key = private_key
self.public_key = public_key
self.algorithm = algorithm
self.access_ttl = access_token_ttl
self.refresh_ttl = refresh_token_ttl
self.revoked_tokens = set() # Use Redis in production
def create_access_token(
self,
user_id: str,
roles: List[str],
permissions: List[str],
tenant_id: str = None
) -> str:
now = datetime.utcnow()
payload = {
"sub": user_id,
"exp": now + timedelta(seconds=self.access_ttl),
"iat": now,
"jti": self._generate_jti(),
"roles": roles,
"permissions": permissions,
"tenant_id": tenant_id,
"type": "access"
}
return jwt.encode(payload, self.private_key, algorithm=self.algorithm)
def create_refresh_token(self, user_id: str) -> str:
now = datetime.utcnow()
payload = {
"sub": user_id,
"exp": now + timedelta(seconds=self.refresh_ttl),
"iat": now,
"jti": self._generate_jti(),
"type": "refresh"
}
return jwt.encode(payload, self.private_key, algorithm=self.algorithm)
def verify_token(self, token: str) -> TokenPayload:
try:
payload = jwt.decode(
token,
self.public_key,
algorithms=[self.algorithm],
options={"require": ["exp", "sub", "jti"]}
)
# Check if revoked
if payload["jti"] in self.revoked_tokens:
raise HTTPException(status_code=401, detail="Token revoked")
return TokenPayload(payload)
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.InvalidTokenError as e:
raise HTTPException(status_code=401, detail=f"Invalid token: {e}")
def revoke_token(self, jti: str):
"""Revoke a specific token."""
self.revoked_tokens.add(jti)
def _generate_jti(self) -> str:
import secrets
return secrets.token_urlsafe(32)
FastAPI dependency for authentication
security = HTTPBearer()async def get_current_user(
credentials: HTTPAuthorizationCredentials = Security(security),
jwt_service: JWTService = Depends(get_jwt_service)
) -> TokenPayload:
return jwt_service.verify_token(credentials.credentials)
Permission decorator
def require_permissions(*required_permissions: str):
async def permission_checker(
user: TokenPayload = Depends(get_current_user)
):
for perm in required_permissions:
if perm not in user.permissions:
raise HTTPException(
status_code=403,
detail=f"Permission '{perm}' required"
)
return user
return Depends(permission_checker)
Usage
@app.get("/admin/users")
async def list_users(
user: TokenPayload = require_permissions("users:read", "admin:access")
):
return {"users": [...]}
Service Mesh Security with Istio
Istio Security Policies
Enable strict mTLS for namespace
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
name: default
namespace: production
spec:
mtls:
mode: STRICT---
Authorization policy - deny all by default
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: deny-all
namespace: production
spec:
{} # Empty spec = deny all---
Allow specific service-to-service communication
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: allow-order-to-payment
namespace: production
spec:
selector:
matchLabels:
app: payment-service
action: ALLOW
rules:
- from:
- source:
principals:
- "cluster.local/ns/production/sa/order-service"
to:
- operation:
methods: ["POST"]
paths: ["/api/v1/payments/*"]---
Allow API gateway to all services
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
name: allow-gateway
namespace: production
spec:
action: ALLOW
rules:
- from:
- source:
principals:
- "cluster.local/ns/istio-system/sa/istio-ingressgateway"
to:
- operation:
methods: ["GET", "POST", "PUT", "DELETE"]---
JWT validation at ingress
apiVersion: security.istio.io/v1beta1
kind: RequestAuthentication
metadata:
name: jwt-auth
namespace: production
spec:
selector:
matchLabels:
istio: ingressgateway
jwtRules:
- issuer: "https://auth.example.com"
jwksUri: "https://auth.example.com/.well-known/jwks.json"
audiences:
- "api.example.com"
forwardOriginalToken: true
Secrets Management with HashiCorp Vault
Vault Configuration
import hvac
from functools import lru_cache
from typing import Dict, Any
import asyncioclass VaultClient:
"""HashiCorp Vault client for secrets management."""
def __init__(
self,
url: str,
token: str = None,
role_id: str = None,
secret_id: str = None
):
self.client = hvac.Client(url=url)
if token:
self.client.token = token
elif role_id and secret_id:
self._auth_approle(role_id, secret_id)
def _auth_approle(self, role_id: str, secret_id: str):
"""Authenticate using AppRole."""
response = self.client.auth.approle.login(
role_id=role_id,
secret_id=secret_id
)
self.client.token = response['auth']['client_token']
def get_secret(self, path: str) -> Dict[str, Any]:
"""Get secret from KV v2 engine."""
response = self.client.secrets.kv.v2.read_secret_version(path=path)
return response['data']['data']
def get_database_credentials(self, role: str) -> Dict[str, str]:
"""Get dynamic database credentials."""
response = self.client.secrets.database.generate_credentials(role)
return {
'username': response['data']['username'],
'password': response['data']['password'],
'ttl': response['lease_duration']
}
def encrypt(self, plaintext: str, key_name: str) -> str:
"""Encrypt using Transit engine."""
import base64
encoded = base64.b64encode(plaintext.encode()).decode()
response = self.client.secrets.transit.encrypt_data(
name=key_name,
plaintext=encoded
)
return response['data']['ciphertext']
def decrypt(self, ciphertext: str, key_name: str) -> str:
"""Decrypt using Transit engine."""
import base64
response = self.client.secrets.transit.decrypt_data(
name=key_name,
ciphertext=ciphertext
)
return base64.b64decode(response['data']['plaintext']).decode()
Dynamic secrets with automatic rotation
class SecretManager:
"""Manages secrets with automatic rotation.""" def __init__(self, vault: VaultClient):
self.vault = vault
self._cache = {}
self._lock = asyncio.Lock()
async def get_database_url(self, db_name: str) -> str:
"""Get database URL with dynamic credentials."""
async with self._lock:
cached = self._cache.get(db_name)
# Check if cached credentials are still valid
if cached and cached['expires_at'] > datetime.utcnow():
return cached['url']
# Get new credentials
creds = self.vault.get_database_credentials(f"{db_name}-role")
url = f"postgresql://{creds['username']}:{creds['password']}@db:5432/{db_name}"
# Cache with TTL buffer
self._cache[db_name] = {
'url': url,
'expires_at': datetime.utcnow() + timedelta(seconds=creds['ttl'] - 60)
}
return url
async def rotate_secrets(self):
"""Background task to rotate secrets before expiry."""
while True:
for db_name, cached in list(self._cache.items()):
if cached['expires_at'] < datetime.utcnow() + timedelta(minutes=5):
await self.get_database_url(db_name)
await asyncio.sleep(60)
API Security Best Practices
Input Validation & Sanitization
from pydantic import BaseModel, validator, constr, EmailStr
from typing import Optional
import bleach
import reclass UserCreate(BaseModel):
"""Input validation with security in mind."""
username: constr(min_length=3, max_length=50, regex=r'^[a-zA-Z0-9_-]+$')
email: EmailStr
password: constr(min_length=12, max_length=128)
bio: Optional[str] = None
@validator('password')
def validate_password_strength(cls, v):
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain lowercase letter')
if not re.search(r'\d', v):
raise ValueError('Password must contain digit')
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', v):
raise ValueError('Password must contain special character')
return v
@validator('bio')
def sanitize_bio(cls, v):
if v:
# Remove potentially dangerous HTML
return bleach.clean(
v,
tags=['b', 'i', 'u', 'p', 'br'],
strip=True
)
return v
SQL injection prevention
from sqlalchemy import textasync def get_user_safe(db, user_id: str):
"""Safe parameterized query."""
# GOOD: Parameterized query
result = await db.execute(
text("SELECT * FROM users WHERE id = :user_id"),
{"user_id": user_id}
)
return result.fetchone()
# BAD: String interpolation (SQL injection vulnerable)
# result = await db.execute(f"SELECT * FROM users WHERE id = '{user_id}'")
XSS prevention in responses
from fastapi.responses import HTMLResponse
from markupsafe import escape@app.get("/profile/{username}")
async def get_profile(username: str):
# Escape user input before rendering
safe_username = escape(username)
return HTMLResponse(f"
Profile: {safe_username}
")
Security Headers Middleware
from starlette.middleware.base import BaseHTTPMiddlewareclass SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""Add security headers to all responses."""
async def dispatch(self, request, call_next):
response = await call_next(request)
# Prevent clickjacking
response.headers["X-Frame-Options"] = "DENY"
# XSS protection
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-XSS-Protection"] = "1; mode=block"
# Content Security Policy
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'self'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"font-src 'self'; "
"connect-src 'self' https://api.example.com; "
"frame-ancestors 'none';"
)
# HSTS
response.headers["Strict-Transport-Security"] = (
"max-age=31536000; includeSubDomains; preload"
)
# Referrer policy
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Permissions policy
response.headers["Permissions-Policy"] = (
"geolocation=(), microphone=(), camera=()"
)
return response
app.add_middleware(SecurityHeadersMiddleware)
Audit Logging
import structlog
from datetime import datetime
from typing import Optional, Dict, Any
from fastapi import Requestlogger = structlog.get_logger()
class AuditLogger:
"""Security audit logging for compliance."""
async def log_authentication(
self,
user_id: str,
success: bool,
method: str,
ip_address: str,
user_agent: str,
failure_reason: Optional[str] = None
):
await logger.ainfo(
"authentication_attempt",
event_type="AUTHENTICATION",
user_id=user_id,
success=success,
method=method,
ip_address=ip_address,
user_agent=user_agent,
failure_reason=failure_reason,
timestamp=datetime.utcnow().isoformat()
)
async def log_authorization(
self,
user_id: str,
resource: str,
action: str,
allowed: bool,
policy: str
):
await logger.ainfo(
"authorization_decision",
event_type="AUTHORIZATION",
user_id=user_id,
resource=resource,
action=action,
allowed=allowed,
policy=policy,
timestamp=datetime.utcnow().isoformat()
)
async def log_data_access(
self,
user_id: str,
resource_type: str,
resource_id: str,
action: str,
fields_accessed: list = None
):
await logger.ainfo(
"data_access",
event_type="DATA_ACCESS",
user_id=user_id,
resource_type=resource_type,
resource_id=resource_id,
action=action,
fields_accessed=fields_accessed,
timestamp=datetime.utcnow().isoformat()
)
Audit middleware
class AuditMiddleware(BaseHTTPMiddleware):
def __init__(self, app, audit_logger: AuditLogger):
super().__init__(app)
self.audit = audit_logger async def dispatch(self, request: Request, call_next):
start_time = datetime.utcnow()
response = await call_next(request)
# Log API access
user = getattr(request.state, "user", None)
await self.audit.log_data_access(
user_id=user.sub if user else "anonymous",
resource_type="api",
resource_id=request.url.path,
action=request.method
)
return response
Conclusion
Zero Trust security in microservices requires multiple layers:
1. mTLS for service-to-service encryption and authentication 2. JWT with short-lived tokens for user authentication 3. Service mesh (Istio) for network policies and authorization 4. Vault for secrets management with dynamic credentials 5. Input validation to prevent injection attacks 6. Security headers to protect against common web vulnerabilities 7. Audit logging for compliance and forensics
Remember: Security is not a feature—it's a foundation. Build it in from the start.
Related Articles
- Authentication & Authorization: JWT & OAuth Guide - Auth patterns
- Docker Kubernetes Deployment Guide - Container security
- Building Scalable Microservices - Service architecture
- HIPAA Compliance for Healthcare SaaS - Healthcare security
- Kubernetes Advanced Guide - Service mesh