Thread-Safe Queues with queue.Queue

4 min read · Updated March 15, 2026 · intermediate

The queue module provides thread-safe queue implementations for Python. When multiple threads need to share data, using a queue prevents race conditions and makes coordination between threads straightforward. This guide shows you how to use Queue, LifoQueue, and PriorityQueue effectively.

Why Use Queues?

When threads communicate by reading and writing shared variables, two of them can modify the same data at once and leave it in an inconsistent state. A queue acts as a buffer between them: one thread puts data in, another takes it out, and the queue handles the locking internally.

Common use cases include:

  • Producer-consumer patterns: One thread produces data, another consumes it
  • Task distribution: A main thread hands work to worker threads
  • Result collection: Worker threads report results back to a coordinator

Basic Queue Operations

The Queue class implements a FIFO (first-in, first-out) queue:

import queue
import threading
import time

# Create a queue with a maximum size
work_queue = queue.Queue(maxsize=10)

def producer():
    for i in range(5):
        item = f"item-{i}"
        work_queue.put(item)  # Add item to queue
        print(f"Produced: {item}")
    work_queue.put(None)  # Sentinel to signal completion

def consumer():
    while True:
        item = work_queue.get()  # Remove item from queue
        if item is None:
            break
        print(f"Consumed: {item}")
        work_queue.task_done()

# Run producer and consumer
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

print("Done")

Output (the interleaving of "Produced" and "Consumed" lines varies between runs):

Produced: item-0
Produced: item-1
Consumed: item-0
Produced: item-2
Consumed: item-1
Produced: item-3
Consumed: item-2
Produced: item-4
Consumed: item-3
Consumed: item-4
Done
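
The maxsize argument in the example above does more than cap memory use: once the queue is full, put() blocks until a consumer frees a slot, which throttles a fast producer. Here is a minimal sketch of that backpressure, assuming a deliberately small maxsize of 2 and a slow consumer simulated with time.sleep(). The names bounded, observed_sizes, fast_producer, and slow_consumer are illustrative only.

```python
import queue
import threading
import time

bounded = queue.Queue(maxsize=2)   # small limit so the producer reaches it quickly
observed_sizes = []                # queue sizes sampled by the consumer

def fast_producer():
    for i in range(6):
        bounded.put(i)             # blocks whenever two items are already waiting
    bounded.put(None)              # sentinel: no more items

def slow_consumer():
    while True:
        # qsize() is only a snapshot, but it can never exceed maxsize
        observed_sizes.append(bounded.qsize())
        item = bounded.get()
        if item is None:
            break
        time.sleep(0.01)           # simulate slow processing

producer_thread = threading.Thread(target=fast_producer)
consumer_thread = threading.Thread(target=slow_consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()

print(f"Largest queue size seen: {max(observed_sizes)}")
```

However far ahead the producer tries to run, the largest size seen stays at or below 2, because each extra put() waits for the consumer.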

Queue Methods

Queues provide several key methods:

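Method              Description
put(item)           Add an item, blocking while the queue is full
put_nowait(item)    Add an item without blocking; raises queue.Full if the queue is full
get()               Remove and return an item, blocking while the queue is empty
get_nowait()        Remove and return an item without blocking; raises queue.Empty if the queue is empty
task_done()         Signal that an item previously fetched with get() has been processed
join()              Block until every item that was put on the queue has been marked with task_done()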

The join() and task_done() methods work together to let you wait for all items to be processed:

import queue
import threading

q = queue.Queue()

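def worker():
    while True:
        item = q.get()
        if item is None:
            # Sentinels count as queued items too; without this call,
            # q.join() below would block forever
            q.task_done()
            break
        print(f"Processing {item}")
        q.task_done()  # Signal this item is done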

# Start worker threads
threads = []
for _ in range(3):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

# Add items
for item in ["a", "b", "c", "d", "e", None, None, None]:
    q.put(item)

# Wait for all items to be processed
q.join()

# Shut down workers
for t in threads:
    t.join()

print("All work complete")
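
When workers need no orderly shutdown, an alternative to sentinels is to run them as daemon threads and rely on join() alone. The sketch below assumes the workers hold no resources that need cleanup, since daemon threads are stopped abruptly when the program exits. The names jobs, processed, and daemon_worker are illustrative.

```python
import queue
import threading

jobs = queue.Queue()
processed = []

def daemon_worker():
    while True:
        item = jobs.get()
        try:
            processed.append(item.upper())
        finally:
            # Always mark the item done, even if processing raised,
            # so that jobs.join() cannot hang on a failed item
            jobs.task_done()

# Daemon threads do not keep the interpreter alive after the main thread ends
for _ in range(3):
    threading.Thread(target=daemon_worker, daemon=True).start()

for item in ["a", "b", "c", "d", "e"]:
    jobs.put(item)

jobs.join()  # returns once task_done() has been called for all five items
print(sorted(processed))  # ['A', 'B', 'C', 'D', 'E']
```

The try/finally wrapper is a design choice worth keeping in the sentinel version as well: a worker that raises before calling task_done() leaves join() waiting indefinitely.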

LifoQueue: Stack Behavior

LifoQueue implements LIFO (last-in, first-out) behavior, like a stack:

import queue

stack = queue.LifoQueue()

stack.put("first")
stack.put("second")
stack.put("third")

# Items come off in reverse order
print(stack.get())  # third
print(stack.get())  # second
print(stack.get())  # first

Output:

third
second
first

LifoQueue is useful when you need to process the most recent items first, such as undo operations or depth-first traversal.
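
As an illustration of the depth-first case, here is a small sketch that walks a graph using a LifoQueue as the stack of nodes still to visit. The graph dictionary and the depth_first function are hypothetical, made up for this example. In single-threaded code like this a plain list would be cheaper; LifoQueue earns its locking cost only when several threads share the stack.

```python
import queue

# Hypothetical adjacency list used only for this illustration
graph = {
    "A": ["B", "C"],
    "B": ["D"],
    "C": ["E"],
    "D": [],
    "E": [],
}

def depth_first(start):
    pending = queue.LifoQueue()
    pending.put(start)
    visited = []
    while not pending.empty():
        node = pending.get()          # most recently added node comes out first
        if node in visited:
            continue
        visited.append(node)
        # Push neighbours in reverse so the first-listed one is explored first
        for neighbour in reversed(graph[node]):
            pending.put(neighbour)
    return visited

order = depth_first("A")
print(order)  # ['A', 'B', 'D', 'C', 'E']
```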

PriorityQueue: Ordered Processing

PriorityQueue returns items in sorted order, so the entry with the lowest priority value comes out first:

import queue

priority_q = queue.PriorityQueue()

# Add items as tuples: (priority, data)
priority_q.put((3, "low priority"))
priority_q.put((1, "high priority"))
priority_q.put((2, "medium priority"))

# Items come off in priority order
while not priority_q.empty():
    print(priority_q.get()[1])

Output:

high priority
medium priority
low priority

A common pattern is to use negative timestamps for “most recent first”:

import queue
import time

recent_q = queue.PriorityQueue()

# A later timestamp gives a more negative key, which sorts first
recent_q.put((-time.time(), "earlier message"))
time.sleep(0.1)
recent_q.put((-time.time(), "most recent message"))

print(recent_q.get()[1])  # most recent message
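
One caveat with (priority, data) tuples: when two priorities are equal, Python falls back to comparing the data, and that raises TypeError if the data is not orderable (two dicts, for example). A common workaround, sketched here along the lines suggested in the standard library documentation, is to wrap entries in a dataclass that compares on the priority alone. The class name PrioritizedItem and the sample tasks are illustrative.

```python
import queue
from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any = field(compare=False)  # excluded from ordering comparisons

jobs_by_priority = queue.PriorityQueue()
jobs_by_priority.put(PrioritizedItem(2, {"task": "write report"}))
jobs_by_priority.put(PrioritizedItem(1, {"task": "fix outage"}))
jobs_by_priority.put(PrioritizedItem(2, {"task": "update docs"}))  # ties with the first entry

drained = []
while not jobs_by_priority.empty():
    drained.append(jobs_by_priority.get())

print(drained[0].item["task"])  # fix outage
```

The relative order of the two priority-2 entries is not guaranteed. If insertion order matters among ties, one option is to add a monotonically increasing counter as a second comparison field.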

Handling Full and Empty Queues

The put() and get() methods block by default. Use put_nowait() and get_nowait() when you need non-blocking behavior:

import queue

q = queue.Queue(maxsize=2)

# This succeeds
q.put("item1")
q.put("item2")

# This would block forever:
# q.put("item3")

# Use put_nowait instead
try:
    q.put_nowait("item3")
except queue.Full:
    print("Queue is full, cannot add more")

# Same pattern for get
q2 = queue.Queue()  # Empty queue

try:
    item = q2.get_nowait()
except queue.Empty:
    print("Queue is empty, nothing to get")

Output:

Queue is full, cannot add more
Queue is empty, nothing to get
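
Between blocking forever and not blocking at all there is a middle ground: both put() and get() accept a timeout argument in seconds and raise the same queue.Full and queue.Empty exceptions once it expires. A brief sketch, with the 0.1 second timeout and the variable names chosen arbitrarily for illustration:

```python
import queue

inbox = queue.Queue()  # empty queue

try:
    inbox.get(timeout=0.1)        # wait up to 0.1 s for an item
    get_outcome = "got an item"
except queue.Empty:
    get_outcome = "timed out"

small = queue.Queue(maxsize=1)
small.put("only slot")            # the queue is now full

try:
    small.put("overflow", timeout=0.1)  # wait up to 0.1 s for a free slot
    put_outcome = "added"
except queue.Full:
    put_outcome = "timed out"

print(get_outcome, put_outcome)  # timed out timed out
```

A timeout is often a better fit than get_nowait() in worker loops, because the thread can periodically check a stop flag without spinning.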

Thread Pool with Queue

A practical pattern combines a queue with a thread pool:

import queue
import threading

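def worker(worker_id, task_queue, results):
    while True:
        task = task_queue.get()
        if task is None:
            # Mark the sentinel as done too, otherwise tasks.join() blocks forever
            task_queue.task_done()
            break

        result = f"Worker {worker_id} processed {task}"
        results.append(result)  # a single list.append call is thread-safe in CPython

        task_queue.task_done()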

# Create the task queue and the shared results list
tasks = queue.Queue()
results = []

# Start worker threads
num_workers = 4
threads = []
for i in range(num_workers):
    t = threading.Thread(target=worker, args=(i, tasks, results))
    t.start()
    threads.append(t)

# Add tasks
for task in range(1, 11):
    tasks.put(task)

# Signal workers to stop
for _ in range(num_workers):
    tasks.put(None)

# Wait for completion
tasks.join()

# Shut down threads
for t in threads:
    t.join()

# Print results
for r in results:
    print(r)

Example output (the order of lines and the worker assigned to each task vary between runs):

Worker 0 processed 1
Worker 3 processed 4
Worker 1 processed 2
Worker 2 processed 3
Worker 0 processed 5
Worker 3 processed 6
Worker 1 processed 7
Worker 2 processed 8
Worker 0 processed 9
Worker 3 processed 10

Notice how tasks are distributed across workers: each worker picks up the next available task as soon as it is free, so no explicit scheduling is needed.
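
The result-collection use case mentioned earlier can also be handled with a second queue instead of a shared list, which keeps all cross-thread communication inside queue objects. The following is a sketch under that assumption. The names square_worker, pending_tasks, and finished are hypothetical, and squaring a number stands in for real work.

```python
import queue
import threading

def square_worker(task_queue, result_queue):
    while True:
        number = task_queue.get()
        if number is None:
            task_queue.task_done()   # account for the sentinel as well
            break
        result_queue.put((number, number * number))
        task_queue.task_done()

pending_tasks = queue.Queue()
finished = queue.Queue()
num_workers = 3

pool = [
    threading.Thread(target=square_worker, args=(pending_tasks, finished))
    for _ in range(num_workers)
]
for t in pool:
    t.start()

for number in range(1, 6):
    pending_tasks.put(number)
for _ in pool:
    pending_tasks.put(None)          # one sentinel per worker

for t in pool:
    t.join()

# All workers have exited, so empty() is reliable here
squares = {}
while not finished.empty():
    number, squared = finished.get()
    squares[number] = squared

print(sorted(squares.items()))  # [(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]
```

Results arrive in whatever order the workers finish, which is why each one is paired with its input before being placed on the result queue.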

See Also