Async HTTP with aiohttp
aiohttp is the go-to library for making asynchronous HTTP requests in Python. Where a synchronous library such as requests blocks the event loop until each response arrives, aiohttp lets you fire off dozens of requests concurrently without breaking a sweat. If you’re already using asyncio, integrating aiohttp is straightforward.
Your First Request: ClientSession
The entry point is aiohttp.ClientSession. Think of it as your connection manager — it holds onto TCP connections, manages cookies, and applies default headers across every request you make.
The context manager pattern is the safest way to use it:
import asyncio

import aiohttp


async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://jsonplaceholder.typicode.com/posts/1") as resp:
            print(resp.status)  # 200
            data = await resp.json()
            print(data["title"])


asyncio.run(main())
Inside the async with block the session is open. When the block exits, the session closes and releases all its connections. Never skip the context manager — an unclosed session leaks file descriptors and memory.
One session per application (or per target API) is the right mental model. Reuse it for all requests so connection pooling and keepalives actually work.
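One way to make the "one session per application" idea concrete is a small wrapper that owns the session for its whole lifetime. This is only a sketch: ApiClient and get_json are hypothetical names chosen for illustration, not part of aiohttp.

```python
import asyncio

import aiohttp


class ApiClient:
    """Hypothetical wrapper that owns exactly one ClientSession."""

    def __init__(self, default_headers=None):
        self._headers = default_headers or {}
        self._session = None

    async def __aenter__(self):
        # Create the session inside a running event loop.
        self._session = aiohttp.ClientSession(headers=self._headers)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Closing the session releases every pooled connection.
        await self._session.close()

    @property
    def session(self):
        return self._session

    async def get_json(self, url, **kwargs):
        # Every call goes through the same session, so connections are reused.
        async with self._session.get(url, **kwargs) as resp:
            resp.raise_for_status()
            return await resp.json()
```

Application code would open the client once at startup with `async with ApiClient(...) as client:` and pass `client` to whatever needs to make requests.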
HTTP Methods: GET, POST, PUT, DELETE
Every method follows the same pattern: pass the URL, get a response object. The only difference is how the body is handled.
GET with query params:
async with session.get(
    "https://api.example.com/posts",
    params={"page": 1, "limit": 10, "status": "published"},
) as resp:
    posts = await resp.json()
aiohttp appends params to the URL as a query string automatically. No need to build it yourself.
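To see roughly what the final URL looks like, you can build it by hand with yarl, the URL library aiohttp depends on. This is an approximation of what happens internally, not a trace of aiohttp's own code path.

```python
from yarl import URL  # installed alongside aiohttp

base = URL("https://api.example.com/posts")
params = {"page": 1, "limit": 10, "status": "published"}

# with_query() encodes the mapping into a query string
full_url = base.with_query(params)
print(full_url)
```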
POST with a JSON body:
async with session.post(
    "https://api.example.com/posts",
    json={"title": "Hello", "body": "World", "userId": 1},
) as resp:
    created = await resp.json()
    print(f"Created post {created['id']}")
Using json= tells aiohttp to serialize the dict and set Content-Type: application/json in the same step. Don’t pass both data= and json= in the same call — aiohttp will raise an error because you can only configure one payload type per request.
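For contrast, here is a sketch of the other payload style and of the error case. Passing a dict to data= sends a form-encoded body, and combining data= with json= fails with a ValueError. The helper names and the localhost URL are placeholders for illustration.

```python
import asyncio

import aiohttp


async def post_form(session, url, fields):
    # data= with a dict is sent as application/x-www-form-urlencoded
    async with session.post(url, data=fields) as resp:
        resp.raise_for_status()
        return await resp.text()


async def both_payloads_error():
    # Placeholder URL: the ValueError is raised before any connection is made.
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(
                "http://localhost/", data={"a": "1"}, json={"a": 1}
            ):
                pass
        except ValueError as exc:
            return str(exc)
    return None
```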
Other methods:
# PUT, PATCH, DELETE follow the same pattern
async with session.put(url, json={"title": "Updated"}) as resp:
    ...
async with session.patch(url, json={"title": "Patched"}) as resp:
    ...
async with session.delete(url) as resp:
    print(resp.status)  # often 204 No Content
Handling the Response
ClientResponse gives you the status, headers, and body. Call one of the reading methods depending on what you need:
async with session.get(url) as resp:
    # HTTP status code
    print(resp.status)
    # Body as decoded text
    text = await resp.text()
    # Body as parsed JSON
    data = await resp.json()
    # Raw bytes
    raw = await resp.read()
    # Final URL after any redirects
    print(resp.url)
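If you expect a non-JSON response, use resp.text(). For JSON APIs, resp.json() is the cleanest path. resp.json() raises aiohttp.ContentTypeError when the response’s Content-Type is not a JSON type, and json.JSONDecodeError when the body is not valid JSON. resp.text() can raise UnicodeDecodeError if the body does not match the detected encoding.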
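Some servers send JSON with a Content-Type such as text/plain, which makes resp.json() raise ContentTypeError. A possible workaround, sketched below, is to retry with content_type=None, which turns off the Content-Type check. The helper name read_json_lenient is made up for this example.

```python
import aiohttp


async def read_json_lenient(resp):
    """Parse the body as JSON even if the server mislabels its Content-Type."""
    try:
        return await resp.json()
    except aiohttp.ContentTypeError:
        # The body is already cached on the response, so this does not re-download it.
        return await resp.json(content_type=None)
```

Only use this against APIs you trust to actually return JSON; an invalid body still raises json.JSONDecodeError.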
Streaming for large responses:
async with session.get("https://api.example.com/large-file") as resp:
    async for chunk in resp.content.iter_any():
        process(chunk)
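A common use of streaming is saving a large download to disk without holding it all in memory. The sketch below uses resp.content.iter_chunked() to read fixed-size pieces; download, main, and their parameters are illustrative names, and the file writes are ordinary blocking calls, which is usually acceptable for small chunks.

```python
import asyncio

import aiohttp


async def download(session, url, dest_path, chunk_size=64 * 1024):
    """Stream the response body to dest_path and return the bytes written."""
    written = 0
    async with session.get(url) as resp:
        resp.raise_for_status()
        with open(dest_path, "wb") as fh:
            # iter_chunked() yields at most chunk_size bytes per iteration
            async for chunk in resp.content.iter_chunked(chunk_size):
                fh.write(chunk)
                written += len(chunk)
    return written


async def main(url, dest_path):
    async with aiohttp.ClientSession() as session:
        size = await download(session, url, dest_path)
        print(f"saved {size} bytes to {dest_path}")
```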
Timeouts
aiohttp’s default timeout is 5 minutes (300 seconds) per request. You almost always want something tighter.
Use aiohttp.ClientTimeout to configure this:
# Session-level timeout applies to all requests
timeout = aiohttp.ClientTimeout(total=30, sock_connect=10, sock_read=20)
async with aiohttp.ClientSession(timeout=timeout) as session:
    ...

# Per-request override
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
    ...
total covers the entire operation: acquiring a connection, sending the request, and reading the response. sock_connect limits how long establishing a new connection to the peer may take. sock_read limits the wait between successive reads of response data. Set total=None (or 0) to disable the overall limit.
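When a limit is exceeded, the request fails with asyncio.TimeoutError, so callers need to handle it. Here is a minimal sketch, assuming you want a None result on timeout rather than an exception; fetch_text and its seconds parameter are hypothetical.

```python
import asyncio

import aiohttp


async def fetch_text(session, url, seconds=5.0):
    """Return the body as text, or None if the request exceeds the time limit."""
    timeout = aiohttp.ClientTimeout(total=seconds)
    try:
        async with session.get(url, timeout=timeout) as resp:
            return await resp.text()
    except asyncio.TimeoutError:
        # Raised when the total limit elapses before the response is fully read
        return None
```

Returning None keeps the example short; in real code you may prefer to log the URL or re-raise a domain-specific error.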
Connection Pooling with TCPConnector
Every session has a TCPConnector that manages the underlying connection pool. By default, aiohttp caps the pool at 100 total connections and allows unlimited connections per host.
Tune this when you’re making many concurrent requests:
connector = aiohttp.TCPConnector(
    limit=50,              # max 50 connections total
    limit_per_host=10,     # max 10 connections to a single host
    keepalive_timeout=30,  # drop idle connections after 30 seconds
    # force_close=True would close every connection after use instead;
    # aiohttp raises ValueError if it is combined with keepalive_timeout
)
async with aiohttp.ClientSession(connector=connector) as session:
    ...
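Increase limit when you’re hitting many different hosts. Set limit_per_host when a specific API restricts how many simultaneous connections one client may open. force_close=True closes each connection after its request, which disables keepalive, so reserve it for servers that mishandle persistent connections; it cannot be combined with keepalive_timeout.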
Error Handling
Non-2xx responses don’t raise by default — you have to opt in. The simplest way is raise_for_status():
async def fetch_json(session, url):
    async with session.get(url) as resp:
        resp.raise_for_status()  # raises ClientResponseError for 4xx/5xx
        return await resp.json()
Or set it globally on the session:
async with aiohttp.ClientSession(raise_for_status=True) as session:
    async with session.get(url) as resp:
        # automatically raises on 4xx/5xx
        ...
Catching specific errors:
async def fetch_json(session, url):
    try:
        async with session.get(url) as resp:
            resp.raise_for_status()
            return await resp.json()
    except aiohttp.ClientResponseError as e:
        # 4xx or 5xx response
        print(f"HTTP {e.status}: {e.message}")
    except aiohttp.ClientSSLError as e:
        # TLS handshake or certificate verification failed
        print(f"SSL error, check your certificates: {e}")
    except asyncio.TimeoutError:
        # A ClientTimeout limit was exceeded
        print("Request timed out")
    except aiohttp.ClientError as e:
        # Connection refused, DNS failure, malformed payload, etc.
        print(f"Connection error: {e}")
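The direct subclasses of the base ClientError are ClientResponseError, ClientConnectionError (the parent of ClientConnectorError and the SSL errors), ClientPayloadError, and InvalidURL. Catching ClientError handles all of them in one place, but a total timeout surfaces as asyncio.TimeoutError, which is not a ClientError, so catch it separately.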
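Because several of these classes inherit from one another, the order of checks matters: test the most specific types first. The sketch below maps an exception to a coarse label; describe and the label strings are invented for illustration.

```python
import asyncio

import aiohttp


def describe(exc):
    """Map an exception to a coarse label, most specific type first."""
    if isinstance(exc, asyncio.TimeoutError):
        return "timeout"
    if isinstance(exc, aiohttp.ClientResponseError):
        return "bad status"
    if isinstance(exc, aiohttp.ClientSSLError):
        return "tls"
    if isinstance(exc, aiohttp.ClientConnectionError):
        return "connection"
    if isinstance(exc, aiohttp.ClientError):
        return "other client error"
    return "unexpected"
```

The timeout check comes first because aiohttp's ServerTimeoutError is both a timeout and a connection error.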
Concurrent Requests
The real power of aiohttp shows up when you run many requests at once. Combine it with asyncio.gather():
import asyncio

import aiohttp

URLS = [
    "https://jsonplaceholder.typicode.com/posts/1",
    "https://jsonplaceholder.typicode.com/posts/2",
    "https://jsonplaceholder.typicode.com/posts/3",
]


async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.json()


async def main():
    connector = aiohttp.TCPConnector(limit=10)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch(session, url) for url in URLS]
        results = await asyncio.gather(*tasks)  # results keep the order of URLS
        for post in results:
            print(post["title"])


asyncio.run(main())
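By default, asyncio.gather() raises the first exception it sees, so one failed request hides the results of the others. A possible variation, sketched here, passes return_exceptions=True and sorts the outcomes afterwards. gather_partitioned and coro_factory are hypothetical names; the helper is generic so it can wrap any per-URL coroutine.

```python
import asyncio


async def gather_partitioned(keys, coro_factory):
    """Run coro_factory(key) for each key in a list; return (successes, failures)."""
    results = await asyncio.gather(
        *(coro_factory(key) for key in keys),
        return_exceptions=True,  # exceptions are returned instead of raised
    )
    successes, failures = {}, {}
    for key, result in zip(keys, results):
        if isinstance(result, Exception):
            failures[key] = result
        else:
            successes[key] = result
    return successes, failures
```

With the fetch() coroutine from the example above, a call could look like `ok, failed = await gather_partitioned(URLS, lambda url: fetch(session, url))`.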
If you’re going wide — hundreds of concurrent requests — use a semaphore to bound the concurrency and prevent overwhelming your own machine or the target server:
semaphore = asyncio.Semaphore(5)

async def fetch(session, url):
    async with semaphore:
        async with session.get(url) as resp:
            return await resp.json()
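The same idea can be packaged as a reusable helper that creates the semaphore inside the running event loop, which avoids loop-binding surprises on older Python versions. This is a sketch under that assumption; run_bounded, coro_factory, and max_concurrency are illustrative names.

```python
import asyncio


async def run_bounded(keys, coro_factory, max_concurrency=5):
    """Run coro_factory(key) for every key, at most max_concurrency at a time."""
    # Created here, inside the running loop, rather than at import time
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(key):
        async with semaphore:
            return await coro_factory(key)

    # gather() preserves the order of keys in its result list
    return await asyncio.gather(*(guarded(key) for key in keys))
```

For HTTP work, coro_factory could be `lambda url: fetch(session, url)` with a fetch() that takes the session and URL and no semaphore of its own.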
Headers, Cookies, and Base URLs
Set default headers at the session level so you don’t repeat them on every call:
async with aiohttp.ClientSession(
    headers={"Authorization": "Bearer my-token", "User-Agent": "my-app/1.0"},
) as session:
    # Authorization header is sent with every request
    async with session.get("https://api.example.com/profile") as resp:
        ...
Pass cookies at construction or per-request:
# At construction: sent with every request
async with aiohttp.ClientSession(cookies={"session_id": "abc123"}) as session:
    ...

# Per request
async with session.get(url, cookies={"tmp_session": "xyz"}) as resp:
    ...
Use a base_url when all your requests go to the same API:
async with aiohttp.ClientSession(base_url="https://api.example.com") as session:
    # Sends to https://api.example.com/posts
    async with session.get("/posts") as resp:
        ...
SSL and Certificates
By default, aiohttp verifies SSL certificates using the system’s CA bundle. In some environments (notably macOS), Python can’t find the system bundle. Use certifi to fix it:
import ssl

import certifi

ssl_context = ssl.create_default_context(cafile=certifi.where())
connector = aiohttp.TCPConnector(ssl=ssl_context)
async with aiohttp.ClientSession(connector=connector) as session:
    ...
To disable verification entirely (not recommended for production):
async with session.get(url, ssl=False) as resp:
    ...
For client certificates:
ssl_context = ssl.create_default_context(cafile=certifi.where())
ssl_context.load_cert_chain("client.crt", "client.key")
connector = aiohttp.TCPConnector(ssl=ssl_context)
Common Mistakes to Avoid
Creating a new session per request. Every session builds its own connection pool. If you create a session inside a function that gets called repeatedly, every call pays for a fresh TCP (and TLS) handshake and nothing is reused; if you also forget to close those sessions, you leak connections and file descriptors. Create one session and reuse it:
# BAD
async def fetch(url):
    async with aiohttp.ClientSession() as session:  # new pool every call
        async with session.get(url) as resp:
            return await resp.json()

# GOOD
async def fetch(session, url):
    async with session.get(url) as resp:
        return await resp.json()

async def main(urls):
    async with aiohttp.ClientSession() as session:  # one pool, shared by all requests
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)
Not reading the response body before the context exits. Once the async with block for the response ends, the connection is released and the body can no longer be read. Check status and read the body inside the same block:
# BAD
async with session.get(url) as resp:
    pass
data = await resp.json()  # fails: connection already released (typically ClientConnectionError: Connection closed)

# GOOD
async with session.get(url) as resp:
    resp.raise_for_status()
    data = await resp.json()
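Exceeding the connection pool limit. If you fire 200 requests against a connector with limit=100, the extra 100 wait in a queue for a free connection, and that waiting time counts against the total timeout. Use a semaphore sized to your connector’s limit so requests don’t start their timeout clock before a connection is available.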
See Also
- asyncio Basics: Event Loops and Coroutines - if you need a refresher on the async fundamentals aiohttp builds on
- Async Patterns: Gather, Wait, and Queues - covers asyncio.gather() and concurrent task coordination
- Working with APIs - general patterns for consuming REST APIs in Python