pyguides

Async Patterns: Gather, Wait, and Queues

Once you are comfortable with async and await, the next step is learning how to coordinate multiple concurrent tasks. Three tools sit at the center of that work: asyncio.gather(), asyncio.wait(), and asyncio.Queue. Each serves a distinct purpose.

When to Use Each Pattern

asyncio.gather() is the tool to reach for when you want to run several coroutines concurrently and collect all their results at once. asyncio.wait() gives you finer control — you can watch a group of tasks and react as soon as the first one finishes, or when any one raises an error. asyncio.Queue shines in producer-consumer workflows where tasks need to pass data through a shared channel.

Running Tasks Concurrently with gather()

asyncio.gather() accepts any number of awaitables as positional arguments and returns a list of results in the order you passed them, regardless of which one finishes first. By default, the first exception raised by any awaitable propagates immediately to the code awaiting gather(). The other awaitables are not cancelled; they keep running in the background.

import asyncio

async def fetch(url: str) -> str:
    await asyncio.sleep(0.1)  # simulate network I/O
    return f"result from {url}"

async def main():
    results = await asyncio.gather(
        fetch("a"),
        fetch("b"),
        fetch("c"),
    )
    print(results)
    # ['result from a', 'result from b', 'result from c']

asyncio.run(main())

Pass return_exceptions=True and gather() collects every outcome — results and exceptions alike — without canceling anything:

import asyncio

async def risky(name: str) -> str:
    if name == "B":
        raise ValueError(f"{name} failed")
    await asyncio.sleep(0.1)
    return f"{name} OK"

async def main():
    results = await asyncio.gather(
        risky("A"),
        risky("B"),
        risky("C"),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"Result: {r}")

# output
# Result: A OK
# Error: B failed
# Result: C OK

asyncio.run(main())

This makes gather() with return_exceptions=True a safe default for fan-out workloads where you want to know about every outcome, including the failures.
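
One property of the default mode (return_exceptions=False) is worth checking for yourself: when gather() raises, the remaining awaitables are not cancelled. The following is a minimal sketch rather than a recommended pattern; slow_ok, fail_fast, and the finished list are hypothetical names used only for illustration.

```python
import asyncio

finished: list[str] = []  # hypothetical log of coroutines that ran to completion

async def slow_ok(name: str) -> str:
    await asyncio.sleep(0.1)
    finished.append(name)
    return name

async def fail_fast() -> None:
    await asyncio.sleep(0.01)
    raise ValueError("boom")

async def main() -> list[str]:
    finished.clear()
    try:
        # Default mode: the first exception propagates to this await
        await asyncio.gather(slow_ok("A"), fail_fast(), slow_ok("C"))
    except ValueError as exc:
        print(f"gather raised: {exc}")
    # The siblings were not cancelled; give them time to finish
    await asyncio.sleep(0.2)
    print(sorted(finished))
    return sorted(finished)

asyncio.run(main())
```

Both slow coroutines still complete after the exception has been handled, so any cleanup of leftover work is up to the caller.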

Watching Tasks with wait()

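asyncio.wait() takes an iterable of Tasks or Futures and returns a tuple of two sets: (done, pending). Passing bare coroutine objects was deprecated in Python 3.8 and is an error since 3.11, so wrap coroutines with asyncio.create_task() first. The return_when parameter controls when wait() resolves.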

return_when value          Resolves when
ALL_COMPLETED (default)    All tasks finish
FIRST_COMPLETED            Any task finishes first
FIRST_EXCEPTION            Any task raises (or all complete without error)

Critically, wait() does not return results. You must call .result() on each Task in the done set to extract the value.

import asyncio

async def task(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} finished (delay={delay})"

async def main():
    tasks = [
        asyncio.create_task(task("fast", 0.1)),
        asyncio.create_task(task("medium", 0.5)),
        asyncio.create_task(task("slow", 1.0)),
    ]

    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED,
    )

    for d in done:
        print(d.result())

    # Cancel tasks still running
    for p in pending:
        p.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(main())
# fast finished (delay=0.1)
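
The FIRST_EXCEPTION mode follows the same shape. The sketch below is illustrative only; ok and boom are hypothetical coroutines, and Task.exception() is used to inspect a finished task without re-raising its error.

```python
import asyncio

async def ok(delay: float) -> str:
    await asyncio.sleep(delay)
    return "ok"

async def boom(delay: float) -> None:
    await asyncio.sleep(delay)
    raise RuntimeError("boom")

async def main() -> tuple[int, int]:
    tasks = [
        asyncio.create_task(ok(0.5)),
        asyncio.create_task(boom(0.05)),
    ]
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_EXCEPTION,
    )

    for t in done:
        # exception() returns None for a task that succeeded
        if t.exception() is not None:
            print(f"failed: {t.exception()!r}")

    # The slower task is still pending; cancel it and wait for the cancellation
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)

asyncio.run(main())
```

Here wait() returns as soon as boom fails, with one task in done and the slower one still in pending.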

wait() also accepts a timeout in seconds. When the timeout fires, the done set contains whatever tasks had already finished, and pending still holds the rest. The pending tasks keep running — you must cancel them explicitly if you no longer need their results.

import asyncio

async def long_task():
    await asyncio.sleep(10)
    return "done"

async def main():
    task = asyncio.create_task(long_task())

    done, pending = await asyncio.wait(
        {task},
        timeout=0.5,
    )

    print(f"Finished in time: {len(done) == 1}")  # False — task still running
    if pending:
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)

asyncio.run(main())

Passing Data with asyncio.Queue

asyncio.Queue is an async queue that is not thread-safe; it is not the same as queue.Queue from the standard library. It exists solely to coordinate coroutines within a single event loop.

Basic Operations

import asyncio

async def main():
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=3)

    await q.put(1)
    await q.put(2)
    print(q.qsize())  # 2

    item = await q.get()
    print(item)  # 1
    print(q.qsize())  # 1

asyncio.run(main())

put() suspends the calling coroutine while the queue is at maxsize, and get() suspends it while the queue is empty. Use put_nowait() and get_nowait() to fail immediately instead: they raise asyncio.QueueFull and asyncio.QueueEmpty respectively.
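
As a small illustration of the non-waiting variants, the sketch below fills a queue of size 1 and records what happens at each step. The events list is a hypothetical helper used only to make the outcome visible.

```python
import asyncio

async def main() -> list[str]:
    events: list[str] = []
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=1)

    q.put_nowait(1)  # fits: the queue was empty
    try:
        q.put_nowait(2)  # queue is already at maxsize
    except asyncio.QueueFull:
        events.append("full")

    events.append(f"got {q.get_nowait()}")
    try:
        q.get_nowait()  # nothing left to take
    except asyncio.QueueEmpty:
        events.append("empty")

    print(events)  # ['full', 'got 1', 'empty']
    return events

asyncio.run(main())
```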

Producer-Consumer Pattern

The canonical use case is a producer writing items while one or more consumers drain the queue:

import asyncio

async def producer(q: asyncio.Queue[int | None], count: int) -> None:
    for i in range(count):
        await q.put(i)
        print(f"Produced: {i}")
    await q.put(None)  # sentinel: stops whichever consumer receives it

async def consumer(q: asyncio.Queue[int | None], name: str) -> None:
    while True:
        item = await q.get()
        if item is None:
            q.task_done()  # the sentinel counts toward join() as well
            break
        await asyncio.sleep(0.05)  # simulate processing
        print(f"{name} consumed: {item}")
        q.task_done()

async def main():
    q: asyncio.Queue[int | None] = asyncio.Queue(maxsize=5)

    consumers = [
        asyncio.create_task(consumer(q, f"worker-{i}"))
        for i in range(2)
    ]

    await producer(q, 8)

    # Wait until all items have been processed
    await q.join()

    # Only one consumer received the sentinel; cancel any still waiting on get()
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)

    print("Done")

asyncio.run(main())
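
A common variation, sketched here under the assumption that the number of consumers is known up front, is to enqueue one sentinel per consumer so that every worker exits on its own and no cancellation is needed. The names worker, STOP, and n_workers are hypothetical.

```python
import asyncio

STOP = None  # sentinel value, same convention as the example above

async def worker(q: asyncio.Queue, results: list) -> None:
    while True:
        item = await q.get()
        if item is STOP:
            break  # each worker consumes exactly one sentinel, then exits
        results.append(item * 2)

async def main() -> list[int]:
    q: asyncio.Queue = asyncio.Queue(maxsize=5)
    results: list[int] = []
    n_workers = 2
    workers = [
        asyncio.create_task(worker(q, results))
        for _ in range(n_workers)
    ]

    for i in range(8):
        await q.put(i)
    for _ in range(n_workers):
        await q.put(STOP)  # one sentinel per worker

    # Every worker returns normally, so a plain gather() is enough
    await asyncio.gather(*workers)
    print(sorted(results))
    return sorted(results)

asyncio.run(main())
```

Because the workers finish by themselves, this version needs neither join() nor cancel(), at the cost of the producer side having to know how many workers exist.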

Shutting Down Gracefully (Python 3.13+)

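Python 3.13 added Queue.shutdown(immediate=False) as a cleaner alternative to sentinel values. After shutdown() is called, put() raises asyncio.QueueShutDown. With the default immediate=False, get() keeps returning items that are already queued and raises asyncio.QueueShutDown only once the queue is empty: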

import asyncio

async def producer(q: asyncio.Queue[int]) -> None:
    try:
        for i in range(5):
            await q.put(i)
    except asyncio.QueueShutDown:
        print("Queue closed before we finished")
    else:
        # No more items: consumers drain what is queued, then get() raises
        q.shutdown()

async def consumer(q: asyncio.Queue[int]) -> None:
    try:
        while True:
            item = await q.get()
            print(f"Got: {item}")
            q.task_done()
    except asyncio.QueueShutDown:
        print("Queue shut down — exiting")

async def main():
    q: asyncio.Queue[int] = asyncio.Queue()
    await asyncio.gather(
        asyncio.create_task(producer(q)),
        asyncio.create_task(consumer(q)),
    )

asyncio.run(main())

On older Python versions, use a sentinel value (None) as shown in the earlier example.

Priority and LIFO Queues

asyncio.PriorityQueue returns the lowest-valued entry first. Entries are typically (priority, data) tuples, so a smaller priority number is retrieved sooner:

import asyncio

async def main():
    q: asyncio.PriorityQueue[tuple[int, str]] = asyncio.PriorityQueue()

    await q.put((2, "medium"))
    await q.put((1, "high"))
    await q.put((3, "low"))

    item = await q.get()
    print(item)  # (1, 'high')
    item = await q.get()
    print(item)  # (2, 'medium')

asyncio.run(main())

asyncio.LifoQueue works like a stack — the most recently put item is retrieved first.
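
The stack behavior can be seen in a short sketch that puts three illustrative strings and reads them back:

```python
import asyncio

async def main() -> list[str]:
    q: asyncio.LifoQueue[str] = asyncio.LifoQueue()

    for name in ("first", "second", "third"):
        await q.put(name)

    # Items come back in reverse insertion order
    order = [await q.get() for _ in range(3)]
    print(order)  # ['third', 'second', 'first']
    return order

asyncio.run(main())
```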

Common Pitfalls

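gather propagates by default. With return_exceptions=False, the first exception is raised to the caller right away, but the other tasks are not cancelled and keep running unobserved. Use return_exceptions=True when partial failure is acceptable, or cancel the leftovers yourself. If you want siblings cancelled on the first failure, asyncio.TaskGroup (Python 3.11+) does that.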

wait does not give you results. Inspect done and call .result() on each Task. Forgetting this is a common mistake.

wait with timeout does not cancel. When the timeout fires, pending tasks keep running. Cancel them explicitly if you need to.

Queue.get() has no timeout. Wrap it with asyncio.wait_for() when you need one:

import asyncio

async def main():
    q: asyncio.Queue[str] = asyncio.Queue()

    try:
        item = await asyncio.wait_for(q.get(), timeout=1.0)
        print(item)
    except asyncio.TimeoutError:
        print("Queue empty — timed out")

asyncio.run(main())

join() requires matching task_done() calls. Every item taken with get() must eventually be acknowledged with task_done(). If there are too few calls, join() never returns. If there are too many, task_done() raises a ValueError.
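
The sketch below exercises the bookkeeping between join() and task_done() in three steps: a balanced case, one task_done() call too many, and one too few. The timeout around join() exists only so that the demonstration terminates; the variable names are illustrative.

```python
import asyncio

async def main() -> tuple[bool, bool]:
    q: asyncio.Queue[str] = asyncio.Queue()

    # Balanced: one put(), one get(), one task_done()
    await q.put("job-1")
    await q.get()
    q.task_done()
    await q.join()  # returns immediately, nothing is outstanding

    # One task_done() too many raises ValueError
    try:
        q.task_done()
        extra_call_raised = False
    except ValueError:
        extra_call_raised = True

    # One task_done() too few: join() does not complete on its own
    await q.put("job-2")
    await q.get()
    try:
        await asyncio.wait_for(q.join(), timeout=0.1)
        join_timed_out = False
    except asyncio.TimeoutError:
        join_timed_out = True

    print(extra_call_raised, join_timed_out)  # True True
    return extra_call_raised, join_timed_out

asyncio.run(main())
```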

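asyncio.Queue is not thread-safe. It is designed for coroutine-to-coroutine communication within one event loop. For communication between threads, use queue.Queue. To feed an asyncio.Queue from another thread, schedule the call on the loop with loop.call_soon_threadsafe() or asyncio.run_coroutine_threadsafe() instead of touching the queue directly.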
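
One way to hand items from a plain thread to coroutines is loop.call_soon_threadsafe(), which schedules a callback to run inside the event loop's own thread. The following is a minimal sketch under the assumption of an unbounded queue (so put_nowait() cannot raise QueueFull); worker_thread is a hypothetical function name.

```python
import asyncio
import threading

def worker_thread(loop: asyncio.AbstractEventLoop, q: asyncio.Queue) -> None:
    # Runs in a separate thread: never call q.put_nowait() directly from here.
    for i in range(3):
        loop.call_soon_threadsafe(q.put_nowait, i)
    loop.call_soon_threadsafe(q.put_nowait, None)  # sentinel: no more items

async def main() -> list[int]:
    q: asyncio.Queue = asyncio.Queue()  # unbounded, so put_nowait() always fits
    loop = asyncio.get_running_loop()

    t = threading.Thread(target=worker_thread, args=(loop, q))
    t.start()

    received: list[int] = []
    while (item := await q.get()) is not None:
        received.append(item)

    t.join()  # the thread has already scheduled everything by now
    print(received)  # [0, 1, 2]
    return received

asyncio.run(main())
```

The queue itself is only ever touched from the event loop thread; the worker merely asks the loop to perform the put on its behalf.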
