Async Patterns: Gather, Wait, and Queues
Once you are comfortable with async and await, the next step is learning how to coordinate multiple concurrent tasks. Three tools sit at the center of that work: asyncio.gather(), asyncio.wait(), and asyncio.Queue. Each serves a distinct purpose.
When to Use Each Pattern
asyncio.gather() is the tool to reach for when you want to run several coroutines concurrently and collect all their results at once. asyncio.wait() gives you finer control — you can watch a group of tasks and react as soon as the first one finishes, or when any one raises an error. asyncio.Queue shines in producer-consumer workflows where tasks need to pass data through a shared channel.
Running Tasks Concurrently with gather()
asyncio.gather() accepts any number of awaitables as positional arguments and returns a list of results in the order you passed them, regardless of completion order. By default, if any awaitable raises an exception, gather() propagates the first exception to the caller immediately; the other awaitables are not cancelled and keep running.
import asyncio

async def fetch(url: str) -> str:
    await asyncio.sleep(0.1)  # simulate network I/O
    return f"result from {url}"

async def main():
    results = await asyncio.gather(
        fetch("a"),
        fetch("b"),
        fetch("c"),
    )
    print(results)
    # ['result from a', 'result from b', 'result from c']

asyncio.run(main())
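The default error behavior is easy to check for yourself. The following is a minimal sketch, assuming the default return_exceptions=False; the work() helper and the finished list are illustrative names, not part of asyncio. It shows that gather() raises the first exception to the caller while the sibling coroutine carries on to completion.

```python
import asyncio

finished: list[str] = []  # hypothetical log of coroutines that ran to completion

async def work(name: str, delay: float, fail: bool = False) -> str:
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"{name} failed")
    finished.append(name)
    return name

async def main() -> list[str]:
    finished.clear()
    try:
        await asyncio.gather(
            work("slow", 0.2),
            work("bad", 0.05, fail=True),
        )
    except ValueError as exc:
        print(f"gather raised: {exc}")
    # "slow" was not cancelled; give it time to finish on its own
    await asyncio.sleep(0.3)
    print(finished)
    return list(finished)

asyncio.run(main())
```

If you need the siblings stopped when one fails, you have to cancel them yourself, which means creating the tasks explicitly so that you hold references to them.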
Pass return_exceptions=True and gather() collects every outcome, results and exceptions alike, in the returned list instead of raising:
import asyncio

async def risky(name: str) -> str:
    if name == "B":
        raise ValueError(f"{name} failed")
    await asyncio.sleep(0.1)
    return f"{name} OK"

async def main():
    results = await asyncio.gather(
        risky("A"),
        risky("B"),
        risky("C"),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"Result: {r}")
    # output
    # Result: A OK
    # Error: B failed
    # Result: C OK

asyncio.run(main())
With return_exceptions=True, gather() is a good default for fan-out workloads where one failure should not hide the other outcomes and you want to inspect every result.
Watching Tasks with wait()
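asyncio.wait() takes an iterable of Tasks or Futures and returns a tuple of two sets: (done, pending). Passing bare coroutine objects was deprecated in Python 3.8 and is an error since Python 3.11, so wrap them with asyncio.create_task() first. The return_when parameter controls when wait() resolves.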
| return_when value | Resolves when |
|---|---|
| ALL_COMPLETED (default) | All tasks finish |
| FIRST_COMPLETED | Any task finishes first |
| FIRST_EXCEPTION | Any task raises (or all complete without error) |
Critically, wait() does not return results. You must call .result() on each Task in the done set to extract the value.
import asyncio

async def task(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} finished (delay={delay})"

async def main():
    tasks = [
        asyncio.create_task(task("fast", 0.1)),
        asyncio.create_task(task("medium", 0.5)),
        asyncio.create_task(task("slow", 1.0)),
    ]
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED,
    )
    for d in done:
        print(d.result())
    # Cancel tasks still running
    for p in pending:
        p.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(main())
# fast finished (delay=0.1)
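The FIRST_EXCEPTION mode from the table can be sketched the same way. In this illustrative example the ok() and boom() coroutines and the task names are assumptions made for the demo; it relies only on Task.exception(), which returns the stored exception without re-raising it.

```python
import asyncio

async def ok(delay: float) -> str:
    await asyncio.sleep(delay)
    return "ok"

async def boom(delay: float) -> str:
    await asyncio.sleep(delay)
    raise RuntimeError("boom")

async def main() -> tuple[int, int, str]:
    tasks = {
        asyncio.create_task(ok(1.0), name="slow-ok"),
        asyncio.create_task(boom(0.05), name="boom"),
    }
    # Resolves as soon as one task raises, long before "slow-ok" completes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

    failed = next(iter(done))
    # .exception() hands back the exception object; .result() would re-raise it
    message = repr(failed.exception())
    print(failed.get_name(), message)

    # The unfinished task is still running, so cancel it and wait for cleanup
    for p in pending:
        p.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending), message

asyncio.run(main())
# boom RuntimeError('boom')
```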
wait() also accepts a timeout in seconds. When the timeout fires, the done set contains whatever tasks had already finished, and pending still holds the rest. The pending tasks keep running — you must cancel them explicitly if you no longer need their results.
import asyncio

async def long_task():
    await asyncio.sleep(10)
    return "done"

async def main():
    task = asyncio.create_task(long_task())
    done, pending = await asyncio.wait(
        {task},
        timeout=0.5,
    )
    print(f"Finished in time: {len(done) == 1}")  # False: task still running
    if pending:
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)

asyncio.run(main())
Passing Data with asyncio.Queue
asyncio.Queue is an async queue that is not thread-safe. Its API is modeled on queue.Queue from the standard library, but it is a separate class: it exists solely to coordinate coroutines within a single event loop.
Basic Operations
import asyncio

async def main():
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=3)
    await q.put(1)
    await q.put(2)
    print(q.qsize())  # 2
    item = await q.get()
    print(item)  # 1
    print(q.qsize())  # 1

asyncio.run(main())
put() waits (suspending the calling coroutine, not the event loop) while the queue is at maxsize. get() waits while the queue is empty. put_nowait() and get_nowait() never wait: they raise asyncio.QueueFull or asyncio.QueueEmpty immediately.
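To make the non-waiting variants concrete, here is a small sketch with a queue of maxsize=1. The events list is only an illustrative way to record what happened; the exception classes asyncio.QueueFull and asyncio.QueueEmpty are the real ones.

```python
import asyncio

async def main() -> list[str]:
    events: list[str] = []
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=1)

    q.put_nowait(1)  # fits: the queue now holds one item
    try:
        q.put_nowait(2)  # queue is full, so this raises at once
    except asyncio.QueueFull:
        events.append("full")

    events.append(f"got {q.get_nowait()}")
    try:
        q.get_nowait()  # queue is empty, so this raises at once
    except asyncio.QueueEmpty:
        events.append("empty")

    print(events)
    return events

asyncio.run(main())
# ['full', 'got 1', 'empty']
```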
Producer-Consumer Pattern
The canonical use case is a producer writing items while one or more consumers drain the queue:
import asyncio

async def producer(q: asyncio.Queue[int | None], count: int) -> None:
    for i in range(count):
        await q.put(i)
        print(f"Produced: {i}")
    await q.put(None)  # sentinel to signal shutdown

async def consumer(q: asyncio.Queue[int | None], name: str) -> None:
    while True:
        item = await q.get()
        if item is None:
            # Only the consumer that receives the single sentinel exits here
            q.task_done()
            break
        await asyncio.sleep(0.05)  # simulate processing
        print(f"{name} consumed: {item}")
        q.task_done()

async def main():
    q: asyncio.Queue[int | None] = asyncio.Queue(maxsize=5)
    consumers = [
        asyncio.create_task(consumer(q, f"worker-{i}"))
        for i in range(2)
    ]
    await producer(q, 8)
    # Wait until all items have been processed
    await q.join()
    # The other consumer is still waiting in get(), so cancel what remains
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    print("Done")

asyncio.run(main())
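A common variation, sketched here under the assumption that None never occurs as a real work item, is to enqueue one sentinel per consumer so that every worker exits on its own and no cancellation is needed. The STOP constant, the results list, and the doubling step are illustrative choices, not part of asyncio.

```python
import asyncio

STOP = None  # hypothetical sentinel; assumes None is never a real item

async def consumer(q: asyncio.Queue, results: list[int]) -> None:
    while True:
        item = await q.get()
        if item is STOP:
            break  # each worker consumes exactly one sentinel, then exits
        results.append(item * 2)  # stand-in for real processing

async def main() -> list[int]:
    q: asyncio.Queue = asyncio.Queue()
    results: list[int] = []
    n_workers = 3
    workers = [
        asyncio.create_task(consumer(q, results))
        for _ in range(n_workers)
    ]
    for i in range(6):
        await q.put(i)
    # FIFO order guarantees the sentinels arrive after all real items
    for _ in range(n_workers):
        await q.put(STOP)
    await asyncio.gather(*workers)
    print(sorted(results))
    return sorted(results)

asyncio.run(main())
# [0, 2, 4, 6, 8, 10]
```

Because every worker returns normally, awaiting the worker tasks is enough and join() with task_done() bookkeeping can be dropped in this variant.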
Shutting Down Gracefully (Python 3.13+)
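Python 3.13 added Queue.shutdown(immediate=False) as a cleaner alternative to sentinel values. After shutdown() is called, put() raises asyncio.QueueShutDown, and get() raises it as soon as the queue is empty: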
import asyncio

async def producer(q: asyncio.Queue[int]) -> None:
    try:
        for i in range(5):
            await q.put(i)
    except asyncio.QueueShutDown:
        # Raised by put() if another task shut the queue down first
        print("Queue closed before we finished")
    else:
        # No more items: get() raises QueueShutDown once the queue drains
        q.shutdown()

async def consumer(q: asyncio.Queue[int]) -> None:
    try:
        while True:
            item = await q.get()
            print(f"Got: {item}")
            q.task_done()
    except asyncio.QueueShutDown:
        print("Queue shut down; exiting")

async def main():
    q: asyncio.Queue[int] = asyncio.Queue()
    await asyncio.gather(producer(q), consumer(q))

asyncio.run(main())
On older Python versions, use a sentinel value (None) as shown in the earlier example.
Priority and LIFO Queues
asyncio.PriorityQueue retrieves the lowest-valued entry first. Entries are typically (priority, data) tuples, so a smaller priority number is served earlier:
import asyncio

async def main():
    q: asyncio.PriorityQueue[tuple[int, str]] = asyncio.PriorityQueue()
    await q.put((2, "medium"))
    await q.put((1, "high"))
    await q.put((3, "low"))
    item = await q.get()
    print(item)  # (1, 'high')
    item = await q.get()
    print(item)  # (2, 'medium')

asyncio.run(main())
asyncio.LifoQueue works like a stack — the most recently put item is retrieved first.
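A short sketch of that stack-like ordering, using three placeholder strings as items:

```python
import asyncio

async def main() -> list[str]:
    stack: asyncio.LifoQueue[str] = asyncio.LifoQueue()
    for name in ("first", "second", "third"):
        await stack.put(name)
    # Items come back in reverse insertion order
    order = [await stack.get() for _ in range(3)]
    print(order)
    return order

asyncio.run(main())
# ['third', 'second', 'first']
```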
Common Pitfalls
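gather propagates by default. With return_exceptions=False, the first exception is raised to the caller, but the other awaitables are not cancelled; they keep running in the background and their results or errors go unobserved. Use return_exceptions=True when partial failure is acceptable.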
wait does not give you results. Inspect done and call .result() on each Task. Forgetting this is a common mistake.
wait with timeout does not cancel. When the timeout fires, pending tasks keep running. Cancel them explicitly if you need to.
Queue.get() has no timeout. Wrap it with asyncio.wait_for() when you need one:
import asyncio

async def main():
    q: asyncio.Queue[str] = asyncio.Queue()
    try:
        item = await asyncio.wait_for(q.get(), timeout=1.0)
        print(item)
    except asyncio.TimeoutError:
        print("Queue empty: timed out")

asyncio.run(main())
join() requires matching task_done() calls. Every item retrieved with get() must eventually be followed by one task_done() call. With too few calls, join() waits forever; with more calls than items were put, task_done() raises ValueError.
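Both failure modes can be observed in a few lines. This is a minimal sketch in which the join() call is bounded with asyncio.wait_for() so the demo cannot hang; the too_few and too_many variable names are illustrative.

```python
import asyncio

async def main() -> tuple[str, str]:
    q: asyncio.Queue[int] = asyncio.Queue()
    await q.put(1)
    await q.get()

    # Too few task_done() calls: join() never returns, so bound the wait
    try:
        await asyncio.wait_for(q.join(), timeout=0.1)
        too_few = "join returned"
    except asyncio.TimeoutError:
        too_few = "join timed out"

    q.task_done()   # now matches the single put()
    await q.join()  # returns immediately

    # Too many task_done() calls: task_done() itself raises ValueError
    try:
        q.task_done()
        too_many = "no error"
    except ValueError:
        too_many = "ValueError"

    print(too_few, too_many)
    return too_few, too_many

asyncio.run(main())
# join timed out ValueError
```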
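asyncio.Queue is not thread-safe. It is designed for coroutine-to-coroutine communication within one event loop. To pass data between threads, use queue.Queue; to hand data from another thread to code running in the loop, schedule the queue operation with loop.call_soon_threadsafe() or asyncio.run_coroutine_threadsafe().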
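One way to feed an asyncio.Queue from a plain thread is to ask the event loop to perform the put on the thread's behalf. The sketch below is one possible arrangement, not the only one: the worker() function and the None sentinel are assumptions made for the demo, and the queue is left unbounded so that put_nowait() cannot raise QueueFull.

```python
import asyncio
import threading

def worker(loop: asyncio.AbstractEventLoop, q: asyncio.Queue) -> None:
    # Runs in a plain thread, so it must not touch the queue directly;
    # call_soon_threadsafe() schedules the put inside the event loop.
    for i in range(3):
        loop.call_soon_threadsafe(q.put_nowait, i)
    loop.call_soon_threadsafe(q.put_nowait, None)  # sentinel: no more items

async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    q: asyncio.Queue = asyncio.Queue()  # unbounded on purpose
    thread = threading.Thread(target=worker, args=(loop, q))
    thread.start()

    received: list[int] = []
    while (item := await q.get()) is not None:
        received.append(item)
    thread.join()  # the worker has already scheduled its last callback
    print(received)
    return received

asyncio.run(main())
# [0, 1, 2]
```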
See Also
- The async/await patterns tutorial covers coroutines and tasks from the ground up.
- The asyncio module reference documents all asyncio primitives in detail.
- The testing async code tutorial shows how to test async functions with pytest.