Thread-Safe Queues with queue.Queue
The queue module provides thread-safe, multi-producer, multi-consumer queues. When multiple threads need to share data, a queue prevents race conditions and makes coordination between threads straightforward. This guide shows you how to use Queue, LifoQueue, and PriorityQueue effectively.
Why Use Queues?
Threads that communicate by sharing state directly are prone to race conditions when several of them modify the same data at once. Queues act as a buffer: one thread puts data in, another thread takes it out, and the queue handles all the synchronization internally.
Common use cases include:
- Producer-consumer patterns: One thread produces data, another consumes it
- Task distribution: A main thread hands work to worker threads
- Result collection: Worker threads report results back to a coordinator
Basic Queue Operations
The Queue class implements a FIFO (first-in, first-out) queue:
import queue
import threading

# Create a queue with a maximum size
work_queue = queue.Queue(maxsize=10)

def producer():
    for i in range(5):
        item = f"item-{i}"
        work_queue.put(item)  # Add item to queue
        print(f"Produced: {item}")
    work_queue.put(None)  # Sentinel to signal completion

def consumer():
    while True:
        item = work_queue.get()  # Remove item from queue
        if item is None:
            break
        print(f"Consumed: {item}")

# Run producer and consumer
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
print("Done")
Output (the exact interleaving varies between runs):
Produced: item-0
Produced: item-1
Consumed: item-0
Produced: item-2
Consumed: item-1
Produced: item-3
Consumed: item-2
Produced: item-4
Consumed: item-3
Consumed: item-4
Done
Queue Methods
Queues provide several key methods:
| Method | Description |
|---|---|
| put(item) | Add an item, blocking if the queue is full |
| put_nowait(item) | Add without blocking; raises queue.Full if full |
| get() | Remove and return an item, blocking if the queue is empty |
| get_nowait() | Remove without blocking; raises queue.Empty if empty |
| task_done() | Signal that a previously fetched task is complete |
| join() | Block until all tasks are done |
The join() and task_done() methods work together to let you wait for all items to be processed:
import queue
import threading

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Processing {item}")
        q.task_done()  # Signal this item is done

# Start worker threads
threads = []
for _ in range(3):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

# Add items
for item in ["a", "b", "c", "d", "e"]:
    q.put(item)

# Wait for all items to be processed
q.join()

# Shut down workers: one sentinel per thread, queued only after join()
# returns. Workers never call task_done() for a sentinel, so queuing
# sentinels before join() would make join() block forever.
for _ in range(3):
    q.put(None)
for t in threads:
    t.join()
print("All work complete")
LifoQueue: Stack Behavior
LifoQueue implements LIFO (last-in, first-out) behavior, like a stack:
import queue

stack = queue.LifoQueue()
stack.put("first")
stack.put("second")
stack.put("third")

# Items come off in reverse order
print(stack.get())  # third
print(stack.get())  # second
print(stack.get())  # first
Output:
third
second
first
LifoQueue is useful when you need to process the most recent items first, such as undo operations or depth-first traversal.
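As a sketch of the depth-first idea, here is an iterative traversal that uses a LifoQueue as its stack. The tree and node names are hypothetical, made up for illustration:

```python
import queue

# Hypothetical tree: each node maps to a list of its children
tree = {"root": ["a", "b"], "a": ["a1", "a2"], "b": [], "a1": [], "a2": []}

def depth_first(start):
    """Visit nodes depth-first using a LifoQueue as the stack."""
    stack = queue.LifoQueue()
    stack.put(start)
    order = []
    while not stack.empty():
        node = stack.get()
        order.append(node)
        # Push children in reverse so the leftmost child is visited first
        for child in reversed(tree[node]):
            stack.put(child)
    return order

print(depth_first("root"))  # ['root', 'a', 'a1', 'a2', 'b']
```

For purely single-threaded traversal a plain list with append/pop is simpler and faster; LifoQueue earns its keep when multiple threads share the stack, as in a parallel crawler.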
PriorityQueue: Ordered Processing
PriorityQueue removes items based on priority—lower values come out first:
import queue

priority_q = queue.PriorityQueue()

# Add items as tuples: (priority, data)
priority_q.put((3, "low priority"))
priority_q.put((1, "high priority"))
priority_q.put((2, "medium priority"))

# Items come off in priority order
while not priority_q.empty():
    print(priority_q.get()[1])
Output:
high priority
medium priority
low priority
A common pattern is to use negative timestamps for “most recent first”:
import queue
import time

recent_q = queue.PriorityQueue()
recent_q.put((-time.time(), "earlier message"))
time.sleep(0.1)
recent_q.put((-time.time(), "most recent message"))

# The later put has the more negative priority, so it comes out first
print(recent_q.get()[1])  # most recent message
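One caveat with tuple entries: tuples compare element by element, so when two items have equal priority, Python falls through to comparing the payloads, which raises TypeError if the payloads are not comparable. A common remedy is to insert a monotonically increasing counter as a tie-breaker. A minimal sketch (the Job class is hypothetical):

```python
import itertools
import queue

priority_q = queue.PriorityQueue()
counter = itertools.count()  # Monotonic tie-breaker

class Job:
    """A payload that does not support comparison."""
    def __init__(self, name):
        self.name = name

# Without the counter, two entries with equal priority would try to
# compare Job objects and raise TypeError. The counter always breaks
# the tie first, and also preserves insertion order among equals.
priority_q.put((1, next(counter), Job("first urgent job")))
priority_q.put((1, next(counter), Job("second urgent job")))

print(priority_q.get()[2].name)  # first urgent job
```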
Handling Full and Empty Queues
The put() and get() methods block by default. Use put_nowait() and get_nowait() when you need non-blocking behavior:
import queue

q = queue.Queue(maxsize=2)

# These succeed
q.put("item1")
q.put("item2")

# This would block forever:
# q.put("item3")

# Use put_nowait instead
try:
    q.put_nowait("item3")
except queue.Full:
    print("Queue is full, cannot add more")

# Same pattern for get
q2 = queue.Queue()  # Empty queue
try:
    item = q2.get_nowait()
except queue.Empty:
    print("Queue is empty, nothing to get")
Output:
Queue is full, cannot add more
Queue is empty, nothing to get
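Between the two extremes, put() and get() also accept a timeout argument: the call blocks for up to that many seconds, then raises queue.Full or queue.Empty. A short sketch:

```python
import queue

q = queue.Queue(maxsize=1)
q.put("only item")

try:
    # Block for at most 0.2 seconds, then give up
    q.put("second item", timeout=0.2)
except queue.Full:
    print("Timed out waiting for space")

print(q.get(timeout=0.2))  # only item
```

Timeouts are a good default in servers: a bounded wait lets the thread periodically check a shutdown flag instead of blocking indefinitely.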
Thread Pool with Queue
A practical pattern combines a queue with a thread pool:
import queue
import threading

def worker(worker_id, task_queue, results):
    while True:
        task = task_queue.get()
        if task is None:
            break
        result = f"Worker {worker_id} processed {task}"
        results.append(result)  # list.append is atomic in CPython
        task_queue.task_done()

# Create the task queue and a shared results list
tasks = queue.Queue()
results = []

# Start worker threads
num_workers = 4
threads = []
for i in range(num_workers):
    t = threading.Thread(target=worker, args=(i, tasks, results))
    t.start()
    threads.append(t)

# Add tasks
for task in range(1, 11):
    tasks.put(task)

# Wait for completion
tasks.join()

# Signal workers to stop, then shut down the threads. The sentinels go
# in only after join() returns, since workers never call task_done()
# for them.
for _ in range(num_workers):
    tasks.put(None)
for t in threads:
    t.join()

# Print results
for r in results:
    print(r)
Output (which worker picks up which task varies between runs):
Worker 0 processed 1
Worker 3 processed 4
Worker 1 processed 2
Worker 2 processed 3
Worker 0 processed 5
Worker 3 processed 6
Worker 1 processed 7
Worker 2 processed 8
Worker 0 processed 9
Worker 3 processed 10
Notice how tasks are distributed across workers—each worker picks up the next available task automatically.
See Also
- threading — Thread creation and synchronization primitives
- concurrent.futures — High-level thread pool API
- multiprocessing — Process-based parallelism for CPU-bound work
- asyncio — Async I/O for I/O-bound concurrency