Python Async Programming with asyncio: Complete Developer Guide

December 15, 2024
9 min read
Tushar Agrawal

Master asynchronous programming in Python with asyncio. Learn coroutines, tasks, event loops, async context managers, and build high-performance concurrent applications with practical examples.

Introduction

Asynchronous programming is essential for building high-performance Python applications. At Dr. Dangs Lab, we use asyncio to handle thousands of concurrent API requests, database queries, and external service calls. This guide covers everything you need to master async Python.

Understanding Async/Await

Synchronous vs Asynchronous

import time
import asyncio

# Synchronous - blocks execution
def sync_fetch_data():
    print("Fetching data...")
    time.sleep(2)  # Blocks the entire program
    return "Data received"

# Asynchronous - doesn't block
async def async_fetch_data():
    print("Fetching data...")
    await asyncio.sleep(2)  # Yields control to event loop
    return "Data received"

# Running async code
async def main():
    result = await async_fetch_data()
    print(result)

asyncio.run(main())

How asyncio Works

┌─────────────────────────────────────────────┐
│                 Event Loop                  │
│  ┌─────────┐   ┌─────────┐   ┌─────────┐    │
│  │ Task 1  │   │ Task 2  │   │ Task 3  │    │
│  │ (await) │   │ (ready) │   │ (await) │    │
│  └────┬────┘   └────┬────┘   └────┬────┘    │
│       │             │             │         │
│       ▼             ▼             ▼         │
│    I/O Wait      Execute      I/O Wait      │
└─────────────────────────────────────────────┘

Key concept: When a coroutine awaits, the event loop
switches to another ready task.
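
To see the switch happen, here is a minimal sketch (the names tick, fast, and slow are illustrative) in which two tasks interleave their output: every await asyncio.sleep() hands control back to the loop, which resumes whichever task is ready next.

async def tick(name: str, delay: float):
    for i in range(3):
        await asyncio.sleep(delay)  # yield to the event loop while "waiting"
        print(f"{name}: tick {i}")

async def main():
    # Both coroutines make progress in a single thread, interleaved
    await asyncio.gather(tick("fast", 0.1), tick("slow", 0.15))

asyncio.run(main())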

Coroutines, Tasks, and Futures

Coroutines

# A coroutine is defined with async def
async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)  # Simulate I/O
    return {"id": user_id, "name": f"User {user_id}"}

# Calling a coroutine returns a coroutine object
coro = fetch_user(1)  # This doesn't execute yet
print(type(coro))  # <class 'coroutine'>

# Must be awaited to execute
async def main():
    user = await fetch_user(1)  # Now it executes
    print(user)

Tasks - Concurrent Execution

async def fetch_all_users(user_ids: list[int]) -> list[dict]:
    # Create tasks for concurrent execution
    tasks = [asyncio.create_task(fetch_user(uid)) for uid in user_ids]

    # Wait for all tasks to complete
    results = await asyncio.gather(*tasks)
    return results

# Sequential: 10 users × 0.1s = 1 second
# Concurrent: 10 users run together ≈ 0.1 second

async def main():
    import time
    start = time.time()

    users = await fetch_all_users(list(range(10)))

    print(f"Fetched {len(users)} users in {time.time() - start:.2f}s")
    # Output: Fetched 10 users in 0.10s

asyncio.run(main())

Task Management

async def task_with_timeout():
    task = asyncio.create_task(slow_operation())  # slow_operation: any coroutine

    try:
        # Wait with timeout; on timeout, wait_for cancels the task itself
        result = await asyncio.wait_for(task, timeout=5.0)
        return result
    except asyncio.TimeoutError:
        print("Operation timed out")
        return None

async def wait_for_first():
    """Return when first task completes"""
    # fetch_from_* are placeholder coroutines racing one another
    tasks = [
        asyncio.create_task(fetch_from_cache()),
        asyncio.create_task(fetch_from_db()),
        asyncio.create_task(fetch_from_api()),
    ]

    # Return first completed, cancel others
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    # Cancel pending tasks
    for task in pending:
        task.cancel()

    # Get result from completed task
    return done.pop().result()

Async Context Managers and Iterators

Async Context Managers

class AsyncDatabaseConnection:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection = None

    async def __aenter__(self):
        print("Connecting to database...")
        await asyncio.sleep(0.1)  # Simulate connection
        self.connection = f"Connection to {self.connection_string}"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection...")
        await asyncio.sleep(0.05)  # Simulate cleanup
        self.connection = None
        return False  # Don't suppress exceptions

    async def query(self, sql: str):
        await asyncio.sleep(0.05)
        return f"Results for: {sql}"

# Usage
async def main():
    async with AsyncDatabaseConnection("postgres://localhost/db") as db:
        results = await db.query("SELECT * FROM users")
        print(results)

Using contextlib for async context managers

from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource(name: str):
    print(f"Acquiring {name}")
    await asyncio.sleep(0.1)
    resource = {"name": name, "acquired": True}

    try:
        yield resource
    finally:
        print(f"Releasing {name}")
        await asyncio.sleep(0.05)

async def main():
    async with managed_resource("database") as db:
        print(f"Using {db['name']}")

Async Iterators

class AsyncPaginator:
    def __init__(self, total_items: int, page_size: int = 10):
        self.total_items = total_items
        self.page_size = page_size
        self.current_page = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        start = self.current_page * self.page_size
        if start >= self.total_items:
            raise StopAsyncIteration

        # Simulate async fetch
        await asyncio.sleep(0.1)

        end = min(start + self.page_size, self.total_items)
        items = list(range(start, end))
        self.current_page += 1

        return items

async def main():
    async for page in AsyncPaginator(35, page_size=10):
        print(f"Got page with items: {page}")

# Output:
# Got page with items: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Got page with items: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
# Got page with items: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
# Got page with items: [30, 31, 32, 33, 34]

Async Generators

async def async_range(start: int, stop: int, delay: float = 0.1):
    """Async generator that yields numbers with delay"""
    for i in range(start, stop):
        await asyncio.sleep(delay)
        yield i

async def main():
    async for num in async_range(0, 5):
        print(f"Got: {num}")

# Async generator with database streaming
async def stream_large_query(query: str, batch_size: int = 100):
    """Stream large query results without loading all into memory"""
    # Assumes `db` is an already-connected async client exposing an
    # awaitable fetch method that returns rows (see the asyncpg wrapper below)
    offset = 0
    while True:
        batch = await db.fetch_all(
            f"{query} LIMIT {batch_size} OFFSET {offset}"
        )
        if not batch:
            break

        for row in batch:
            yield row

        offset += batch_size
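
A minimal sketch of consuming the stream, assuming db has already been connected; process_row stands in for any per-row coroutine:

async def export_users():
    async for row in stream_large_query("SELECT * FROM users"):
        await process_row(row)  # hypothetical per-row handler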

Real-World Patterns

HTTP Client with aiohttp

import aiohttp

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

async def fetch_multiple_urls(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

# With concurrency limiting (a semaphore caps in-flight requests)
from asyncio import Semaphore

async def fetch_with_rate_limit(
    urls: list[str],
    max_concurrent: int = 10
) -> list[dict]:
    semaphore = Semaphore(max_concurrent)

    async def limited_fetch(session: aiohttp.ClientSession, url: str):
        async with semaphore:
            async with session.get(url) as response:
                return await response.json()

    async with aiohttp.ClientSession() as session:
        tasks = [limited_fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

Database Operations with asyncpg

import asyncpg

class AsyncDatabase:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool = None

    async def connect(self):
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=5,
            max_size=20
        )

    async def close(self):
        await self.pool.close()

    async def fetch_one(self, query: str, *args):
        async with self.pool.acquire() as conn:
            return await conn.fetchrow(query, *args)

    async def fetch_all(self, query: str, *args):
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def execute(self, query: str, *args):
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def execute_many(self, query: str, args_list: list):
        async with self.pool.acquire() as conn:
            await conn.executemany(query, args_list)

# Usage
async def main():
    db = AsyncDatabase("postgresql://user:pass@localhost/db")
    await db.connect()

    try:
        users = await db.fetch_all("SELECT * FROM users WHERE active = $1", True)
        print(f"Found {len(users)} active users")
    finally:
        await db.close()
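
asyncpg also supports transactions through conn.transaction(). A sketch of a method that could be added to the AsyncDatabase wrapper above; the accounts table and its columns are illustrative:

    async def transfer(self, from_id: int, to_id: int, amount: float):
        async with self.pool.acquire() as conn:
            async with conn.transaction():  # both statements commit or roll back together
                await conn.execute(
                    "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
                    amount, from_id,
                )
                await conn.execute(
                    "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
                    amount, to_id,
                )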

Producer-Consumer Pattern

import asyncio
from asyncio import Queue

async def producer(queue: Queue, items: list):
    """Produce items and put them in queue"""
    for item in items:
        await asyncio.sleep(0.1)  # Simulate work
        await queue.put(item)
        print(f"Produced: {item}")

    # Signal end of production
    await queue.put(None)

async def consumer(queue: Queue, name: str):
    """Consume items from queue"""
    while True:
        item = await queue.get()
        if item is None:
            queue.put_nowait(None)  # Pass signal to other consumers
            break

        await asyncio.sleep(0.2)  # Simulate processing
        print(f"Consumer {name} processed: {item}")
        queue.task_done()

async def main():
    queue = Queue(maxsize=10)

    # Start producer and multiple consumers
    await asyncio.gather(
        producer(queue, list(range(20))),
        consumer(queue, "A"),
        consumer(queue, "B"),
        consumer(queue, "C"),
    )
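
If the total number of items is known up front, a sentinel is not strictly needed: queue.join() returns once task_done() has been called for every item that was put. A sketch of this variant, with consumer_forever as an assumed consumer that skips the sentinel check:

async def consumer_forever(queue: Queue, name: str):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.2)  # Simulate processing
        print(f"Consumer {name} processed: {item}")
        queue.task_done()

async def main_with_join():
    queue = Queue(maxsize=10)
    consumers = [
        asyncio.create_task(consumer_forever(queue, name))
        for name in ("A", "B", "C")
    ]

    for item in range(20):
        await queue.put(item)

    await queue.join()  # every put() has been matched by task_done()
    for c in consumers:
        c.cancel()  # consumers are idle on queue.get(); safe to cancel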

Async Retry Pattern

import random
from functools import wraps

def async_retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: tuple = (Exception,)
):
    """Decorator for retrying async functions with exponential backoff"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        wait_time = delay * (backoff ** attempt)
                        # Add jitter
                        wait_time *= (0.5 + random.random())
                        print(f"Attempt {attempt + 1} failed, retrying in {wait_time:.2f}s")
                        await asyncio.sleep(wait_time)

            raise last_exception

        return wrapper
    return decorator

@async_retry(max_attempts=3, delay=1.0, exceptions=(aiohttp.ClientError,))
async def fetch_with_retry(url: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.json()

Error Handling

Handling Multiple Task Errors

async def safe_gather(*tasks, return_exceptions: bool = False):
    """Gather with better error handling"""
    results = await asyncio.gather(*tasks, return_exceptions=True)

    if not return_exceptions:
        # Raise first exception found
        for result in results:
            if isinstance(result, Exception):
                raise result

    return results

async def process_with_errors():
    async def task_ok():
        await asyncio.sleep(0.1)
        return "OK"

    async def task_fail():
        await asyncio.sleep(0.05)
        raise ValueError("Something went wrong")

    try:
        results = await safe_gather(
            task_ok(),
            task_fail(),
            task_ok(),
        )
    except ValueError as e:
        print(f"Task failed: {e}")

Graceful Shutdown

import signal

class GracefulShutdown:
    def __init__(self):
        self.shutdown_event = asyncio.Event()
        self.tasks = set()

    def register_task(self, task):
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

    async def shutdown(self):
        self.shutdown_event.set()

        # Cancel all registered tasks
        for task in self.tasks:
            task.cancel()

        # Wait for all tasks to complete
        if self.tasks:
            await asyncio.gather(*self.tasks, return_exceptions=True)

async def main():
    shutdown = GracefulShutdown()

    # Handle SIGTERM/SIGINT
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(
            sig,
            lambda: asyncio.create_task(shutdown.shutdown())
        )

    # Run until shutdown; do_work() is a placeholder for your app's coroutine
    while not shutdown.shutdown_event.is_set():
        task = asyncio.create_task(do_work())
        shutdown.register_task(task)
        await asyncio.sleep(1)

Performance Tips

Avoid Blocking Calls

import asyncio
import requests  # blocking HTTP library, shown here only as an anti-pattern
from concurrent.futures import ThreadPoolExecutor

# Bad: Blocking call in async function
async def bad_example():
    result = requests.get("http://example.com")  # BLOCKS!
    return result.json()

# Good: Run blocking code in thread pool
async def good_example():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool,
            requests.get,
            "http://example.com"
        )
    return result.json()

# Best: Use async library
async def best_example():
    async with aiohttp.ClientSession() as session:
        async with session.get("http://example.com") as response:
            return await response.json()
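
run_in_executor also handles CPU-bound work: hand the computation to a ProcessPoolExecutor so it runs in a separate process and never stalls the event loop. A minimal sketch, with crunch_numbers as a placeholder:

from concurrent.futures import ProcessPoolExecutor

def crunch_numbers(n: int) -> int:
    # Placeholder CPU-heavy function; runs in a worker process
    return sum(i * i for i in range(n))

async def cpu_bound_example():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        return await loop.run_in_executor(pool, crunch_numbers, 10_000_000)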

Batching with asyncio.gather

async def process_in_batches(items: list, batch_size: int = 100):
    """Process items in batches to avoid overwhelming resources"""
    results = []

    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_results = await asyncio.gather(
            *[process_item(item) for item in batch]
        )
        results.extend(batch_results)

    return results

Key Takeaways

1. Use async for I/O-bound operations - Network, database, and file operations
2. Never block the event loop - Use run_in_executor for CPU-bound work
3. Create tasks for concurrency - asyncio.gather and create_task (see the TaskGroup sketch below)
4. Implement proper cleanup - Use async context managers
5. Handle timeouts - Use asyncio.wait_for
6. Limit concurrency - Use Semaphore to prevent resource exhaustion
7. Stream large data - Use async generators
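
On Python 3.11+, asyncio.TaskGroup is a structured alternative to gather for takeaway 3: tasks are scoped to an async with block, and if one fails the siblings are cancelled automatically. A minimal sketch, reusing fetch_user from earlier:

async def fetch_pair():
    async with asyncio.TaskGroup() as tg:  # requires Python 3.11+
        t1 = tg.create_task(fetch_user(1))
        t2 = tg.create_task(fetch_user(2))
    # Leaving the block guarantees both tasks finished (or were cancelled)
    return t1.result(), t2.result()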

Conclusion

Async programming in Python unlocks massive performance improvements for I/O-bound applications. Start with simple coroutines, learn to use tasks for concurrency, and gradually adopt advanced patterns like producer-consumer queues. The investment in learning async pays dividends in application performance.

---

Building async Python applications? Connect on LinkedIn to discuss best practices.
