
asyncio Basics: Event Loops and Coroutines

This guide covers the two core asyncio concepts: the event loop and coroutines. asyncio is Python’s standard-library package for writing concurrent code with the async/await syntax. It lets a single thread handle thousands of I/O-bound operations, such as network requests and database queries, without one thread per operation or complex lock management.

By the end, you’ll understand how to define, run, and compose async functions.

What Is an Event Loop?

The event loop is the engine that drives async Python. It’s a single-threaded scheduler that runs your code, pauses it at await points, and switches to other ready tasks.

Think of it like a while loop:

# Simplified mental model
while tasks_not_done:
    # Find the next task that's ready to run
    task = get_next_ready_task()
    # Run it until it hits an await
    result = task.run_until_await()
    # Store the result and move on
    store_result(result)

The key insight: when you await something slow (like asyncio.sleep() or an HTTP request), the event loop doesn’t sit idle. It picks up other ready work and comes back to your task when the result is available.
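The mental model above can be turned into a toy round-robin scheduler. This is only a sketch: it uses plain generators in place of coroutines and yield in place of await, the names worker, run_all, and log are made up for illustration, and the real asyncio loop does much more (timers, I/O readiness, callbacks).

```python
from collections import deque

def worker(name, steps, log):
    # Each `yield` plays the role of an await point: the task pauses here.
    for i in range(steps):
        log.append(f"{name} step {i}")
        yield
    log.append(f"{name} done")

def run_all(tasks):
    ready = deque(tasks)
    while ready:
        task = ready.popleft()      # find the next ready task
        try:
            next(task)              # run it until its next pause point
        except StopIteration:
            continue                # task finished; drop it
        ready.append(task)          # not finished; queue it again

log = []
run_all([worker("A", 2, log), worker("B", 1, log)])
print(log)
# ['A step 0', 'B step 0', 'A step 1', 'B done', 'A done']
```

The interleaved log shows the scheduler switching tasks at every pause point rather than running A to completion first.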

Here’s a concrete example. Two tasks start at the same time, but one finishes faster:

import asyncio

async def task_a():
    print("Task A: starting")
    await asyncio.sleep(2)
    print("Task A: done")

async def task_b():
    print("Task B: starting")
    await asyncio.sleep(1)
    print("Task B: done")

async def main():
    await asyncio.gather(task_a(), task_b())

asyncio.run(main())

# Output:
# Task A: starting
# Task B: starting
# Task B: done     <-- B finishes first (1 second)
# Task A: done    <-- A finishes second (2 seconds)

Both tasks run concurrently. Whenever one suspends at an await, the event loop switches to the other. Task B finishes after about 1 second and task A after about 2, so the whole program takes about 2 seconds rather than the 3 seconds a sequential run would need.
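One way to check the timing claim is to measure it. The sketch below uses shorter sleeps (0.3 and 0.2 seconds) so it finishes quickly; nap and timed_gather are illustrative names, not part of asyncio.

```python
import asyncio
import time

async def nap(seconds):
    await asyncio.sleep(seconds)
    return seconds

async def timed_gather():
    start = time.perf_counter()
    await asyncio.gather(nap(0.3), nap(0.2))
    return time.perf_counter() - start

elapsed = asyncio.run(timed_gather())
# Expect roughly 0.3s (the longest sleep), not 0.5s (the sum).
print(f"elapsed: {elapsed:.2f}s")
```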

Defining Coroutines

A coroutine is created by calling a function declared with async def. It can pause execution at await points and resume later.

async def fetch_data(url):
    # fetch_data is a coroutine function; some_async_http_get is a
    # placeholder for any awaitable HTTP call
    response = await some_async_http_get(url)
    return response

Calling a coroutine function doesn’t run its body. It returns a coroutine object, a ticket you hand to the event loop:

async def greet():
    return "Hello!"

# Calling without await returns a coroutine object
coro = greet()
print(type(coro))  # <class 'coroutine'>

The coroutine actually runs when you either:

  1. Pass it to asyncio.run()
  2. await it inside another coroutine
  3. Wrap it in a task with asyncio.create_task()
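The three options in the list can be seen side by side in one small program. This is a minimal sketch; greet and main are illustrative names.

```python
import asyncio

async def greet(name):
    return f"Hello, {name}!"

async def main():
    # Option 2: await the coroutine directly inside another coroutine.
    direct = await greet("await")
    # Option 3: wrap it in a Task; awaiting the task yields its result.
    task = asyncio.create_task(greet("task"))
    via_task = await task
    return direct, via_task

# Option 1: pass the top-level coroutine to asyncio.run().
results = asyncio.run(main())
print(results)  # ('Hello, await!', 'Hello, task!')
```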

Running Coroutines with asyncio.run()

asyncio.run() is the entry point for most async programs. It creates an event loop, runs your coroutine, and closes the loop when done:

import asyncio

async def greet(name):
    await asyncio.sleep(0.5)
    return f"Hello, {name}"

async def main():
    result = await greet("world")
    print(result)

asyncio.run(main())
# Output: Hello, world

Use asyncio.run() only at the top level of your program. Nested calls raise RuntimeError:

async def nested():
    # This raises RuntimeError — event loop already running
    asyncio.run(other_coro())

async def main():
    await nested()

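Inside a coroutine, await the other coroutine directly instead. loop.run_until_complete() is not a workaround, because it also raises RuntimeError when the loop is already running. To submit a coroutine to a running loop from synchronous code in a different thread, use asyncio.run_coroutine_threadsafe().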
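The sketch below contrasts a nested asyncio.run() call with a plain await inside a coroutine. The names other_coro, nested_wrong, and nested_right are made up for this example.

```python
import asyncio

async def other_coro():
    return 42

async def nested_wrong():
    coro = other_coro()
    try:
        asyncio.run(coro)        # a loop is already running in this thread
    except RuntimeError as exc:
        coro.close()             # discard the never-started coroutine cleanly
        return f"RuntimeError: {exc}"

async def nested_right():
    return await other_coro()    # inside a coroutine, just await

async def main():
    return await nested_wrong(), await nested_right()

wrong, right = asyncio.run(main())
print(wrong)
print(right)  # 42
```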

Creating Tasks for Concurrent Execution

asyncio.create_task() wraps a coroutine in a Task object and schedules it to run on the event loop:

import asyncio

async def fetch_user(user_id):
    await asyncio.sleep(0.5)
    return f"user_{user_id}"

async def main():
    # Schedule both tasks immediately — they run concurrently
    task1 = asyncio.create_task(fetch_user(1))
    task2 = asyncio.create_task(fetch_user(2))

    # Both run concurrently; gather waits for both
    results = await asyncio.gather(task1, task2)
    print(results)  # ['user_1', 'user_2']

asyncio.run(main())

create_task() returns immediately; it doesn’t wait for the coroutine to finish. The task gets its first chance to run the next time the current coroutine yields control at an await, and from then on it proceeds concurrently. When you await the task, you get its result.
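A quick way to observe this is to inspect the task right after creating it. In this sketch, slow_square is an illustrative coroutine; Task.done() is the standard method for checking completion.

```python
import asyncio

async def slow_square(n):
    await asyncio.sleep(0.1)
    return n * n

async def main():
    task = asyncio.create_task(slow_square(4))
    done_at_creation = task.done()   # False: only scheduled so far
    result = await task              # suspend main() until the task finishes
    return done_at_creation, task.done(), result

state = asyncio.run(main())
print(state)  # (False, True, 16)
```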

The event loop keeps only weak references to tasks. If you drop every strong reference to a task before it completes, Python’s garbage collector can collect it mid-execution. Always keep task references until you’re done with them:

# Safe — references kept
tasks = [asyncio.create_task(coro()) for coro in coros]
await asyncio.gather(*tasks)

# Dangerous — tasks can disappear mid-run
[asyncio.create_task(coro()) for coro in coros]
await asyncio.sleep(1)  # Some tasks may have been GC'd
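For fire-and-forget tasks that are never awaited individually, one common pattern is to hold them in a set and remove each one when it finishes. The sketch below assumes a module-level set named background_tasks and an illustrative job coroutine.

```python
import asyncio

background_tasks = set()

async def job(n, results):
    await asyncio.sleep(0.05)
    results.append(n)

async def main():
    results = []
    for n in range(3):
        task = asyncio.create_task(job(n, results))
        background_tasks.add(task)                        # strong reference
        task.add_done_callback(background_tasks.discard)  # drop it when done
    await asyncio.sleep(0.2)  # stand-in for other work
    return results

results = asyncio.run(main())
print(sorted(results))        # [0, 1, 2]
print(len(background_tasks))  # 0
```

The set keeps every task alive while it runs, and the done callback stops the set from growing without bound.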

Gathering Results from Multiple Coroutines

asyncio.gather() runs multiple awaitables concurrently and returns their results as a list in the order the awaitables were passed, not the order they finished:

import asyncio

async def get_stock(symbol):
    await asyncio.sleep(0.3)
    return f"{symbol}: $150"

async def main():
    results = await asyncio.gather(
        get_stock("AAPL"),
        get_stock("GOOG"),
        get_stock("MSFT"),
    )
    for r in results:
        print(r)

asyncio.run(main())
# Output:
# AAPL: $150
# GOOG: $150
# MSFT: $150

If one awaitable raises an exception, gather propagates the first exception to the caller by default. The other awaitables are not cancelled and keep running. Use return_exceptions=True to collect exceptions as results instead:

import asyncio

async def might_fail():
    raise ValueError("something went wrong")

async def main():
    # Exceptions come back as results, not raised
    results = await asyncio.gather(
        might_fail(),
        asyncio.sleep(1),
        return_exceptions=True,
    )
    print(results)
    # Output: [ValueError('something went wrong'), None]

asyncio.run(main())
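For the default case (no return_exceptions), the sketch below checks what happens to the remaining awaitables after one of them fails. The names fails_fast, slow_ok, and finished are illustrative.

```python
import asyncio

finished = []

async def fails_fast():
    await asyncio.sleep(0.05)
    raise ValueError("something went wrong")

async def slow_ok():
    await asyncio.sleep(0.15)
    finished.append("slow_ok")

async def main():
    try:
        await asyncio.gather(fails_fast(), slow_ok())
    except ValueError as exc:
        caught = str(exc)
    # gather() has already raised, but slow_ok() is still running.
    await asyncio.sleep(0.2)
    return caught

caught = asyncio.run(main())
print(caught)    # something went wrong
print(finished)  # ['slow_ok']
```

slow_ok still completes after gather has raised, which shows that gather does not cancel the other awaitables on its own.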

Non-Blocking Sleep

asyncio.sleep() suspends the current coroutine without blocking the event loop:

import asyncio

async def show_progress():
    print("Step 1...")
    await asyncio.sleep(1)
    print("Step 2...")
    await asyncio.sleep(1)
    print("Done!")

async def main():
    # Takes about 2 seconds in total; each sleep lets other tasks run
    await show_progress()

asyncio.run(main())

The critical mistake: using time.sleep() inside an async function. That blocks the entire event loop:

import asyncio
import time

async def broken():
    time.sleep(1)  # Blocks the whole thread: no other task runs for 1 second
    await asyncio.sleep(1)  # Only now can other tasks run

# Always use asyncio.sleep() inside async functions
async def correct():
    await asyncio.sleep(1)  # Yields control; other tasks run
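The difference shows up clearly when two copies of each function run under gather. This is a small timing sketch with 0.1-second sleeps; blocking_nap, cooperative_nap, and time_two are illustrative names.

```python
import asyncio
import time

async def blocking_nap():
    time.sleep(0.1)            # holds the thread; the loop cannot switch tasks

async def cooperative_nap():
    await asyncio.sleep(0.1)   # hands control back to the loop

async def time_two(factory):
    start = time.perf_counter()
    await asyncio.gather(factory(), factory())
    return time.perf_counter() - start

async def main():
    return await time_two(blocking_nap), await time_two(cooperative_nap)

blocking, cooperative = asyncio.run(main())
# Expect roughly 0.2s for the blocking pair and 0.1s for the cooperative pair.
print(f"blocking: {blocking:.2f}s, cooperative: {cooperative:.2f}s")
```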

Running Blocking Code Without Blocking the Event Loop

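Some code is inherently blocking: legacy libraries and other synchronous I/O calls. asyncio.to_thread() (Python 3.9+) offloads such a call to a worker thread so the event loop stays free. Because of the GIL, threads typically help only with blocking I/O; CPU-heavy work is usually better served by a process pool.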

import asyncio
import time

def blocking_db_query():
    # Simulates a slow database call
    time.sleep(2)
    return "query results"

async def main():
    print("Starting async work...")

    # Run blocking code in a thread — event loop stays responsive
    result = await asyncio.to_thread(blocking_db_query)
    print(f"Got: {result}")

asyncio.run(main())

The event loop can handle other tasks while the thread runs the blocking operation. For Python 3.7-3.8 (before to_thread existed), use loop.run_in_executor(None, func, *args) instead.
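A minimal sketch of the run_in_executor() fallback follows. It runs two blocking calls at once to show that they overlap; the label parameter is added here purely for illustration.

```python
import asyncio
import time

def blocking_db_query(label):
    time.sleep(0.2)  # simulates a slow synchronous call
    return f"{label}: query results"

async def main():
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Passing None selects the loop's default thread pool executor.
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_db_query, "a"),
        loop.run_in_executor(None, blocking_db_query, "b"),
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results)  # ['a: query results', 'b: query results']
print(f"elapsed: {elapsed:.2f}s")
```

Both calls sleep in separate worker threads, so the total is close to 0.2 seconds rather than 0.4.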

Checking the Current Task and Loop

Sometimes you need information about the running context:

import asyncio

async def inspect_context():
    # Get the currently executing Task
    current = asyncio.current_task()
    print(f"Running task: {current}")

    # Get all active tasks
    all_tasks = asyncio.all_tasks()
    print(f"All active tasks: {len(all_tasks)}")

    # Get the running event loop
    loop = asyncio.get_running_loop()
    print(f"Running loop: {loop}")

async def main():
    await inspect_context()

asyncio.run(main())

get_running_loop() raises RuntimeError when no event loop is running in the current thread, so call it from a coroutine or callback that asyncio.run() has started.
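The RuntimeError can double as a check for whether code is currently executing under an event loop. In this sketch, loop_is_running is a hypothetical helper, not an asyncio function.

```python
import asyncio

def loop_is_running():
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False   # no loop is running in this thread
    return True

async def main():
    return loop_is_running()

outside = loop_is_running()   # called from plain synchronous code
inside = asyncio.run(main())  # called while the loop is running
print(outside, inside)  # False True
```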

Common Mistakes

Forgetting to await — the coroutine never runs:

async def greet():
    print("Hello!")

greet()        # Returns a coroutine object; nothing prints
await greet()  # Runs the coroutine (valid only inside another async function)

Using time.sleep() instead of asyncio.sleep() — blocks the event loop and freezes your entire async program.

Calling asyncio.run() from inside a coroutine raises RuntimeError, because an event loop is already running in that thread. Use await for nested coroutine calls instead.

Not keeping references to tasks lets the garbage collector destroy a pending task before it finishes, so its work silently never completes.
