Back to Blog
AWS Services for Backend Developers: Complete Infrastructure Guide

AWS Services for Backend Developers: Complete Infrastructure Guide

December 16, 2024
13 min read
Tushar Agrawal

Master essential AWS services for backend development including EC2, Lambda, S3, RDS, DynamoDB, SQS, SNS, API Gateway, and CloudFormation. Learn cloud architecture patterns and best practices.

Introduction

Amazon Web Services (AWS) offers 200+ services, but backend developers typically use a core set of 15-20 services regularly. This guide covers the essential AWS services every backend developer should master, with practical examples and architecture patterns.

Compute Services

EC2 (Elastic Compute Cloud)

Virtual servers for running applications.

# boto3 example: Launch EC2 instance
import boto3

ec2 = boto3.resource('ec2', region_name='us-east-1')

instance = ec2.create_instances(
    ImageId='ami-0c55b159cbfafe1f0',  # Amazon Linux 2
    InstanceType='t3.micro',
    MinCount=1,
    MaxCount=1,
    KeyName='my-key-pair',
    SecurityGroupIds=['sg-12345678'],
    SubnetId='subnet-12345678',
    IamInstanceProfile={
        'Name': 'MyEC2Role'
    },
    TagSpecifications=[{
        'ResourceType': 'instance',
        'Tags': [
            {'Key': 'Name', 'Value': 'web-server-1'},
            {'Key': 'Environment', 'Value': 'production'}
        ]
    }],
    UserData='''#!/bin/bash
    yum update -y
    yum install -y docker
    systemctl start docker
    docker pull myapp:latest
    docker run -d -p 80:8000 myapp:latest
    '''
)[0]

print(f"Instance ID: {instance.id}")

# Wait for instance to be running
instance.wait_until_running()
instance.reload()
print(f"Public IP: {instance.public_ip_address}")

Lambda (Serverless Functions)

Run code without managing servers.

# lambda_function.py
import json
import boto3
from datetime import datetime

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Orders')

def lambda_handler(event, context):
    """
    Process incoming order from API Gateway
    """
    try:
        # Parse request body
        if isinstance(event.get('body'), str):
            body = json.loads(event['body'])
        else:
            body = event.get('body', {})

        # Validate required fields
        required_fields = ['customer_id', 'items', 'total']
        for field in required_fields:
            if field not in body:
                return {
                    'statusCode': 400,
                    'body': json.dumps({'error': f'Missing field: {field}'})
                }

        # Create order
        order_id = f"ORD-{datetime.now().strftime('%Y%m%d%H%M%S')}"

        order = {
            'order_id': order_id,
            'customer_id': body['customer_id'],
            'items': body['items'],
            'total': body['total'],
            'status': 'pending',
            'created_at': datetime.now().isoformat()
        }

        # Save to DynamoDB
        table.put_item(Item=order)

        # Trigger notification
        sns = boto3.client('sns')
        sns.publish(
            TopicArn='arn:aws:sns:us-east-1:123456789:OrderNotifications',
            Message=json.dumps({
                'order_id': order_id,
                'customer_id': body['customer_id'],
                'total': body['total']
            }),
            Subject='New Order Created'
        )

        return {
            'statusCode': 201,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps({
                'order_id': order_id,
                'status': 'created'
            })
        }

    except Exception as e:
        print(f"Error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Internal server error'})
        }

# serverless.yml (Serverless Framework)
service: order-service

provider:
  name: aws
  runtime: python3.11
  region: us-east-1
  environment:
    ORDERS_TABLE: ${self:service}-orders-${sls:stage}
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - dynamodb:PutItem
            - dynamodb:GetItem
            - dynamodb:Query
          Resource: arn:aws:dynamodb:${aws:region}:*:table/${self:provider.environment.ORDERS_TABLE}
        - Effect: Allow
          Action:
            - sns:Publish
          Resource: arn:aws:sns:${aws:region}:*:OrderNotifications

functions:
  createOrder:
    handler: handler.lambda_handler
    events:
      - http:
          path: orders
          method: post
          cors: true
    memorySize: 256
    timeout: 10

resources:
  Resources:
    OrdersTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:provider.environment.ORDERS_TABLE}
        BillingMode: PAY_PER_REQUEST
        AttributeDefinitions:
          - AttributeName: order_id
            AttributeType: S
          - AttributeName: customer_id
            AttributeType: S
        KeySchema:
          - AttributeName: order_id
            KeyType: HASH
        GlobalSecondaryIndexes:
          - IndexName: CustomerIndex
            KeySchema:
              - AttributeName: customer_id
                KeyType: HASH
            Projection:
              ProjectionType: ALL

ECS/Fargate (Container Orchestration)

Run Docker containers at scale.

# task-definition.json
{
  "family": "api-service",
  "networkMode": "awsvpc",
  "requiresCompatibilities": ["FARGATE"],
  "cpu": "512",
  "memory": "1024",
  "executionRoleArn": "arn:aws:iam::123456789:role/ecsTaskExecutionRole",
  "taskRoleArn": "arn:aws:iam::123456789:role/ecsTaskRole",
  "containerDefinitions": [
    {
      "name": "api",
      "image": "123456789.dkr.ecr.us-east-1.amazonaws.com/api:latest",
      "essential": true,
      "portMappings": [
        {
          "containerPort": 8000,
          "protocol": "tcp"
        }
      ],
      "environment": [
        {"name": "NODE_ENV", "value": "production"}
      ],
      "secrets": [
        {
          "name": "DATABASE_URL",
          "valueFrom": "arn:aws:secretsmanager:us-east-1:123456789:secret:db-credentials"
        }
      ],
      "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
          "awslogs-group": "/ecs/api-service",
          "awslogs-region": "us-east-1",
          "awslogs-stream-prefix": "api"
        }
      },
      "healthCheck": {
        "command": ["CMD-SHELL", "curl -f http://localhost:8000/health || exit 1"],
        "interval": 30,
        "timeout": 5,
        "retries": 3
      }
    }
  ]
}

Storage Services

S3 (Simple Storage Service)

Object storage for files, backups, static assets.

import boto3
from botocore.exceptions import ClientError
import mimetypes

class S3Storage:
    def __init__(self, bucket_name: str):
        self.s3 = boto3.client('s3')
        self.bucket = bucket_name

    def upload_file(self, file_path: str, object_key: str) -> str:
        """Upload file to S3 and return URL"""
        content_type, _ = mimetypes.guess_type(file_path)

        try:
            self.s3.upload_file(
                file_path,
                self.bucket,
                object_key,
                ExtraArgs={
                    'ContentType': content_type or 'application/octet-stream',
                    'ACL': 'private'
                }
            )
            return f"s3://{self.bucket}/{object_key}"
        except ClientError as e:
            raise Exception(f"Upload failed: {e}")

    def generate_presigned_url(self, object_key: str, expiration: int = 3600) -> str:
        """Generate presigned URL for temporary access"""
        try:
            url = self.s3.generate_presigned_url(
                'get_object',
                Params={
                    'Bucket': self.bucket,
                    'Key': object_key
                },
                ExpiresIn=expiration
            )
            return url
        except ClientError as e:
            raise Exception(f"Failed to generate URL: {e}")

    def generate_presigned_post(self, object_key: str, max_size_mb: int = 10) -> dict:
        """Generate presigned POST for direct browser uploads"""
        try:
            response = self.s3.generate_presigned_post(
                self.bucket,
                object_key,
                Fields=None,
                Conditions=[
                    ["content-length-range", 0, max_size_mb * 1024 * 1024]
                ],
                ExpiresIn=3600
            )
            return response
        except ClientError as e:
            raise Exception(f"Failed to generate presigned POST: {e}")

    def list_objects(self, prefix: str = "") -> list:
        """List objects with optional prefix filter"""
        try:
            response = self.s3.list_objects_v2(
                Bucket=self.bucket,
                Prefix=prefix
            )
            return [obj['Key'] for obj in response.get('Contents', [])]
        except ClientError as e:
            raise Exception(f"Failed to list objects: {e}")

    def delete_object(self, object_key: str) -> None:
        """Delete object from S3"""
        try:
            self.s3.delete_object(Bucket=self.bucket, Key=object_key)
        except ClientError as e:
            raise Exception(f"Delete failed: {e}")


# Usage
storage = S3Storage('my-app-bucket')

# Upload file
storage.upload_file('/tmp/report.pdf', 'reports/2024/report.pdf')

# Generate download link
url = storage.generate_presigned_url('reports/2024/report.pdf', expiration=86400)

# Direct browser upload
post_data = storage.generate_presigned_post('uploads/user-123/avatar.jpg')
# Return post_data to frontend for direct upload

Database Services

RDS (Relational Database Service)

Managed PostgreSQL, MySQL, and more.

import boto3
import psycopg2
from contextlib import contextmanager

# Create RDS instance programmatically
def create_rds_instance():
    rds = boto3.client('rds')

    response = rds.create_db_instance(
        DBInstanceIdentifier='myapp-db',
        DBInstanceClass='db.t3.medium',
        Engine='postgres',
        EngineVersion='15.4',
        MasterUsername='admin',
        MasterUserPassword='securepassword123',  # Use Secrets Manager!
        AllocatedStorage=100,
        StorageType='gp3',
        StorageEncrypted=True,
        MultiAZ=True,  # High availability
        PubliclyAccessible=False,
        VpcSecurityGroupIds=['sg-12345678'],
        DBSubnetGroupName='my-subnet-group',
        BackupRetentionPeriod=7,
        EnablePerformanceInsights=True,
        Tags=[
            {'Key': 'Environment', 'Value': 'production'},
            {'Key': 'Application', 'Value': 'myapp'}
        ]
    )

    return response['DBInstance']['DBInstanceIdentifier']


# Connect using IAM authentication
def get_rds_connection():
    """Get database connection using IAM auth"""
    rds_client = boto3.client('rds')

    # Generate auth token
    token = rds_client.generate_db_auth_token(
        DBHostname='myapp-db.cluster-xyz.us-east-1.rds.amazonaws.com',
        Port=5432,
        DBUsername='app_user',
        Region='us-east-1'
    )

    conn = psycopg2.connect(
        host='myapp-db.cluster-xyz.us-east-1.rds.amazonaws.com',
        port=5432,
        database='myapp',
        user='app_user',
        password=token,
        sslmode='require'
    )

    return conn


@contextmanager
def db_connection():
    """Context manager for database connections"""
    conn = get_rds_connection()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


# Usage
with db_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM users WHERE active = %s", (True,))
        users = cur.fetchall()

DynamoDB (NoSQL Database)

Serverless, scalable NoSQL database.

import boto3
from boto3.dynamodb.conditions import Key, Attr
from decimal import Decimal
import json

class DynamoDBRepository:
    def __init__(self, table_name: str):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def create(self, item: dict) -> dict:
        """Create new item"""
        # Convert floats to Decimal for DynamoDB
        item = json.loads(json.dumps(item), parse_float=Decimal)
        self.table.put_item(Item=item)
        return item

    def get_by_id(self, pk: str, sk: str = None) -> dict:
        """Get item by primary key"""
        key = {'pk': pk}
        if sk:
            key['sk'] = sk

        response = self.table.get_item(Key=key)
        return response.get('Item')

    def query_by_partition(
        self,
        pk: str,
        sk_prefix: str = None,
        limit: int = 100
    ) -> list:
        """Query items by partition key with optional sort key prefix"""
        key_condition = Key('pk').eq(pk)

        if sk_prefix:
            key_condition = key_condition & Key('sk').begins_with(sk_prefix)

        response = self.table.query(
            KeyConditionExpression=key_condition,
            Limit=limit
        )

        return response.get('Items', [])

    def query_gsi(
        self,
        index_name: str,
        pk_value: str,
        sk_value: str = None
    ) -> list:
        """Query Global Secondary Index"""
        key_condition = Key('gsi_pk').eq(pk_value)

        if sk_value:
            key_condition = key_condition & Key('gsi_sk').eq(sk_value)

        response = self.table.query(
            IndexName=index_name,
            KeyConditionExpression=key_condition
        )

        return response.get('Items', [])

    def update(self, pk: str, sk: str, updates: dict) -> dict:
        """Update item attributes"""
        update_expression = "SET "
        expression_values = {}
        expression_names = {}

        for i, (key, value) in enumerate(updates.items()):
            update_expression += f"#attr{i} = :val{i}, "
            expression_names[f"#attr{i}"] = key
            expression_values[f":val{i}"] = value

        update_expression = update_expression.rstrip(", ")

        response = self.table.update_item(
            Key={'pk': pk, 'sk': sk},
            UpdateExpression=update_expression,
            ExpressionAttributeNames=expression_names,
            ExpressionAttributeValues=expression_values,
            ReturnValues='ALL_NEW'
        )

        return response.get('Attributes')

    def delete(self, pk: str, sk: str) -> None:
        """Delete item"""
        self.table.delete_item(Key={'pk': pk, 'sk': sk})

    def batch_write(self, items: list) -> None:
        """Batch write up to 25 items"""
        with self.table.batch_writer() as batch:
            for item in items:
                batch.put_item(Item=item)

    def transact_write(self, operations: list) -> None:
        """Transactional write for multiple items"""
        client = boto3.client('dynamodb')
        client.transact_write_items(TransactItems=operations)


# Single Table Design Example
class OrderRepository(DynamoDBRepository):
    """
    Single table design for Orders
    PK: USER#<user_id>
    SK: ORDER#<order_id> or ITEM#<item_id>
    """

    def create_order(self, user_id: str, order_id: str, items: list) -> None:
        """Create order with items in transaction"""
        operations = []

        # Order record
        operations.append({
            'Put': {
                'TableName': self.table.name,
                'Item': {
                    'pk': {'S': f'USER#{user_id}'},
                    'sk': {'S': f'ORDER#{order_id}'},
                    'status': {'S': 'pending'},
                    'total': {'N': str(sum(i['price'] * i['quantity'] for i in items))},
                    'gsi_pk': {'S': f'ORDER#{order_id}'},
                    'gsi_sk': {'S': f'USER#{user_id}'}
                }
            }
        })

        # Order items
        for item in items:
            operations.append({
                'Put': {
                    'TableName': self.table.name,
                    'Item': {
                        'pk': {'S': f'ORDER#{order_id}'},
                        'sk': {'S': f'ITEM#{item["product_id"]}'},
                        'quantity': {'N': str(item['quantity'])},
                        'price': {'N': str(item['price'])}
                    }
                }
            })

        self.transact_write(operations)

    def get_user_orders(self, user_id: str) -> list:
        """Get all orders for user"""
        return self.query_by_partition(
            pk=f'USER#{user_id}',
            sk_prefix='ORDER#'
        )

    def get_order_items(self, order_id: str) -> list:
        """Get all items for order"""
        return self.query_by_partition(
            pk=f'ORDER#{order_id}',
            sk_prefix='ITEM#'
        )

Messaging Services

SQS (Simple Queue Service)

Message queuing for async processing.

import boto3
import json
from dataclasses import dataclass
from typing import Callable, Any

@dataclass
class Message:
    id: str
    body: dict
    receipt_handle: str
    attributes: dict

class SQSQueue:
    def __init__(self, queue_url: str):
        self.sqs = boto3.client('sqs')
        self.queue_url = queue_url

    def send_message(self, body: dict, delay_seconds: int = 0) -> str:
        """Send message to queue"""
        response = self.sqs.send_message(
            QueueUrl=self.queue_url,
            MessageBody=json.dumps(body),
            DelaySeconds=delay_seconds
        )
        return response['MessageId']

    def send_batch(self, messages: list[dict]) -> dict:
        """Send batch of messages (max 10)"""
        entries = [
            {
                'Id': str(i),
                'MessageBody': json.dumps(msg)
            }
            for i, msg in enumerate(messages)
        ]

        response = self.sqs.send_message_batch(
            QueueUrl=self.queue_url,
            Entries=entries
        )

        return {
            'successful': len(response.get('Successful', [])),
            'failed': len(response.get('Failed', []))
        }

    def receive_messages(
        self,
        max_messages: int = 10,
        wait_time: int = 20,
        visibility_timeout: int = 30
    ) -> list[Message]:
        """Receive messages from queue"""
        response = self.sqs.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=max_messages,
            WaitTimeSeconds=wait_time,
            VisibilityTimeout=visibility_timeout,
            AttributeNames=['All']
        )

        messages = []
        for msg in response.get('Messages', []):
            messages.append(Message(
                id=msg['MessageId'],
                body=json.loads(msg['Body']),
                receipt_handle=msg['ReceiptHandle'],
                attributes=msg.get('Attributes', {})
            ))

        return messages

    def delete_message(self, receipt_handle: str) -> None:
        """Delete processed message"""
        self.sqs.delete_message(
            QueueUrl=self.queue_url,
            ReceiptHandle=receipt_handle
        )

    def delete_batch(self, receipt_handles: list[str]) -> None:
        """Delete batch of messages"""
        entries = [
            {'Id': str(i), 'ReceiptHandle': handle}
            for i, handle in enumerate(receipt_handles)
        ]

        self.sqs.delete_message_batch(
            QueueUrl=self.queue_url,
            Entries=entries
        )


class SQSWorker:
    """Worker to process SQS messages"""

    def __init__(self, queue: SQSQueue, handler: Callable[[dict], Any]):
        self.queue = queue
        self.handler = handler
        self.running = False

    def start(self):
        """Start processing messages"""
        self.running = True
        print("Worker started, listening for messages...")

        while self.running:
            messages = self.queue.receive_messages()

            for message in messages:
                try:
                    print(f"Processing message: {message.id}")
                    self.handler(message.body)
                    self.queue.delete_message(message.receipt_handle)
                    print(f"Message {message.id} processed successfully")
                except Exception as e:
                    print(f"Error processing message {message.id}: {e}")
                    # Message will become visible again after timeout

    def stop(self):
        """Stop worker gracefully"""
        self.running = False


# Usage
def process_order(order_data: dict):
    """Handler for order processing"""
    print(f"Processing order: {order_data['order_id']}")
    # Process the order...

queue = SQSQueue('https://sqs.us-east-1.amazonaws.com/123456789/orders')
worker = SQSWorker(queue, process_order)
worker.start()

SNS (Simple Notification Service)

Pub/Sub messaging for notifications.

import boto3
import json

class SNSPublisher:
    def __init__(self, topic_arn: str):
        self.sns = boto3.client('sns')
        self.topic_arn = topic_arn

    def publish(
        self,
        message: dict,
        subject: str = None,
        attributes: dict = None
    ) -> str:
        """Publish message to topic"""
        params = {
            'TopicArn': self.topic_arn,
            'Message': json.dumps(message)
        }

        if subject:
            params['Subject'] = subject

        if attributes:
            params['MessageAttributes'] = {
                key: {
                    'DataType': 'String',
                    'StringValue': str(value)
                }
                for key, value in attributes.items()
            }

        response = self.sns.publish(**params)
        return response['MessageId']

    def publish_with_filter(
        self,
        message: dict,
        event_type: str
    ) -> str:
        """Publish with filter attributes for subscriber filtering"""
        return self.publish(
            message=message,
            attributes={'event_type': event_type}
        )


# Event-driven architecture example
class EventPublisher:
    """Publish domain events to SNS"""

    def __init__(self):
        self.sns = boto3.client('sns')
        self.topics = {
            'orders': 'arn:aws:sns:us-east-1:123456789:order-events',
            'users': 'arn:aws:sns:us-east-1:123456789:user-events',
            'payments': 'arn:aws:sns:us-east-1:123456789:payment-events'
        }

    def publish_event(
        self,
        domain: str,
        event_type: str,
        payload: dict
    ) -> str:
        """Publish domain event"""
        topic_arn = self.topics.get(domain)
        if not topic_arn:
            raise ValueError(f"Unknown domain: {domain}")

        message = {
            'event_type': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }

        response = self.sns.publish(
            TopicArn=topic_arn,
            Message=json.dumps(message),
            MessageAttributes={
                'event_type': {
                    'DataType': 'String',
                    'StringValue': event_type
                }
            }
        )

        return response['MessageId']


# Usage
events = EventPublisher()
events.publish_event(
    domain='orders',
    event_type='order.created',
    payload={
        'order_id': 'ORD-123',
        'customer_id': 'CUST-456',
        'total': 99.99
    }
)

API Gateway

Create REST and WebSocket APIs.

# SAM template (template.yaml)
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Globals:
  Function:
    Timeout: 30
    Runtime: python3.11
    MemorySize: 256
    Environment:
      Variables:
        TABLE_NAME: !Ref OrdersTable

Resources:
  # REST API
  OrdersApi:
    Type: AWS::Serverless::Api
    Properties:
      StageName: prod
      Cors:
        AllowMethods: "'GET,POST,PUT,DELETE'"
        AllowHeaders: "'Content-Type,Authorization'"
        AllowOrigin: "'*'"
      Auth:
        DefaultAuthorizer: CognitoAuthorizer
        Authorizers:
          CognitoAuthorizer:
            UserPoolArn: !GetAtt UserPool.Arn

  # Lambda Functions
  CreateOrderFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: handlers/orders.create
      Events:
        Api:
          Type: Api
          Properties:
            RestApiId: !Ref OrdersApi
            Path: /orders
            Method: POST
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref OrdersTable

  GetOrderFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: handlers/orders.get
      Events:
        Api:
          Type: Api
          Properties:
            RestApiId: !Ref OrdersApi
            Path: /orders/{orderId}
            Method: GET
      Policies:
        - DynamoDBReadPolicy:
            TableName: !Ref OrdersTable

  ListOrdersFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: handlers/orders.list
      Events:
        Api:
          Type: Api
          Properties:
            RestApiId: !Ref OrdersApi
            Path: /orders
            Method: GET
      Policies:
        - DynamoDBReadPolicy:
            TableName: !Ref OrdersTable

  # DynamoDB Table
  OrdersTable:
    Type: AWS::DynamoDB::Table
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE

  # Cognito User Pool
  UserPool:
    Type: AWS::Cognito::UserPool
    Properties:
      UserPoolName: orders-user-pool
      AutoVerifiedAttributes:
        - email
      Policies:
        PasswordPolicy:
          MinimumLength: 8
          RequireLowercase: true
          RequireNumbers: true
          RequireSymbols: true
          RequireUppercase: true

Outputs:
  ApiEndpoint:
    Value: !Sub "https://${OrdersApi}.execute-api.${AWS::Region}.amazonaws.com/prod"

Infrastructure as Code

CloudFormation/CDK

# CDK Example (Python)
from aws_cdk import (
    Stack,
    aws_ec2 as ec2,
    aws_rds as rds,
    aws_elasticache as elasticache,
    aws_lambda as _lambda,
    aws_apigateway as apigw,
    Duration,
    RemovalPolicy,
)
from constructs import Construct

class BackendStack(Stack):
    def __init__(self, scope: Construct, id: str, **kwargs):
        super().__init__(scope, id, **kwargs)

        # VPC
        vpc = ec2.Vpc(
            self, "BackendVPC",
            max_azs=2,
            nat_gateways=1,
            subnet_configuration=[
                ec2.SubnetConfiguration(
                    name="Public",
                    subnet_type=ec2.SubnetType.PUBLIC,
                    cidr_mask=24
                ),
                ec2.SubnetConfiguration(
                    name="Private",
                    subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS,
                    cidr_mask=24
                ),
                ec2.SubnetConfiguration(
                    name="Isolated",
                    subnet_type=ec2.SubnetType.PRIVATE_ISOLATED,
                    cidr_mask=24
                )
            ]
        )

        # RDS PostgreSQL
        db_security_group = ec2.SecurityGroup(
            self, "DBSecurityGroup",
            vpc=vpc,
            description="Security group for RDS"
        )

        database = rds.DatabaseInstance(
            self, "Database",
            engine=rds.DatabaseInstanceEngine.postgres(
                version=rds.PostgresEngineVersion.VER_15_4
            ),
            instance_type=ec2.InstanceType.of(
                ec2.InstanceClass.T3, ec2.InstanceSize.MEDIUM
            ),
            vpc=vpc,
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_ISOLATED
            ),
            security_groups=[db_security_group],
            multi_az=True,
            storage_encrypted=True,
            deletion_protection=True,
            backup_retention=Duration.days(7)
        )

        # ElastiCache Redis
        cache_security_group = ec2.SecurityGroup(
            self, "CacheSecurityGroup",
            vpc=vpc,
            description="Security group for ElastiCache"
        )

        cache_subnet_group = elasticache.CfnSubnetGroup(
            self, "CacheSubnetGroup",
            description="Subnet group for Redis",
            subnet_ids=[
                subnet.subnet_id
                for subnet in vpc.isolated_subnets
            ]
        )

        redis = elasticache.CfnCacheCluster(
            self, "RedisCluster",
            cache_node_type="cache.t3.micro",
            engine="redis",
            num_cache_nodes=1,
            cache_subnet_group_name=cache_subnet_group.ref,
            vpc_security_group_ids=[cache_security_group.security_group_id]
        )

        # Lambda Function
        api_handler = _lambda.Function(
            self, "ApiHandler",
            runtime=_lambda.Runtime.PYTHON_3_11,
            handler="handler.main",
            code=_lambda.Code.from_asset("lambda"),
            vpc=vpc,
            vpc_subnets=ec2.SubnetSelection(
                subnet_type=ec2.SubnetType.PRIVATE_WITH_EGRESS
            ),
            environment={
                "DB_HOST": database.db_instance_endpoint_address,
                "REDIS_HOST": redis.attr_redis_endpoint_address
            },
            timeout=Duration.seconds(30)
        )

        # Allow Lambda to connect to RDS
        db_security_group.add_ingress_rule(
            peer=api_handler.connections.security_groups[0],
            connection=ec2.Port.tcp(5432)
        )

        # API Gateway
        api = apigw.RestApi(
            self, "BackendApi",
            rest_api_name="Backend API"
        )

        api.root.add_method(
            "GET",
            apigw.LambdaIntegration(api_handler)
        )

Best Practices

1. Security

# Use Secrets Manager for sensitive data
import boto3

def get_secret(secret_name: str) -> dict:
    client = boto3.client('secretsmanager')
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response['SecretString'])

# Never hardcode credentials
db_creds = get_secret('prod/db/credentials')
connection = psycopg2.connect(
    host=db_creds['host'],
    user=db_creds['username'],
    password=db_creds['password']
)

2. Cost Optimization

# Use spot instances for non-critical workloads
Resources:
  SpotFleet:
    Type: AWS::EC2::SpotFleet
    Properties:
      SpotFleetRequestConfigData:
        AllocationStrategy: lowestPrice
        TargetCapacity: 10
        LaunchSpecifications:
          - InstanceType: t3.medium
            SpotPrice: "0.02"

3. Monitoring

# CloudWatch custom metrics
import boto3

cloudwatch = boto3.client('cloudwatch')

def put_metric(name: str, value: float, unit: str = 'Count'):
    cloudwatch.put_metric_data(
        Namespace='MyApp',
        MetricData=[{
            'MetricName': name,
            'Value': value,
            'Unit': unit,
            'Dimensions': [
                {'Name': 'Environment', 'Value': 'production'}
            ]
        }]
    )

# Track business metrics
put_metric('OrdersCreated', 1)
put_metric('OrderValue', 99.99, 'None')
put_metric('ProcessingTime', 250, 'Milliseconds')

Conclusion

Mastering these AWS services enables you to build scalable, reliable backend systems. Start with the basics (EC2, S3, RDS), then progress to serverless (Lambda, DynamoDB) and event-driven architectures (SQS, SNS).

---

Building on AWS? Connect on LinkedIn to discuss cloud architecture and best practices.

Related Articles

Share this article

Related Articles