Async/Await Patterns in Python

· 5 min read · Updated March 13, 2026 · intermediate

Once you understand the basics of asyncio—coroutines, the event loop, and async/await—the real challenge begins. Writing production-quality async code means handling timeouts, retries, cancellation, and error propagation gracefully. This guide covers practical patterns you’ll use in real async applications.

Retry with Backoff

Network calls fail. A retry pattern with exponential backoff makes your code resilient:

import asyncio
import random

async def fetch_with_retry(url, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            # Simulate a network request
            if random.random() < 0.7:  # 70% chance of failure
                raise ConnectionError(f"Failed to fetch {url}")
            return f"Data from {url}"
        except ConnectionError as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)  # Exponential backoff
            print(f"Retry {attempt + 1}/{max_retries} after {delay}s")
            await asyncio.sleep(delay)

async def main():
    result = await fetch_with_retry("api.example.com")
    print(result)

asyncio.run(main())

The key insight: asyncio.sleep() lets other coroutines run while waiting, so retry delays don’t block your entire application.
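One refinement worth knowing: when many clients retry on the same schedule, their retries arrive in synchronized bursts. Adding random jitter to the delay spreads them out. Here is a sketch of the same retry loop with "full jitter" (a random delay up to the exponential cap); the simulated failure rate and URL are placeholders, as above:

```python
import asyncio
import random

async def fetch_with_jitter(url, max_retries=3, base_delay=1.0):
    for attempt in range(max_retries):
        try:
            if random.random() < 0.7:  # Simulated 70% failure rate
                raise ConnectionError(f"Failed to fetch {url}")
            return f"Data from {url}"
        except ConnectionError:
            if attempt == max_retries - 1:
                raise
            # Full jitter: sleep a random amount up to the exponential cap
            delay = random.uniform(0, base_delay * (2 ** attempt))
            await asyncio.sleep(delay)

async def main():
    try:
        print(await fetch_with_jitter("api.example.com"))
    except ConnectionError as e:
        print(f"Gave up: {e}")

asyncio.run(main())
```

The trade-off: jittered delays are less predictable for any single caller, but across a fleet of clients they avoid the "thundering herd" of simultaneous retries.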

Timeout Patterns

Long-running operations need timeouts. asyncio.wait_for() is the standard approach; when the timeout expires, it cancels the wrapped coroutine and raises asyncio.TimeoutError:

import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "finished"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())
# Output: Operation timed out!

For multiple operations with a collective timeout, use asyncio.wait():

import asyncio

async def operation(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    tasks = [
        asyncio.create_task(operation("fast", 1)),
        asyncio.create_task(operation("medium", 3)),
        asyncio.create_task(operation("slow", 5)),
    ]
    
    done, pending = await asyncio.wait(
        tasks,
        timeout=2.0,
        return_when=asyncio.ALL_COMPLETED
    )
    
    print(f"Completed: {len(done)}")
    print(f"Pending: {len(pending)}")
    
    # Cancel remaining tasks
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(main())
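Changing return_when to asyncio.FIRST_COMPLETED turns wait() into a race: take the first result, cancel the stragglers. Here's a sketch of a small helper built on that idea (the `race` name is mine, not a stdlib function):

```python
import asyncio

async def operation(name, delay):
    await asyncio.sleep(delay)
    return name

async def race(*coros):
    # Return the first result and cancel the stragglers
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()

async def main():
    winner = await race(operation("fast", 0.1), operation("slow", 1.0))
    print(f"Winner: {winner}")

asyncio.run(main())
```

This is handy for hedged requests, e.g. querying two replicas and keeping whichever answers first.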

Batching with Semaphores

When you need to limit concurrent operations—say, to avoid overwhelming an API—use a semaphore:

import asyncio

async def fetch_url(semaphore, url):
    async with semaphore:
        # Only 3 requests run at a time
        await asyncio.sleep(0.5)  # Simulated request
        return f"Data from {url}"

async def main():
    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent
    urls = [f"api.example.com/item/{i}" for i in range(10)]
    
    tasks = [fetch_url(semaphore, url) for url in urls]
    results = await asyncio.gather(*tasks)
    
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())

This pattern is essential for rate-limited APIs or when memory is a concern.
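A related refinement: gather() holds all results until every task finishes. If you'd rather handle each response the moment it's ready, combine the semaphore with asyncio.as_completed(). A sketch, using the same simulated fetch:

```python
import asyncio

async def fetch_url(semaphore, url):
    async with semaphore:
        await asyncio.sleep(0.05)  # Simulated request
        return f"Data from {url}"

async def main():
    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent
    urls = [f"api.example.com/item/{i}" for i in range(10)]
    tasks = [asyncio.create_task(fetch_url(semaphore, u)) for u in urls]

    # Handle each result as soon as it is ready, not all at the end
    for future in asyncio.as_completed(tasks):
        result = await future
        print(result)

asyncio.run(main())
```

Note that results arrive in completion order, not submission order, so pair each task with its input if you need to match them up.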

Error Handling Strategies

Async error handling requires thinking about where exceptions propagate:

import asyncio
import random

async def risky_operation():
    await asyncio.sleep(1)
    if random.random() < 0.5:
        raise ValueError("Something went wrong")
    return "success"

async def main():
    # Option 1: Try/except directly
    try:
        result = await risky_operation()
    except ValueError as e:
        print(f"Caught: {e}")

    # Option 2: gather with return_exceptions
    results = await asyncio.gather(
        risky_operation(),
        risky_operation(),
        return_exceptions=True
    )
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(main())

The return_exceptions=True pattern is useful when you want all tasks to complete regardless of individual failures.

Cancellation Handling

Tasks can be cancelled externally. Handle this gracefully:

import asyncio

async def cancellable_task():
    try:
        while True:
            print("Working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task cancelled, cleaning up...")
        # Do cleanup here
        raise  # Re-raise if you want cancellation to propagate

async def main():
    task = asyncio.create_task(cancellable_task())
    await asyncio.sleep(3)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Cancellation completed")

asyncio.run(main())
# Output:
# Working...
# Working...
# Working...
# Task cancelled, cleaning up...
# Cancellation completed

Context Variables in Async Code

When you need thread-local-like storage in async code, use contextvars:

import asyncio
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar("request_id")

async def log_request():
    # Each task gets its own value
    print(f"Processing request: {request_id.get()}")

async def handle_request(req_id):
    token = request_id.set(req_id)
    try:
        await log_request()
    finally:
        request_id.reset(token)

async def main():
    await asyncio.gather(
        handle_request("req-1"),
        handle_request("req-2"),
        handle_request("req-3"),
    )

asyncio.run(main())
# Output:
# Processing request: req-1
# Processing request: req-2
# Processing request: req-3

This is useful for request-scoped data like tracing IDs.
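The reason this works: asyncio.create_task() snapshots the current context when the task is created, so each task sees the values in effect at that moment and later changes in the parent don't leak into it. A small sketch demonstrating the snapshot:

```python
import asyncio
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar("request_id", default="unset")

async def report():
    return request_id.get()

async def main():
    request_id.set("req-42")
    # The task snapshots the context here...
    task = asyncio.create_task(report())
    # ...so this later change is invisible to it
    request_id.set("req-99")
    print(await task)  # req-42

asyncio.run(main())
```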

Producer-Consumer with Queues

Async queues enable producer-consumer patterns:

import asyncio

async def producer(queue, n, num_consumers=2):
    for i in range(n):
        await asyncio.sleep(0.5)
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
    for _ in range(num_consumers):
        await queue.put(None)  # One sentinel per consumer, or the others hang

async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"{name} consumed: {item}")
        await asyncio.sleep(1)  # Simulate processing
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue, "consumer-1"),
        consumer(queue, "consumer-2"),
    )

asyncio.run(main())

Multiple consumers process items concurrently while the producer adds them.
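Sentinels work, but you need exactly one per consumer. An alternative is to let consumers loop forever, wait for queue.join() (which returns once every item has been marked with task_done()), then cancel them. A sketch of that shutdown style:

```python
import asyncio

async def consumer(queue, name, seen):
    while True:
        item = await queue.get()
        seen.append(f"{name}:{item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    seen = []
    consumers = [
        asyncio.create_task(consumer(queue, f"c{i}", seen)) for i in range(2)
    ]
    for i in range(6):
        await queue.put(f"item-{i}")

    await queue.join()   # Blocks until every item has been task_done()'d
    for c in consumers:  # Then shut the consumers down
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    print(f"Processed {len(seen)} items")

asyncio.run(main())
```

This scales to any number of consumers without counting sentinels, at the cost of relying on cancellation for shutdown.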

Race Condition Prevention

When multiple coroutines access shared state, use locks:

import asyncio

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()
    
    async def increment(self):
        async with self.lock:
            self.value += 1
            return self.value

async def worker(counter, worker_id):
    for _ in range(5):
        value = await counter.increment()
        print(f"Worker {worker_id} incremented to {value}")

async def main():
    counter = Counter()
    await asyncio.gather(
        worker(counter, "A"),
        worker(counter, "B"),
    )
    print(f"Final value: {counter.value}")

asyncio.run(main())
# Final value will be 10 (not less due to locking)

Strictly speaking, this particular increment contains no await point, so asyncio would execute it atomically even without the lock: in single-threaded async code, races only appear when the critical section yields to the event loop. The lock becomes essential as soon as the read-modify-write spans an await, for example a read, an I/O call, then a write.
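To make that concrete, here is a sketch where the critical section does yield between the read and the write. The unlocked version loses updates; the locked version does not (the `await asyncio.sleep(0)` stands in for any real I/O inside the critical section):

```python
import asyncio

class SlowCounter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def unsafe_increment(self):
        current = self.value
        await asyncio.sleep(0)    # Yields: another coroutine may run here
        self.value = current + 1  # May overwrite a concurrent update

    async def safe_increment(self):
        async with self.lock:
            current = self.value
            await asyncio.sleep(0)
            self.value = current + 1

async def main():
    unsafe, safe = SlowCounter(), SlowCounter()
    await asyncio.gather(*[unsafe.unsafe_increment() for _ in range(100)])
    await asyncio.gather(*[safe.safe_increment() for _ in range(100)])
    print(f"Unsafe: {unsafe.value}, Safe: {safe.value}")

asyncio.run(main())
```

Every unsafe coroutine reads the counter before any of them writes it back, so most increments are lost; the locked version always reaches 100.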

Summary

These patterns form the foundation of production async code:

  • Retry with backoff — Resilience against transient failures
  • Timeouts — Prevent operations from hanging indefinitely
  • Semaphores — Rate limiting for concurrent operations
  • Error handling — Choose between exceptions or return_exceptions
  • Cancellation — Clean up resources when tasks are cancelled
  • Context vars — Request-scoped data in async code
  • Queues — Producer-consumer patterns
  • Locks — Protect shared state from race conditions

Combine these patterns to build robust async applications that handle real-world failure modes gracefully.

See Also