# asyncio

The asyncio module is the foundation for Python’s asynchronous programming model. It provides an event loop, coroutines, tasks, and synchronization primitives for writing concurrent code with async/await syntax. It is designed for I/O-bound workloads where you need to handle many operations concurrently without threads.
## Key Functions
### asyncio.run()

The main entry point for running async code. It creates a new event loop, runs the given coroutine, and closes the loop when done.
```python
import asyncio

async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

asyncio.run(main())
# Hello
# (1 second pause)
# World
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| coro | coroutine | — | The coroutine to run |
| debug | bool | None | If True, run the event loop in debug mode |

Returns: The result of the coroutine.

This function cannot be called when another event loop is already running in the same thread. It always creates a new event loop and closes it at the end.
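Because `asyncio.run()` always starts a fresh event loop, calling it from inside already-running async code raises a RuntimeError. A minimal sketch of what that failure looks like (the `noop` coroutine is just a placeholder):

```python
import asyncio

async def noop():
    pass

async def main():
    coro = noop()
    try:
        # A loop is already running here, so this is refused
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "never awaited" warning
        print(f"RuntimeError: {exc}")
        return True
    return False

print(asyncio.run(main()))
# True
```

To run more coroutines from inside async code, use `await`, `asyncio.create_task()`, or `asyncio.gather()` instead.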
### asyncio.create_task()

Wraps a coroutine into a Task and schedules it to run on the event loop. The task runs concurrently with other tasks.
```python
import asyncio

async def say_after(delay: float, message: str) -> str:
    await asyncio.sleep(delay)
    print(message)
    return message

async def main():
    task1 = asyncio.create_task(say_after(1, "hello"))
    task2 = asyncio.create_task(say_after(2, "world"))
    # Both tasks run concurrently — total time is ~2s, not ~3s
    result1 = await task1
    result2 = await task2
    print(f"Got: {result1}, {result2}")

asyncio.run(main())
# (after 1s) hello
# (after 2s) world
# Got: hello, world
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| coro | coroutine | — | The coroutine to schedule as a task |
| name | str | None | A human-readable name for the task (Python 3.8+) |

Returns: An asyncio.Task object.
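The name is purely for debugging and logging: it appears in `repr(task)` and can be read back with `Task.get_name()`. A quick sketch (the name "heartbeat" is illustrative):

```python
import asyncio

async def main():
    task = asyncio.create_task(asyncio.sleep(0), name="heartbeat")
    await task
    # Named tasks are easier to identify in debug output
    return task.get_name()

print(asyncio.run(main()))
# heartbeat
```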
### asyncio.gather()

Runs multiple coroutines concurrently and waits for all of them to complete. Returns a list of results in the same order as the input coroutines.
```python
import asyncio

async def fetch(url: str) -> str:
    print(f"Fetching {url}...")
    await asyncio.sleep(1)  # simulate network request
    return f"Data from {url}"

async def main():
    results = await asyncio.gather(
        fetch("https://api.example.com/a"),
        fetch("https://api.example.com/b"),
        fetch("https://api.example.com/c"),
    )
    for result in results:
        print(result)

asyncio.run(main())
# Fetching https://api.example.com/a...
# Fetching https://api.example.com/b...
# Fetching https://api.example.com/c...
# Data from https://api.example.com/a
# Data from https://api.example.com/b
# Data from https://api.example.com/c
```
All three requests run concurrently, completing in ~1 second instead of ~3.
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| *aws | awaitables | — | One or more coroutines, tasks, or futures to run concurrently |
| return_exceptions | bool | False | If True, exceptions are returned as results instead of being raised |

Returns: A list of results in input order.
When return_exceptions=False (default), the first exception raised by any coroutine is propagated. When True, exceptions appear in the results list alongside successful return values:
```python
import asyncio

async def fail():
    raise ValueError("something broke")

async def succeed():
    return 42

async def main():
    results = await asyncio.gather(fail(), succeed(), return_exceptions=True)
    print(results)

asyncio.run(main())
# [ValueError('something broke'), 42]
```
### asyncio.sleep()

A non-blocking sleep that suspends the current coroutine for the given number of seconds, allowing other tasks to run.
```python
import asyncio

async def countdown(n: int):
    for i in range(n, 0, -1):
        print(i)
        await asyncio.sleep(1)
    print("Go!")

asyncio.run(countdown(3))
# 3
# 2
# 1
# Go!
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| delay | float | — | Number of seconds to sleep |
| result | any | None | Value returned when the sleep completes |

Returns: The value of result (default None).
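The rarely used result parameter makes the sleep resolve to a value, which can be handy as a placeholder awaitable in tests or alongside gather(). A small sketch:

```python
import asyncio

async def main():
    # sleep() returns its `result` argument once the delay elapses
    value = await asyncio.sleep(0.01, result="ready")
    return value

print(asyncio.run(main()))
# ready
```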
### asyncio.wait_for()

Waits for a coroutine to complete with a timeout. Raises asyncio.TimeoutError if the timeout is exceeded. (Since Python 3.11, asyncio.TimeoutError is an alias of the built-in TimeoutError.)
```python
import asyncio

async def slow_operation():
    await asyncio.sleep(10)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())
# Operation timed out!
```
Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| aw | awaitable | — | The coroutine or future to wait for |
| timeout | float or None | — | Maximum seconds to wait. None means no timeout. |

Returns: The result of the awaitable.
## Tasks

Tasks are the primary way to run coroutines concurrently. A task wraps a coroutine and schedules it on the event loop. You can cancel tasks, check their status, and retrieve their results.
```python
import asyncio

async def background_job():
    while True:
        print("Working...")
        await asyncio.sleep(5)

async def main():
    task = asyncio.create_task(background_job())
    # Let it run for 12 seconds
    await asyncio.sleep(12)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task cancelled")

asyncio.run(main())
# Working...
# Working...
# Working...
# Task cancelled
```
Cancelling a task raises asyncio.CancelledError inside the coroutine at the next await point. The coroutine can catch this to perform cleanup.
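A common pattern is to catch CancelledError only long enough to release resources, then re-raise so the task is correctly marked as cancelled. A minimal sketch:

```python
import asyncio

async def worker():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("cleaning up")  # close files, connections, etc.
        raise  # re-raise so the task ends up in the cancelled state

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.05)  # give the worker a chance to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()

print(asyncio.run(main()))
# True
```

Swallowing CancelledError without re-raising is discouraged: the caller then cannot tell that the task was cancelled.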
## Synchronization Primitives

### asyncio.Lock

An async mutex lock. Only one coroutine can hold the lock at a time.
```python
import asyncio

lock = asyncio.Lock()
counter = {"value": 0}

async def increment(n: int):
    for _ in range(n):
        async with lock:
            counter["value"] += 1

async def main():
    await asyncio.gather(increment(1000), increment(1000))
    print(counter["value"])
    # 2000

asyncio.run(main())
```
### asyncio.Semaphore

Limits the number of concurrent coroutines that can enter a section. Useful for rate-limiting or controlling access to a shared resource.
```python
import asyncio

sem = asyncio.Semaphore(3)  # max 3 concurrent

async def limited_fetch(url: str):
    async with sem:
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Done: {url}"

async def main():
    urls = [f"https://api.example.com/{i}" for i in range(10)]
    results = await asyncio.gather(*[limited_fetch(u) for u in urls])
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())
```
### asyncio.Queue

An async FIFO queue for producer-consumer patterns.
```python
import asyncio

async def producer(queue: asyncio.Queue):
    for i in range(5):
        await queue.put(i)
        print(f"Produced: {i}")
    await queue.put(None)  # sentinel to signal completion

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consumed: {item}")
        queue.task_done()

async def main():
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())
```
Queue constructor parameter:

| Parameter | Type | Default | Description |
|---|---|---|---|
| maxsize | int | 0 | Maximum items in the queue. 0 means unlimited. When full, put() blocks until space is available. |
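task_done() pairs with Queue.join(): join() blocks until every item put on the queue has been marked done, which enables a worker-pool shutdown pattern without sentinels. A sketch (the worker count and the doubling are illustrative):

```python
import asyncio

async def worker(q: asyncio.Queue, results: list):
    while True:
        item = await q.get()
        results.append(item * 2)
        q.task_done()  # tells join() this item is fully processed

async def main():
    q: asyncio.Queue = asyncio.Queue()
    results: list = []
    workers = [asyncio.create_task(worker(q, results)) for _ in range(3)]
    for i in range(5):
        q.put_nowait(i)
    await q.join()  # returns once all 5 items are task_done()
    for w in workers:
        w.cancel()  # workers loop forever, so cancel them explicitly
    return sorted(results)

print(asyncio.run(main()))
# [0, 2, 4, 6, 8]
```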
## When to Use asyncio

Good fit:

- Network I/O (HTTP requests, database queries, WebSockets)
- File I/O operations (especially with `aiofiles`)
- Running many concurrent I/O-bound tasks
- Server applications handling many simultaneous connections

Not a good fit:

- CPU-bound computation (use `multiprocessing` or `concurrent.futures.ProcessPoolExecutor` instead)
- Simple scripts with no concurrency needs
- Code that calls blocking libraries that don’t have async versions
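For the last case, asyncio.to_thread() (Python 3.9+) runs a blocking call in a worker thread so the event loop stays responsive. A sketch using time.sleep() as a stand-in for a blocking library call:

```python
import asyncio
import time

def blocking_read() -> str:
    time.sleep(0.1)  # stands in for a blocking library call
    return "payload"

async def main():
    # Both calls run in worker threads concurrently; the loop stays free
    results = await asyncio.gather(
        asyncio.to_thread(blocking_read),
        asyncio.to_thread(blocking_read),
    )
    return results

print(asyncio.run(main()))
# ['payload', 'payload']
```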