ProcessPoolExecutor and Pool in Python
Python's multiprocessing module lets you sidestep the Global Interpreter Lock (GIL) by running tasks in separate processes. This guide covers the two main ways to create process pools: ProcessPoolExecutor from concurrent.futures and the classic multiprocessing.Pool.
Why Use Processes Instead of Threads?
Python's GIL prevents multiple threads from executing Python bytecode simultaneously. This means threads cannot speed up CPU-bound work: tasks like mathematical computations, image processing, or data transformations.
Processes sidestep the GIL entirely. Each process has its own Python interpreter and memory space, so they truly run in parallel. The tradeoff is more overhead: processes take longer to start and cannot share memory directly like threads can.
For I/O-bound tasks, threads or async are usually better. For CPU-bound work, processes are the way to go.
ProcessPoolExecutor: The Modern API
The concurrent.futures module provides ProcessPoolExecutor, a high-level interface that feels similar to ThreadPoolExecutor:
```python
from concurrent.futures import ProcessPoolExecutor
import math

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

def count_primes_in_range(start, end):
    return sum(1 for n in range(start, end) if is_prime(n))

if __name__ == "__main__":
    ranges = [(0, 10000), (10000, 20000), (20000, 30000), (30000, 40000)]
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = executor.map(count_primes_in_range, *zip(*ranges))
        total = sum(results)
    print(f"Total primes found: {total}")
```
Output:

```
Total primes found: 4203
```
The with statement handles shutdown automatically. The executor.map() method distributes work across processes and collects results.
Using submit() for Individual Tasks
For more control, use submit() to queue individual tasks:
```python
from concurrent.futures import ProcessPoolExecutor
import time

def heavy_computation(n):
    time.sleep(1)
    return n * n

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        future1 = executor.submit(heavy_computation, 10)
        future2 = executor.submit(heavy_computation, 20)
        future3 = executor.submit(heavy_computation, 30)
        print(future1.result())
        print(future2.result())
        print(future3.result())
```
The Future object lets you check status, wait for results, or attach callbacks.
Working with as_completed()
Process results as they complete with as_completed():
```python
from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def process_item(item):
    time.sleep(0.5)
    return item * 2

if __name__ == "__main__":
    items = [1, 2, 3, 4, 5]
    with ProcessPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(process_item, item): item for item in items}
        for future in as_completed(futures):
            original = futures[future]
            result = future.result()
            print(f"{original} -> {result}")
```
Output (order varies between runs):

```
2 -> 4
4 -> 8
1 -> 2
3 -> 6
5 -> 10
```
Results come back in completion order, not submission order.
The Classic multiprocessing.Pool
Before concurrent.futures, developers used multiprocessing.Pool. It offers similar functionality with a slightly different API:
```python
import multiprocessing
import math

def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True

def count_primes(args):
    start, end = args
    return sum(1 for n in range(start, end) if is_prime(n))

if __name__ == "__main__":
    ranges = [(i, i + 10000) for i in range(0, 50000, 10000)]
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(count_primes, ranges)
    print(f"Total primes: {sum(results)}")
```
Pool with map()
The Pool.map() method works like the built-in map(), but distributes the work across processes:
```python
import multiprocessing

def square(x):
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        numbers = [1, 2, 3, 4, 5, 6, 7, 8]
        results = pool.map(square, numbers)
    print(results)
```
Pool with imap() for Lazy Evaluation
Use imap() when you have large iterables and do not want to wait for all results at once:
```python
import multiprocessing
import time

def process_slowly(x):
    time.sleep(0.2)
    return x * 2

if __name__ == "__main__":
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.imap(process_slowly, range(10), chunksize=2)
        for result in results:
            print(f"Got: {result}")
```
The chunksize parameter controls how many tasks are sent to each worker process in one batch, which reduces inter-process communication overhead for large iterables.
Pool with apply_async()
To submit individual tasks without blocking, use apply_async(); you can collect the results later with get():
```python
import multiprocessing
import time

def notify(email):
    time.sleep(0.5)
    print(f"Sent: {email}")

if __name__ == "__main__":
    emails = ["a@test.com", "b@test.com", "c@test.com"]
    with multiprocessing.Pool(processes=2) as pool:
        async_results = [pool.apply_async(notify, (email,)) for email in emails]
        for ar in async_results:
            ar.get()
    print("All notifications sent")
```
Passing Data to Workers
Processes do not share memory, so you must pass data explicitly:
```python
import multiprocessing
from dataclasses import dataclass

@dataclass
class Config:
    threshold: int
    multiplier: float

def process_data(args):
    data, config = args
    return [x * config.multiplier for x in data if x > config.threshold]

if __name__ == "__main__":
    config = Config(threshold=5, multiplier=2.0)
    data_chunks = [[1, 2, 6, 8], [3, 5, 9, 10], [7, 11, 12, 4]]
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.map(process_data, [(chunk, config) for chunk in data_chunks])
    print(results)
```
Using Initializer to Set Up Workers
Run initialization code once per worker process:
```python
import multiprocessing

worker_config = None

def init_worker(config_dict):
    global worker_config
    worker_config = config_dict

def process_item(item):
    return item * worker_config["multiplier"]

if __name__ == "__main__":
    config = {"multiplier": 10}
    with multiprocessing.Pool(
        processes=3,
        initializer=init_worker,
        initargs=(config,),
    ) as pool:
        results = pool.map(process_item, [1, 2, 3])
    print(results)
```
This is useful for loading expensive resources once per worker.
Sharing State Between Processes
For shared state, use multiprocessing.Value:
```python
import multiprocessing

def increment(counter, lock):
    for _ in range(100000):
        with lock:
            counter.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value("i", 0)
    lock = multiprocessing.Lock()
    processes = [
        multiprocessing.Process(target=increment, args=(counter, lock))
        for _ in range(4)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Final count: {counter.value}")
```
Best Practices
- Use `if __name__ == "__main__":`: Required on platforms that use the spawn start method (Windows, and macOS since Python 3.8) and good practice everywhere
- Pickle only what you need: Large arguments are serialized and copied for every task
- Choose the right chunksize: Too small increases IPC overhead; too large imbalances the work
- Reuse the pool: Creating pools is expensive; create once, use many times
- Size the pool with multiprocessing.cpu_count(): Both Pool and ProcessPoolExecutor default to the number of CPU cores
- Handle exceptions carefully: A worker exception only surfaces when you call result() or get()
Common Pitfalls
- Forgetting `if __name__ == "__main__":`: Can cause infinite process spawning (or a RuntimeError) on platforms that use the spawn start method
- Passing unpicklable objects: Lambdas, generators, and some objects cannot cross process boundaries
- Too many processes: More processes than CPU cores causes context-switching overhead
- Race conditions with shared memory: Even with Value, you need locks for compound operations
- Not considering overhead: For very fast tasks, multiprocessing is slower than sequential execution
When to Use Which
| Use Case | Recommended |
|---|---|
| Simple parallel map | ProcessPoolExecutor.map() |
| Fire-and-forget tasks | Pool.apply_async() |
| Results as they complete | as_completed() |
| Custom initialization | Pool(initializer=…) |
| Shared mutable state | multiprocessing.Value |
See Also
- threading module — Thread-based concurrency for I/O-bound tasks
- concurrent.futures module — High-level API for both threads and processes
- multiprocessing module — Full multiprocessing documentation
- asyncio guide — Async/await for concurrent I/O