Async/Await Patterns in Python
Once you understand the basics of asyncio—coroutines, the event loop, and async/await—the real challenge begins. Writing production-quality async code means handling timeouts, retries, cancellation, and error propagation gracefully. This guide covers practical patterns you’ll use in real async applications.
Retry with Backoff
Network calls fail. A retry pattern with exponential backoff makes your code resilient:
```python
import asyncio
import random

async def fetch_with_retry(url, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            # Simulate a network request
            if random.random() < 0.7:  # 70% chance of failure
                raise ConnectionError(f"Failed to fetch {url}")
            return f"Data from {url}"
        except ConnectionError:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)  # Exponential backoff
            print(f"Retry {attempt + 1}/{max_retries} after {delay}s")
            await asyncio.sleep(delay)

async def main():
    result = await fetch_with_retry("api.example.com")
    print(result)

asyncio.run(main())
```
The key insight: asyncio.sleep() lets other coroutines run while waiting, so retry delays don’t block your entire application.
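One refinement worth considering: when many clients fail at once and retry on the same exponential schedule, their retries collide again in synchronized waves (the "thundering herd"). Adding random jitter spreads the retries out. A minimal sketch, where `jittered_delay` is an illustrative helper, not part of the example above:

```python
import random

def jittered_delay(base_delay: float, attempt: int, cap: float = 30.0) -> float:
    """Exponential backoff capped at `cap`, with "full jitter":
    sleep a random amount between 0 and the computed delay."""
    exp_delay = min(cap, base_delay * (2 ** attempt))
    return random.uniform(0, exp_delay)

# Inside the retry loop, replace the fixed delay with:
#     await asyncio.sleep(jittered_delay(base_delay, attempt))
```

The cap keeps late attempts from sleeping for minutes, and full jitter (rather than, say, delay ± 10%) maximizes the spread between competing clients.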
Timeout Patterns
Long-running operations need timeouts. asyncio.wait_for() is the standard approach:
```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "finished"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())
# Output: Operation timed out!
```
For multiple operations with a collective timeout, use asyncio.wait():
```python
import asyncio

async def operation(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    tasks = [
        asyncio.create_task(operation("fast", 1)),
        asyncio.create_task(operation("medium", 3)),
        asyncio.create_task(operation("slow", 5)),
    ]
    done, pending = await asyncio.wait(
        tasks,
        timeout=2.0,
        return_when=asyncio.ALL_COMPLETED,
    )
    print(f"Completed: {len(done)}")
    print(f"Pending: {len(pending)}")
    # Cancel remaining tasks
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(main())
```
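A useful variation: `return_when=asyncio.FIRST_COMPLETED` turns `wait()` into a race, which is the shape of "hedged" requests where you query several replicas and take whichever answers first. A sketch with made-up replica names and delays:

```python
import asyncio

async def replica(name, delay):
    await asyncio.sleep(delay)
    return f"answer from {name}"

async def first_answer():
    tasks = [
        asyncio.create_task(replica("primary", 0.05)),
        asyncio.create_task(replica("backup", 0.01)),
    ]
    # Return as soon as any one task finishes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Cancel the losers so they don't linger
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()

winner = asyncio.run(first_answer())
print(winner)  # answer from backup
```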
Batching with Semaphores
When you need to limit concurrent operations—say, to avoid overwhelming an API—use a semaphore:
```python
import asyncio

async def fetch_url(semaphore, url):
    async with semaphore:
        # Only 3 requests run at a time
        await asyncio.sleep(0.5)  # Simulated request
        return f"Data from {url}"

async def main():
    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent
    urls = [f"api.example.com/item/{i}" for i in range(10)]
    tasks = [fetch_url(semaphore, url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())
```
This pattern is essential for rate-limited APIs or when memory is a concern.
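A simpler alternative to a semaphore is processing in fixed-size batches: each batch runs concurrently, batches run sequentially. It is easier to reason about, at the cost that every batch waits for its slowest member. A sketch (the batch size of 3 mirrors the semaphore limit above; `fetch_url` is again simulated):

```python
import asyncio

async def fetch_url(url):
    await asyncio.sleep(0.01)  # Simulated request
    return f"Data from {url}"

async def fetch_in_batches(urls, batch_size=3):
    results = []
    for start in range(0, len(urls), batch_size):
        batch = urls[start:start + batch_size]
        # Within a batch: concurrent; across batches: sequential
        results.extend(await asyncio.gather(*(fetch_url(u) for u in batch)))
    return results

urls = [f"api.example.com/item/{i}" for i in range(10)]
results = asyncio.run(fetch_in_batches(urls))
print(len(results))  # 10
```

Prefer the semaphore when request latencies vary widely, since it keeps all slots busy instead of idling at batch boundaries.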
Error Handling Strategies
Async error handling requires thinking about where exceptions propagate:
```python
import asyncio
import random

async def risky_operation():
    await asyncio.sleep(1)
    if random.random() < 0.5:
        raise ValueError("Something went wrong")
    return "success"

async def main():
    # Option 1: Try/except directly
    try:
        result = await risky_operation()
    except ValueError as e:
        print(f"Caught: {e}")

    # Option 2: gather with return_exceptions
    results = await asyncio.gather(
        risky_operation(),
        risky_operation(),
        return_exceptions=True,
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(main())
```
The return_exceptions=True pattern is useful when you want all tasks to complete regardless of individual failures.
Cancellation Handling
Tasks can be cancelled externally. Handle this gracefully:
```python
import asyncio

async def cancellable_task():
    try:
        while True:
            print("Working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task cancelled, cleaning up...")
        # Do cleanup here
        raise  # Re-raise if you want cancellation to propagate

async def main():
    task = asyncio.create_task(cancellable_task())
    await asyncio.sleep(3)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Cancellation completed")

asyncio.run(main())
# Output:
# Working...
# Working...
# Working...
# Task cancelled, cleaning up...
# Cancellation completed
```
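Sometimes part of a task must not be interrupted at all, such as committing a transaction. `asyncio.shield()` protects an inner awaitable from outer cancellation: the outer `await` still raises `CancelledError`, but the shielded work runs to completion. A minimal sketch (the `events` list just records what happened, for illustration):

```python
import asyncio

events = []

async def commit():
    await asyncio.sleep(0.05)  # Simulated critical write
    events.append("committed")

async def handler():
    try:
        # Cancelling handler() does NOT cancel commit()
        await asyncio.shield(commit())
    except asyncio.CancelledError:
        events.append("handler cancelled")
        raise

async def main():
    task = asyncio.create_task(handler())
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    # Give the shielded commit time to finish before the loop closes
    await asyncio.sleep(0.1)

asyncio.run(main())
print(events)  # ['handler cancelled', 'committed']
```

Note the caller is still responsible for keeping the event loop alive long enough for the shielded work to finish, hence the final sleep.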
Context Variables in Async Code
When you need thread-local-like storage in async code, use contextvars:
```python
import asyncio
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar("request_id")

async def log_request():
    # Each task gets its own value
    print(f"Processing request: {request_id.get()}")

async def handle_request(req_id):
    token = request_id.set(req_id)
    try:
        await log_request()
    finally:
        request_id.reset(token)

async def main():
    await asyncio.gather(
        handle_request("req-1"),
        handle_request("req-2"),
        handle_request("req-3"),
    )

asyncio.run(main())
# Output:
# Processing request: req-1
# Processing request: req-2
# Processing request: req-3
```
This is useful for request-scoped data like tracing IDs.
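Context variables also propagate into child tasks automatically: `asyncio.create_task()` snapshots the current context at creation time, so a spawned task sees the parent's values without any explicit passing. A small sketch:

```python
import asyncio
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar("request_id", default="unset")

async def child():
    # Reads the value from the context snapshot taken at create_task()
    return request_id.get()

async def main():
    request_id.set("req-42")
    # The child task inherits a copy of the parent's context
    return await asyncio.create_task(child())

seen = asyncio.run(main())
print(seen)  # req-42
```

Because the child runs in a *copy*, any `set()` it performs stays local to that task and never leaks back into the parent.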
Producer-Consumer with Queues
Async queues enable producer-consumer patterns:
```python
import asyncio

async def producer(queue, n, num_consumers):
    for i in range(n):
        await asyncio.sleep(0.5)
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
    for _ in range(num_consumers):
        await queue.put(None)  # One sentinel per consumer

async def consumer(queue, name):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"{name} consumed: {item}")
        await asyncio.sleep(1)  # Simulate processing
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue, 10, num_consumers=2),
        consumer(queue, "consumer-1"),
        consumer(queue, "consumer-2"),
    )

asyncio.run(main())
```

Note that the producer must enqueue one sentinel per consumer; with a single `None`, only one consumer would shut down and the other would wait on `queue.get()` forever, deadlocking the `gather()`.
Multiple consumers process items concurrently while the producer adds them.
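Sentinels work, but `queue.join()` paired with worker cancellation avoids them entirely: the coordinator waits until every item has been marked `task_done()`, then cancels the workers, which are written as infinite loops. A sketch (the `results` list stands in for real processing):

```python
import asyncio

async def worker(queue, results):
    while True:
        item = await queue.get()
        results.append(item)  # Stand-in for real processing
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(2)]
    for i in range(10):
        await queue.put(f"item-{i}")
    await queue.join()  # Blocks until every item is task_done()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results

results = asyncio.run(main())
print(len(results))  # 10
```

This scales to any number of workers without the producer needing to know how many there are.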
Race Condition Prevention
When multiple coroutines access shared state, use locks:
```python
import asyncio

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:
            current = self.value
            # Suspension point: without the lock, another coroutine
            # could run here and read the same stale value
            await asyncio.sleep(0)
            self.value = current + 1
            return self.value

async def worker(counter, worker_id):
    for _ in range(5):
        value = await counter.increment()
        print(f"Worker {worker_id} incremented to {value}")

async def main():
    counter = Counter()
    await asyncio.gather(
        worker(counter, "A"),
        worker(counter, "B"),
    )
    print(f"Final value: {counter.value}")

asyncio.run(main())
# Final value: 10
```

A subtlety worth knowing: a bare `self.value += 1` with no await in between is already safe in asyncio, because coroutines only yield control at await points. The lock becomes necessary as soon as the read-modify-write spans an await, as the `asyncio.sleep(0)` above simulates: without it, two workers could read the same stale value and lose increments.
Summary
These patterns form the foundation of production async code:
- Retry with backoff — Resilience against transient failures
- Timeouts — Prevent operations from hanging indefinitely
- Semaphores — Rate limiting for concurrent operations
- Error handling — Choose between direct try/except and return_exceptions
- Cancellation — Clean up resources when tasks are cancelled
- Context vars — Request-scoped data in async code
- Queues — Producer-consumer patterns
- Locks — Protect shared state from race conditions
Combine these patterns to build robust async applications that handle real-world failure modes gracefully.
See Also
- asyncio — Full asyncio module reference
- Getting Started with asyncio — Foundation concepts for async programming
- threading — Thread-based concurrency for comparison with async