queue
The queue module provides thread-safe queue implementations for exchanging data between threads. It implements multi-producer, multi-consumer queues, making it essential for threaded applications where multiple threads need to share data safely. The module offers three main queue types: FIFO (first-in-first-out), LIFO (last-in-first-out), and priority queues.
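To preview how the three orderings differ, here is a quick sketch that pushes the same items into each queue type:

```python
import queue

# Same three inserts into each queue type; only the retrieval order differs.
fifo, lifo, prio = queue.Queue(), queue.LifoQueue(), queue.PriorityQueue()
for q in (fifo, lifo, prio):
    q.put(2)
    q.put(1)
    q.put(3)

print([fifo.get() for _ in range(3)])  # [2, 1, 3]  (insertion order)
print([lifo.get() for _ in range(3)])  # [3, 1, 2]  (reverse insertion order)
print([prio.get() for _ in range(3)])  # [1, 2, 3]  (sorted, lowest first)
```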
Queue Types
Queue
A FIFO queue where items are retrieved in the order they were added. This is the most common queue type for producer-consumer patterns.
import queue
q = queue.Queue(maxsize=10) # Optional maxsize limits capacity
q.put("first")
q.put("second")
print(q.get()) # first
print(q.get()) # second
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| maxsize | int | 0 | Maximum queue size. 0 means unlimited. |
LifoQueue
A last-in-first-out queue that operates like a stack. The most recently added item is retrieved first.
import queue
stack = queue.LifoQueue()
stack.put("first")
stack.put("second")
print(stack.get()) # second
print(stack.get()) # first
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| maxsize | int | 0 | Maximum queue size. 0 means unlimited. |
Note: LifoQueue uses put() and get() like other queues, not push() and pop(). The method names differ from a traditional stack.
PriorityQueue
A priority queue that retrieves items sorted by their priority value. The lowest value is retrieved first. Items are typically stored as tuples: (priority_number, data).
import queue
pq = queue.PriorityQueue()
pq.put((2, "medium priority"))
pq.put((1, "high priority"))
pq.put((3, "low priority"))
print(pq.get()) # (1, 'high priority')
print(pq.get()) # (2, 'medium priority')
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| maxsize | int | 0 | Maximum queue size. 0 means unlimited. |
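When priorities tie, the queue compares the next tuple element, which raises TypeError if the payloads themselves are not orderable (dicts, for example). A common workaround, sketched here rather than part of the module itself, is to insert a monotonically increasing counter as a tie-breaker:

```python
import itertools
import queue

pq = queue.PriorityQueue()
counter = itertools.count()  # monotonically increasing tie-breaker

def put_task(priority, data):
    # The counter keeps equal-priority items in insertion order and
    # prevents Python from ever comparing the payloads themselves.
    pq.put((priority, next(counter), data))

put_task(1, {"job": "a"})  # dicts are not orderable...
put_task(1, {"job": "b"})  # ...so a tie would raise TypeError without the counter
print(pq.get())  # (1, 0, {'job': 'a'})
```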
SimpleQueue
An unbounded FIFO queue with a simpler implementation. It lacks the task-tracking features task_done() and join(), but its put() method is reentrant, so it can safely be called from contexts that may interrupt other code in the same thread, such as signal handlers and __del__ methods.
import queue
sq = queue.SimpleQueue()
sq.put("item1")
sq.put("item2")
print(sq.get()) # item1
Queue Methods
put(item, block=True, timeout=None)
Adds an item to the queue. By default, this blocks if the queue is full. Set block=False or use put_nowait() for non-blocking behavior.
q = queue.Queue(maxsize=2)
q.put("item1", timeout=5)  # Waits up to 5 seconds if full, then raises Full
q.put_nowait("item2")      # Raises Full immediately if the queue is full
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| item | any | — | The item to add to the queue |
| block | bool | True | Block if queue is full |
| timeout | float | None | Seconds to wait before raising Full |
get(block=True, timeout=None)
Removes and returns an item from the queue. By default, this blocks if the queue is empty.
q = queue.Queue()
item = q.get(timeout=5) # Blocks for up to 5 seconds
item_nowait = q.get_nowait() # Raises Empty if queue is empty
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| block | bool | True | Block if queue is empty |
| timeout | float | None | Seconds to wait before raising Empty |
task_done()
Indicates that a previously enqueued task is complete. Used by consumer threads to signal processing completion. Must be called once for each get() call.
import queue
import threading

def worker(q):
    while True:
        item = q.get()
        print(f"Processing: {item}")
        q.task_done()  # Signal completion

q = queue.Queue()
threading.Thread(target=worker, args=(q,), daemon=True).start()
join()
Blocks until all items in the queue have been processed. The queue tracks unfinished tasks—each put() increments the count, and each task_done() decrements it. When the count reaches zero, join() unblocks.
q = queue.Queue()
# ... worker threads call q.get() and q.task_done() ...
q.join() # Blocks until all items are processed
print("All tasks complete")
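Putting task_done() and join() together, a minimal runnable sketch:

```python
import queue
import threading

q = queue.Queue()
results = []

def worker():
    while True:
        item = q.get()
        results.append(item * 2)  # simulate processing
        q.task_done()             # one task_done() per get()

threading.Thread(target=worker, daemon=True).start()

for i in range(5):
    q.put(i)

q.join()  # returns once task_done() has been called for all five items
print(sorted(results))  # [0, 2, 4, 6, 8]
```

Because each append happens before its task_done() call, q.join() returning guarantees that results is complete.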
shutdown(immediate=False)
Puts the queue into shutdown mode (Python 3.13+). After shutdown, put() raises ShutDown. With immediate=False, consumers can keep calling get() until the queue is drained, after which get() raises ShutDown. With immediate=True, all pending items are discarded and every get() raises ShutDown immediately.
q.shutdown(immediate=False) # Allow pending items to be processed
# or
q.shutdown(immediate=True) # Hard stop
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| immediate | bool | False | If True, discard pending items and stop immediately; if False, let consumers drain pending items first |
:::note
shutdown() is only available on Queue, LifoQueue, and PriorityQueue. It is not available on SimpleQueue.
:::
:::note
shutdown() and the ShutDown exception require Python 3.13+.
:::
Queue Properties
qsize()
Returns the approximate size of the queue. The value is approximate because other threads may add or remove items between the call and any subsequent operation.
print(q.qsize()) # Approximate number of items
empty() / full()
Check whether the queue is empty or at capacity. These are approximate: if empty() returns False, a subsequent get() can still block, because another thread may drain the queue in between (and likewise for full() and put()).
if not q.full():
q.put(item)
if not q.empty():
item = q.get()
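Because these checks are only advisory, the check-then-act pattern above can race with other threads. A more robust sketch attempts the operation and handles the exception instead, which is atomic with respect to other threads:

```python
import queue

q = queue.Queue(maxsize=1)
q.put("only item")

# Instead of `if not q.empty(): q.get()`, attempt the operation and
# handle the failure case.
try:
    item = q.get_nowait()
    print(f"Got: {item}")
except queue.Empty:
    print("Queue was empty")

try:
    q.put_nowait("new item")
except queue.Full:
    print("Queue was full")
```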
Exceptions
| Exception | Description |
|---|---|
| Empty | Raised when get_nowait() or get(block=False) is called on an empty queue, or when a get() timeout expires |
| Full | Raised when put_nowait() or put(block=False) is called on a full queue, or when a put() timeout expires |
| ShutDown | Raised when put() or get() is called on a queue that has been shut down (Python 3.13+) |
Thread Safety
All queue operations are atomic and thread-safe. Multiple threads can safely call put() and get() without additional locking. However, Queue, LifoQueue, and PriorityQueue are not reentrant within a single thread: calling put() from a context that interrupts another put() in the same thread (such as a signal handler or a __del__ method) is unsafe. SimpleQueue.put() is reentrant and can be used in those contexts.
Note that qsize(), empty(), and full() are approximate: their answers can be stale by the time you act on them, so they cannot guarantee that a subsequent put() or get() will not block.
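A small sketch to illustrate: several producers and consumers share one queue with no extra locking around the queue itself, and every item is delivered exactly once. (The lock here protects only the plain list used to collect results, not the queue.)

```python
import queue
import threading

q = queue.Queue()
seen = []
lock = threading.Lock()  # protects `seen`, not the queue

def producer(start):
    for i in range(start, start + 100):
        q.put(i)  # thread-safe; no external locking needed

def consumer():
    while True:
        item = q.get()
        if item is None:  # poison pill: stop this consumer
            break
        with lock:
            seen.append(item)

producers = [threading.Thread(target=producer, args=(n * 100,)) for n in range(4)]
consumers = [threading.Thread(target=consumer) for _ in range(4)]
for t in producers + consumers:
    t.start()
for t in producers:
    t.join()
for _ in consumers:
    q.put(None)  # one poison pill per consumer
for t in consumers:
    t.join()

print(len(seen), len(set(seen)))  # 400 400: every item delivered exactly once
```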
Common Patterns
Producer-Consumer Pattern
import queue
import threading
import time

def producer(q, count):
    for i in range(count):
        time.sleep(0.1)  # Simulate work
        q.put(f"item-{i}")
    q.shutdown()  # Signal completion (Python 3.13+)

def consumer(q):
    while True:
        try:
            item = q.get()  # Raises ShutDown once the queue is drained
            print(f"Processing: {item}")
            q.task_done()
        except queue.ShutDown:
            break

q = queue.Queue()
threading.Thread(target=producer, args=(q, 10)).start()
threading.Thread(target=consumer, args=(q,), daemon=True).start()
q.join()
Work Queue with Multiple Workers
import queue
import threading

def worker(worker_id, q):
    while True:
        item = q.get()
        if item is None:  # Poison pill
            q.task_done()  # Still required, or join() below never returns
            break
        print(f"Worker {worker_id} processed {item}")
        q.task_done()

q = queue.Queue()
workers = 4

# Start worker threads
threads = [threading.Thread(target=worker, args=(i, q)) for i in range(workers)]
for t in threads:
    t.start()

# Add work items
for item in range(20):
    q.put(item)

# Send poison pills to stop workers
for _ in range(workers):
    q.put(None)

q.join()