Event-Driven Architecture with Apache Kafka: A Practical Guide

December 5, 2024
6 min read
Tushar Agrawal

Learn how to design and implement event-driven systems using Apache Kafka. Covers event schemas, consumer patterns, exactly-once semantics, and real-world patterns from healthcare data pipelines.

Introduction

At Dr. Dangs Lab, we process over 100,000 lab reports daily, each triggering a cascade of events—notifications to patients, updates to medical records, billing operations, and analytics pipelines. Traditional request-response architectures couldn't handle this scale efficiently. Enter Apache Kafka and event-driven architecture.

Why Event-Driven?

Traditional vs Event-Driven

Traditional (Request-Response):

Patient App → API Gateway → Lab Service → Notification Service → Billing Service
                    ↓                              ↓
              [Synchronous Calls - Tightly Coupled]

Event-Driven:

Lab Service → [Kafka: lab-results] → Notification Service
                                   → Billing Service
                                   → Analytics Service
                                   → Medical Records Service

Benefits

  • Loose coupling: Services don't need to know about each other
  • Scalability: Add consumers without modifying producers
  • Resilience: Services can fail independently
  • Replayability: Events can be replayed for debugging or recovery

Setting Up Kafka

Docker Compose Configuration

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertise localhost so clients running on the host machine can connect
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Single-broker development setup, so the replication factor must be 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"

Creating Topics

from confluent_kafka.admin import AdminClient, NewTopic

def create_topics():
    admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

    topics = [
        NewTopic('lab-results', num_partitions=6, replication_factor=1),
        NewTopic('notifications', num_partitions=3, replication_factor=1),
        NewTopic('billing-events', num_partitions=3, replication_factor=1),
        NewTopic('dead-letter', num_partitions=1, replication_factor=1),
    ]

    futures = admin.create_topics(topics)
    for topic, future in futures.items():
        try:
            future.result()
            print(f"Created topic: {topic}")
        except Exception as e:
            print(f"Failed to create topic {topic}: {e}")

Event Schema Design

Using Avro for Schema Evolution

from dataclasses import dataclass
from typing import Optional
import json

# Define event schemas
LAB_RESULT_SCHEMA = {
    "type": "record",
    "name": "LabResult",
    "namespace": "com.drdangslab.events",
    "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "version", "type": "int", "default": 1},
        {"name": "patient_id", "type": "string"},
        {"name": "test_id", "type": "string"},
        {"name": "test_name", "type": "string"},
        {"name": "result_value", "type": "string"},
        {"name": "unit", "type": ["null", "string"], "default": None},
        {"name": "reference_range", "type": ["null", "string"], "default": None},
        {"name": "status", "type": {"type": "enum", "name": "Status", "symbols": ["PENDING", "COMPLETED", "VERIFIED"]}},
        {"name": "metadata", "type": {"type": "map", "values": "string"}, "default": {}}
    ]
}


@dataclass
class LabResultEvent:
    event_id: str
    patient_id: str
    test_id: str
    test_name: str
    result_value: str
    status: str
    unit: Optional[str] = None
    reference_range: Optional[str] = None
    metadata: Optional[dict] = None

    def to_dict(self) -> dict:
        import time
        import uuid
        return {
            "event_id": self.event_id or str(uuid.uuid4()),
            "event_type": "LAB_RESULT_CREATED",
            "timestamp": int(time.time() * 1000),
            "version": 1,
            "patient_id": self.patient_id,
            "test_id": self.test_id,
            "test_name": self.test_name,
            "result_value": self.result_value,
            "unit": self.unit,
            "reference_range": self.reference_range,
            "status": self.status,
            "metadata": self.metadata or {}
        }
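
The producer below serializes events as plain JSON, but to get the wire-level contract the Avro schema promises, the usual route is a schema registry. A minimal sketch using confluent_kafka's Schema Registry support, assuming a registry is running at http://localhost:8081 (it is not part of the Compose file above):

import json

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Assumed registry URL; add a schema-registry service to the Compose file to use this
schema_registry = SchemaRegistryClient({'url': 'http://localhost:8081'})
avro_serializer = AvroSerializer(schema_registry, json.dumps(LAB_RESULT_SCHEMA))

def serialize_lab_result(event: LabResultEvent) -> bytes:
    # Registers the schema on first use and emits Confluent wire-format bytes
    return avro_serializer(
        event.to_dict(),
        SerializationContext('lab-results', MessageField.VALUE)
    )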

Producer Implementation

Async Producer with Retries

from aiokafka import AIOKafkaProducer
from typing import Optional
import json
import time
import uuid

class EventProducer:
    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers
        self.producer: Optional[AIOKafkaProducer] = None

    async def start(self):
        self.producer = AIOKafkaProducer(
            bootstrap_servers=self.bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # Wait for all in-sync replicas to acknowledge
            retry_backoff_ms=100,  # aiokafka retries retriable sends automatically
            enable_idempotence=True  # Idempotent producer: no duplicates on retry
        )
        await self.producer.start()

    async def stop(self):
        if self.producer:
            await self.producer.stop()

    async def publish(self, topic: str, event: dict, key: Optional[str] = None):
        try:
            # Use patient_id as key for ordering guarantees
            await self.producer.send_and_wait(
                topic,
                value=event,
                key=key
            )
            print(f"Published event to {topic}: {event['event_id']}")
        except Exception as e:
            print(f"Failed to publish event: {e}")
            # Send to dead letter queue
            await self.publish_to_dlq(topic, event, str(e))

    async def publish_to_dlq(self, original_topic: str, event: dict, error: str):
        dlq_event = {
            "original_topic": original_topic,
            "original_event": event,
            "error": error,
            "timestamp": int(time.time() * 1000)
        }
        await self.producer.send_and_wait('dead-letter', dlq_event)


# Usage in FastAPI
from fastapi import FastAPI

app = FastAPI()
producer = EventProducer('localhost:9092')

@app.on_event("startup")
async def startup():
    await producer.start()

@app.on_event("shutdown")
async def shutdown():
    await producer.stop()

@app.post("/lab-results")
async def create_lab_result(result: LabResultCreate):
    # Save to database
    saved_result = await save_to_db(result)

    # Publish event
    event = LabResultEvent(
        event_id=str(uuid.uuid4()),
        patient_id=result.patient_id,
        test_id=saved_result.id,
        test_name=result.test_name,
        result_value=result.result_value,
        status="PENDING"
    )
    await producer.publish(
        'lab-results',
        event.to_dict(),
        key=result.patient_id  # Partition by patient
    )

    return saved_result

Consumer Implementation

Resilient Consumer with Error Handling

from aiokafka import AIOKafkaConsumer
from typing import Optional
import json
import asyncio

class EventConsumer:
    def __init__(
        self,
        topic: str,
        group_id: str,
        bootstrap_servers: str,
        handler
    ):
        self.topic = topic
        self.group_id = group_id
        self.bootstrap_servers = bootstrap_servers
        self.handler = handler
        self.consumer: Optional[AIOKafkaConsumer] = None

    async def start(self):
        self.consumer = AIOKafkaConsumer(
            self.topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=self.group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest',
            enable_auto_commit=False,  # Manual commit for reliability
            max_poll_records=100
        )
        await self.consumer.start()

    async def consume(self):
        try:
            async for message in self.consumer:
                try:
                    await self.handler(message.value)
                    # Commit only after successful processing
                    await self.consumer.commit()
                except Exception as e:
                    print(f"Error processing message: {e}")
                    # Handle error (retry, DLQ, etc.)
                    await self.handle_error(message, e)
        finally:
            await self.consumer.stop()

    async def handle_error(self, message, error):
        # Implement retry logic or send to DLQ
        pass


# Notification Service Consumer
async def handle_lab_result(event: dict):
    patient_id = event['patient_id']
    test_name = event['test_name']

    # Send notification
    await notification_service.send(
        patient_id=patient_id,
        message=f"Your {test_name} results are ready!"
    )

consumer = EventConsumer(
    topic='lab-results',
    group_id='notification-service',
    bootstrap_servers='localhost:9092',
    handler=handle_lab_result
)
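
To run the notification consumer as a standalone service, start it and hand the consume loop to the event loop. A minimal sketch of the entry point (not shown in the original service code):

import asyncio

async def main():
    await consumer.start()
    await consumer.consume()  # runs until the process is stopped

if __name__ == "__main__":
    asyncio.run(main())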

Advanced Patterns

Saga Pattern for Distributed Transactions

class LabResultSaga:
    def __init__(self, producer: EventProducer):
        self.producer = producer

    async def execute(self, lab_result: LabResult):
        saga_id = str(uuid.uuid4())

        try:
            # Step 1: Save result
            await self.save_result(lab_result, saga_id)

            # Step 2: Update patient record
            await self.update_patient_record(lab_result, saga_id)

            # Step 3: Create billing entry
            await self.create_billing(lab_result, saga_id)

            # Publish success event
            await self.producer.publish('saga-completed', {
                'saga_id': saga_id,
                'status': 'COMPLETED'
            })

        except Exception as e:
            # Publish compensation events
            await self.compensate(saga_id, str(e))

    async def compensate(self, saga_id: str, error: str):
        await self.producer.publish('saga-compensation', {
            'saga_id': saga_id,
            'error': error,
            'action': 'ROLLBACK'
        })
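
Each participating service also needs to listen for compensation events and undo its own step. A hedged sketch of the billing side, reusing the EventConsumer from earlier; refund_billing_entry is a hypothetical helper, not part of the code above:

async def handle_saga_compensation(event: dict):
    saga_id = event['saga_id']
    print(f"Rolling back saga {saga_id}: {event['error']}")

    # Each service reverses only its own work; refund_billing_entry is hypothetical
    await refund_billing_entry(saga_id)

compensation_consumer = EventConsumer(
    topic='saga-compensation',
    group_id='billing-service',
    bootstrap_servers='localhost:9092',
    handler=handle_saga_compensation
)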

Consumer Groups for Scaling

# Multiple instances of the same service share the workload
# Each partition is consumed by exactly one consumer in the group

# Instance 1
consumer1 = EventConsumer(
    topic='lab-results',
    group_id='notification-service',  # Same group
    bootstrap_servers='localhost:9092',
    handler=handle_lab_result
)

# Instance 2 (different machine)
consumer2 = EventConsumer(
    topic='lab-results',
    group_id='notification-service',  # Same group
    bootstrap_servers='localhost:9092',
    handler=handle_lab_result
)

# Partitions are automatically distributed between consumers
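
In production the two instances would be separate processes or containers. For a quick local demonstration, you can also run them side by side in one event loop and watch the group rebalance; a sketch, not a deployment pattern:

import asyncio

async def run_group():
    await consumer1.start()
    await consumer2.start()
    # lab-results has 6 partitions, so each consumer ends up with 3
    await asyncio.gather(consumer1.consume(), consumer2.consume())

asyncio.run(run_group())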

Monitoring and Observability

Key Metrics to Track

from prometheus_client import Counter, Histogram, Gauge

# Producer metrics
events_published = Counter(
    'kafka_events_published_total',
    'Total events published',
    ['topic']
)

publish_latency = Histogram(
    'kafka_publish_latency_seconds',
    'Event publish latency',
    ['topic']
)

# Consumer metrics
events_consumed = Counter(
    'kafka_events_consumed_total',
    'Total events consumed',
    ['topic', 'consumer_group']
)

consumer_lag = Gauge(
    'kafka_consumer_lag',
    'Consumer lag',
    ['topic', 'partition', 'consumer_group']
)
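
Wiring these metrics into the producer is a small change at the call site. One way to do it, sketched as a subclass of the EventProducer above so the original class stays untouched:

class InstrumentedEventProducer(EventProducer):
    async def publish(self, topic: str, event: dict, key=None):
        # Histogram.time() doubles as a context manager that records elapsed time
        with publish_latency.labels(topic=topic).time():
            await super().publish(topic, event, key)
        # publish() routes failures to the DLQ internally, so this counts attempts
        events_published.labels(topic=topic).inc()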

Key Takeaways

1. Design events, not APIs: Think about what happened, not what to do
2. Use schemas: Avro or Protobuf for type safety and evolution
3. Partition wisely: Choose partition keys that ensure ordering where needed
4. Handle failures: Implement DLQs and retry mechanisms
5. Monitor everything: Consumer lag is your early warning system

Conclusion

Event-driven architecture with Kafka has transformed how we handle data at scale. The decoupling it provides allows us to add new features without touching existing services, and the replay capability has saved us countless times during incident recovery.

---

Building event-driven systems? Let's connect on LinkedIn to discuss patterns and best practices.
