Thread Pool Executor
A configurable thread pool that accepts tasks, queues overflow, and rejects when full. Core pool, max pool, bounded queue, and rejection policies built from scratch.
Key Abstractions
ThreadPool: Manages worker threads, task submission, and shutdown lifecycle
Worker: Persistent thread that pulls tasks from the queue and executes them
Task: Command wrapping a submitted unit of work together with its Future
Future: Caller-side handle to retrieve a Task result or exception asynchronously
RejectionPolicy: Strategy for handling tasks when both pool and queue are full: abort, discard, caller-runs
TaskListener: Observer notified on task completion or failure callbacks
How It Works
You have a bunch of tasks to run concurrently, but spinning up a new thread for each one is wasteful. Threads are not free. Each one reserves memory for its stack (typically 512KB to 1MB, depending on the platform and runtime settings), and the OS pays a real cost for context switching when you have too many. A thread pool solves this by keeping a bounded set of reusable worker threads alive and feeding them tasks through a queue.
Here is the flow. You call submit(task) on the pool. If a core worker is idle, it picks up the task immediately from the queue. If all core workers are busy, the task goes into the bounded queue and waits. If the queue fills up, the pool creates extra workers up to max_size. And if even that is not enough, the rejection policy kicks in. AbortPolicy throws an error. DiscardPolicy drops the task silently. CallerRunsPolicy makes the thread that called submit() run the task itself, which creates natural back-pressure on the producer.
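The flow above can be sketched as a pure decision function. This is only an illustration of the ordering of the checks: `admit` and its parameters are hypothetical names, and it ignores the locking a real submit() needs.

```python
def admit(queue_len: int, queue_capacity: int, workers: int, max_size: int) -> str:
    """Return what a submit() call would do for the given pool state."""
    if queue_len < queue_capacity:
        return "enqueue"      # room in the bounded queue: the task waits there
    if workers < max_size:
        return "scale_up"     # queue full: add an overflow worker
    return "reject"           # queue and pool both full: rejection policy decides


print(admit(queue_len=1, queue_capacity=3, workers=2, max_size=4))  # enqueue
print(admit(queue_len=3, queue_capacity=3, workers=2, max_size=4))  # scale_up
print(admit(queue_len=3, queue_capacity=3, workers=4, max_size=4))  # reject
```

Note the order: the queue is tried before any overflow worker is created, so extra threads only appear once the queue is already full.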
Workers are simple loops. Each one blocks on the queue waiting for a task. When a task arrives, the worker runs it, notifies any listeners about success or failure, and goes back to waiting. No busy-waiting. No polling. The Command pattern wraps each unit of work into a Task object that workers can execute without knowing what the work actually is.
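A stripped-down version of that loop might look like the sketch below. It assumes a `None` sentinel to stop the worker, which is a simplification chosen for the example; `worker_loop` and `results` are hypothetical names.

```python
import queue
import threading


def worker_loop(tasks: queue.Queue, results: list) -> None:
    while True:
        fn = tasks.get()              # blocks while the queue is empty
        if fn is None:                # sentinel (assumption): time to exit
            return
        try:
            results.append(("ok", fn()))
        except Exception as exc:      # a failing task must not kill the worker
            results.append(("error", type(exc).__name__))


tasks: queue.Queue = queue.Queue(maxsize=4)
results: list = []
t = threading.Thread(target=worker_loop, args=(tasks, results))
t.start()
tasks.put(lambda: 1 + 1)
tasks.put(lambda: 1 / 0)
tasks.put(None)
t.join()
print(results)  # [('ok', 2), ('error', 'ZeroDivisionError')]
```

The worker never inspects what `fn` does. It only calls it, which is the Command pattern doing its job.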
The Observer pattern shows up in TaskListeners. You register callbacks that fire after every task completion or failure. Logging, metrics, alerting: all decoupled from the pool itself.
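As a rough illustration, a metrics-style listener only needs the two callback hooks. `CountingListener` is a hypothetical example, and the `SimpleNamespace` object stands in for a real Task so the sketch runs on its own.

```python
import threading
from types import SimpleNamespace


class CountingListener:
    """Hypothetical listener exposing the same two hooks as a TaskListener."""

    def __init__(self) -> None:
        self._lock = threading.Lock()   # callbacks may fire from several workers
        self.completed = 0
        self.failed = 0

    def on_complete(self, task) -> None:
        with self._lock:
            self.completed += 1

    def on_error(self, task, error: Exception) -> None:
        with self._lock:
            self.failed += 1


listener = CountingListener()
fake_task = SimpleNamespace(name="job-0")   # stand-in for a real Task
listener.on_complete(fake_task)
listener.on_error(fake_task, ValueError("boom"))
print(listener.completed, listener.failed)  # 1 1
```

Because callbacks run on worker threads, any shared state inside a listener needs its own synchronization.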
Requirements
Functional
- submit(task) accepts work and returns a Future for the result
- Core workers (always alive) and overflow workers (created on demand, torn down when idle)
- Bounded queue with configurable capacity between core workers and rejection
- Pluggable rejection policies: abort, discard, caller-runs
- shutdown() stops accepting new tasks and drains in-flight work
- TaskListener callbacks for completion and failure events
Non-Functional
- Thread-safe: multiple threads submit tasks concurrently
- Workers block on empty queue without wasting CPU
- Overflow workers are reclaimed when demand drops, keeping resource use proportional to load
- Futures provide a safe way to retrieve results or propagate exceptions back to the submitter
Design Decisions
Why a bounded queue instead of unbounded?
An unbounded queue never pushes back. During a traffic spike your producer can shove millions of tasks into the queue faster than workers can drain it. Memory climbs. GC pressure builds. Eventually the process dies with an OutOfMemoryError and you lose everything in that queue anyway. A bounded queue forces you to deal with overflow explicitly through rejection policies. It is a circuit breaker for your thread pool.
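To see the push-back concretely, here is a small sketch using the standard library's `queue.Queue`. The capacity of 2 and the five submissions are arbitrary numbers picked for the example.

```python
from queue import Full, Queue

q: Queue = Queue(maxsize=2)   # bounded: at most 2 pending items
accepted = rejected = 0

for i in range(5):
    try:
        q.put_nowait(i)       # raises Full instead of growing without limit
        accepted += 1
    except Full:
        rejected += 1         # this is where a rejection policy would run

print(accepted, rejected)  # 2 3
```

With `Queue()` and no `maxsize`, all five puts would succeed and nothing would ever tell the producer to slow down.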
Why three rejection policies?
Different situations call for different trade-offs. AbortPolicy is the safe default: it fails loudly so you know something is wrong. DiscardPolicy is useful when dropping tasks is acceptable (think sampling metrics or heartbeat pings). CallerRunsPolicy is the clever one. By forcing the submitting thread to do the work, it naturally slows down the producer. If your web server's request-handling thread has to run the task itself, it cannot accept new requests until it finishes. That is back-pressure without any explicit rate limiting.
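The caller-runs idea can be sketched in a few lines. `submit_caller_runs` is a hypothetical helper, and no worker is draining the queue here, so the second submission is guaranteed to overflow.

```python
import threading
from queue import Full, Queue


def submit_caller_runs(q: Queue, fn) -> str:
    try:
        q.put_nowait(fn)
        return "queued"
    except Full:
        fn()                  # the submitting thread does the work itself
        return "ran on caller"


ran_on: list = []
record = lambda: ran_on.append(threading.current_thread().name)

q: Queue = Queue(maxsize=1)
first = submit_caller_runs(q, record)    # fits in the queue
second = submit_caller_runs(q, record)   # queue full: runs right here
print(first, "/", second)                # queued / ran on caller
print(ran_on == [threading.current_thread().name])  # True
```

While the caller is busy inside `fn()`, it cannot submit anything else. That pause is the back-pressure.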
How do workers avoid busy-waiting?
Workers call a blocking get() on the queue with a timeout. If the queue is empty, the thread parks on a Condition variable and the OS does not schedule it. When a new task arrives, the Condition is signaled and one parked worker wakes up. This is fundamentally different from a spin loop that checks queue.empty() in a tight while-true. The spin loop burns CPU doing nothing. The Condition approach uses essentially no CPU while waiting; the timeout only causes an occasional wake-up so the worker can check whether it should exit.
Why separate core and max pool sizes?
Core workers are your baseline capacity. They stay alive even during quiet periods so you do not pay thread creation cost when the next request arrives. Max workers handle burst traffic. When a spike passes and the queue empties, overflow workers time out on the queue poll and exit. You get elastic scaling without manual intervention. Java's ThreadPoolExecutor uses this exact split, and tuning these two numbers is one of the most important configuration decisions in any JVM application.
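The scale-down half of that story is just a timed poll. In the sketch below, `overflow_worker` and `idle_timeout` are hypothetical names; `idle_timeout` plays the role that `keepAliveTime` plays in Java's ThreadPoolExecutor.

```python
import threading
from queue import Empty, Queue


def overflow_worker(q: Queue, idle_timeout: float, exited: threading.Event) -> None:
    while True:
        try:
            fn = q.get(timeout=idle_timeout)   # wait for work, but not forever
        except Empty:
            exited.set()                       # idle too long: retire this thread
            return
        fn()


q: Queue = Queue()
exited = threading.Event()
ran: list = []
q.put(lambda: ran.append("burst-task"))

t = threading.Thread(target=overflow_worker, args=(q, 0.1, exited))
t.start()
t.join(timeout=2)
print(ran, exited.is_set())  # ['burst-task'] True
```

A core worker would run the same loop but go back to waiting on `Empty` instead of returning.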
Interview Follow-ups
- "How would you handle task priorities?" Replace the FIFO queue with a priority queue. Each task carries a priority value, and the queue orders by that value. One catch: low-priority tasks can starve if high-priority tasks keep arriving. A common fix is priority aging, where a task's priority increases the longer it waits in the queue.
- "How would you implement task cancellation?" Add a cancel() method to Future that sets a cancelled flag. Workers check this flag before running the task and skip it if set. For tasks already running, you can use thread interruption, but the task code must cooperate by checking Thread.interrupted() at reasonable intervals. You cannot safely force-kill a running thread in Java or Python.
- "How would you add a scheduled/delayed execution feature?" Use a delay queue instead of a plain FIFO queue. Each task carries an execution timestamp. Workers peek at the head of the queue and sleep until that timestamp arrives. Java's ScheduledThreadPoolExecutor does exactly this using a DelayedWorkQueue backed by a min-heap ordered by scheduled time.
- "What metrics would you track in production?" Queue depth (how backed up you are), active thread count (how saturated you are), task completion rate and latency percentiles (p50, p95, p99), rejection count (how often you are overloaded), and task error rate. These feed into dashboards and alerts. A sudden spike in queue depth or rejections is an early warning that capacity needs to increase.
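For the priority follow-up, one possible starting point is the standard library's `queue.PriorityQueue`. The sketch assumes that a lower number means higher priority, and it adds a sequence counter so equal-priority tasks stay in FIFO order. `submit` and the task names are hypothetical, and aging is not shown.

```python
import itertools
from queue import PriorityQueue

_seq = itertools.count()          # tie-breaker: preserves submission order
pq: PriorityQueue = PriorityQueue()


def submit(priority: int, name: str) -> None:
    # Tuples compare element by element: priority first, then sequence number
    pq.put((priority, next(_seq), name))


submit(5, "low")
submit(1, "urgent-a")
submit(1, "urgent-b")

order = [pq.get()[2] for _ in range(3)]
print(order)  # ['urgent-a', 'urgent-b', 'low']
```

Without the sequence number, two entries with the same priority would fall through to comparing the payloads, which fails for objects that do not define an ordering.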
Code Implementation
from __future__ import annotations

from abc import ABC, abstractmethod
from queue import Empty, Full, Queue
from threading import Event, Lock, Thread, current_thread
from typing import Any, Callable as TypingCallable
import time

# --------------- Task & Future ---------------

class Future:
    """Holds the result of an asynchronous task. Callers block on get()."""

    def __init__(self):
        self._result: Any = None
        self._exception: Exception | None = None
        self._done = Event()

    def set_result(self, value: Any) -> None:
        self._result = value
        self._done.set()

    def set_exception(self, exc: Exception) -> None:
        self._exception = exc
        self._done.set()

    def get(self, timeout: float | None = None) -> Any:
        # Block until a result or an exception has been published.
        if not self._done.wait(timeout=timeout):
            raise TimeoutError("Future.get() timed out")
        if self._exception is not None:
            raise self._exception
        return self._result

    def is_done(self) -> bool:
        return self._done.is_set()


class Task:
    """Command wrapping a callable plus its Future."""

    def __init__(self, fn: TypingCallable[[], Any], name: str = ""):
        self.fn = fn
        self.name = name or f"task-{id(fn)}"
        self.future = Future()

    def run(self) -> None:
        try:
            result = self.fn()
            self.future.set_result(result)
        except Exception as e:
            # Record the failure on the Future, then re-raise so the
            # caller of run() can notify listeners.
            self.future.set_exception(e)
            raise

# --------------- Rejection Policies (Strategy) ---------------

class RejectionPolicy(ABC):
    @abstractmethod
    def reject(self, task: Task, pool: "ThreadPool") -> None: ...


class AbortPolicy(RejectionPolicy):
    """Throws an exception. The submitter knows immediately."""

    def reject(self, task: Task, pool: "ThreadPool") -> None:
        raise RuntimeError(
            f"Task '{task.name}' rejected: pool and queue are full "
            f"(workers={len(pool._workers)}, queue_size={pool._queue.qsize()})"
        )


class DiscardPolicy(RejectionPolicy):
    """Drops the task. Use when losing work is acceptable."""

    def reject(self, task: Task, pool: "ThreadPool") -> None:
        # The print is for the demo; the Future is failed so get() never hangs.
        print(f" [DiscardPolicy] Dropped task '{task.name}'")
        task.future.set_exception(RuntimeError("Task discarded"))


class CallerRunsPolicy(RejectionPolicy):
    """The submitting thread runs the task itself. Natural back-pressure."""

    def reject(self, task: Task, pool: "ThreadPool") -> None:
        print(f" [CallerRunsPolicy] Running '{task.name}' on submitter thread")
        try:
            task.run()
        except Exception:
            # Already stored on the Future; surfaces via future.get()
            # instead of escaping from submit().
            pass

# --------------- Task Listener (Observer) ---------------

class TaskListener(ABC):
    @abstractmethod
    def on_complete(self, task: Task) -> None: ...

    @abstractmethod
    def on_error(self, task: Task, error: Exception) -> None: ...


class LoggingListener(TaskListener):
    def on_complete(self, task: Task) -> None:
        print(f" [Listener] Task '{task.name}' completed")

    def on_error(self, task: Task, error: Exception) -> None:
        print(f" [Listener] Task '{task.name}' failed: {error}")

# --------------- Worker ---------------

class Worker:
    """
    A persistent thread that loops pulling tasks from the shared queue.
    Blocks on the queue when it is empty. No busy-waiting.
    """

    def __init__(self, pool: "ThreadPool", worker_id: int):
        self._pool = pool
        self._id = worker_id
        self._running = True
        self._thread = Thread(target=self._run, name=f"Worker-{worker_id}", daemon=True)
        self._thread.start()

    def _run(self) -> None:
        # Keep pulling until the queue is idle, so tasks queued before
        # shutdown still run and their Futures complete.
        while True:
            try:
                task = self._pool._queue.get(timeout=0.5)
            except Empty:
                # Exit if stopped, or if this is a surplus (non-core)
                # worker and demand has dropped.
                with self._pool._lock:
                    if (not self._running
                            or (len(self._pool._workers) > self._pool._core_size
                                and self._pool._queue.empty())):
                        self._pool._workers.discard(self)
                        return
                continue

            thread_name = current_thread().name
            try:
                print(f" [{thread_name}] Running '{task.name}'")
                task.run()
            except Exception as e:
                # The Future already holds the exception; the worker survives.
                self._notify(task, e)
            else:
                self._notify(task, None)

    def _notify(self, task: Task, error: Exception | None) -> None:
        # A misbehaving listener must not kill the worker thread.
        for listener in list(self._pool._listeners):
            try:
                if error is None:
                    listener.on_complete(task)
                else:
                    listener.on_error(task, error)
            except Exception as exc:
                print(f" [Listener] callback raised: {exc}")

    def stop(self) -> None:
        # Ask the worker to exit the next time it finds the queue empty.
        self._running = False

    def join(self, timeout: float = 2.0) -> None:
        self._thread.join(timeout=timeout)

# --------------- Thread Pool ---------------

class ThreadPool:
    """
    Manages a pool of worker threads with core/max sizing, a bounded task
    queue, configurable rejection policy, and observer callbacks.
    """

    def __init__(
        self,
        core_size: int,
        max_size: int,
        queue_capacity: int,
        rejection_policy: RejectionPolicy | None = None,
    ):
        if core_size <= 0:
            raise ValueError("core_size must be positive")
        if max_size < core_size:
            raise ValueError("max_size must be >= core_size")
        if queue_capacity <= 0:
            raise ValueError("queue_capacity must be positive")

        self._core_size = core_size
        self._max_size = max_size
        self._queue: Queue[Task] = Queue(maxsize=queue_capacity)
        self._rejection_policy = rejection_policy or AbortPolicy()
        self._lock = Lock()
        self._workers: set[Worker] = set()
        self._listeners: list[TaskListener] = []
        self._shutdown = False

        # Pre-start core workers
        for i in range(core_size):
            self._workers.add(Worker(self, i))

    def add_listener(self, listener: TaskListener) -> None:
        self._listeners.append(listener)

    def submit(self, fn: TypingCallable[[], Any], name: str = "") -> Future:
        """Submit a task. Returns a Future you can block on."""
        if self._shutdown:
            raise RuntimeError("Pool is shut down")

        task = Task(fn, name)

        # Try to enqueue
        try:
            self._queue.put_nowait(task)
            print(f" Queued '{task.name}' (queue_size={self._queue.qsize()})")
            return task.future
        except Full:
            pass

        # Queue is full. Can we add a worker up to max_size?
        scaled_up = False
        with self._lock:
            if len(self._workers) < self._max_size:
                worker_id = len(self._workers)
                w = Worker(self, worker_id)
                self._workers.add(w)
                scaled_up = True
                print(f" Scaled up: created Worker-{worker_id} "
                      f"(workers={len(self._workers)}/{self._max_size})")

        if scaled_up:
            # Try again now that there is a new consumer. Wait briefly
            # (outside the lock) so the new worker can free a slot first.
            try:
                self._queue.put(task, timeout=0.1)
                print(f" Queued '{task.name}' after scale-up")
                return task.future
            except Full:
                pass

        # Both queue and pool are full. Apply rejection policy.
        self._rejection_policy.reject(task, self)
        return task.future

    def shutdown(self, wait: bool = True) -> None:
        """Stop accepting tasks and drain workers."""
        self._shutdown = True
        with self._lock:
            workers = list(self._workers)
        for w in workers:
            w.stop()
        if wait:
            for w in workers:
                w.join()
        print(f" Pool shut down ({len(workers)} workers stopped)")

    @property
    def stats(self) -> dict:
        with self._lock:
            return {
                "workers": len(self._workers),
                "core_size": self._core_size,
                "max_size": self._max_size,
                "queue_size": self._queue.qsize(),
            }

# --------------- Demo ---------------

if __name__ == "__main__":
    import random

    print("=== Create Pool (core=2, max=4, queue=3, AbortPolicy) ===")
    pool = ThreadPool(
        core_size=2,
        max_size=4,
        queue_capacity=3,
        rejection_policy=AbortPolicy(),
    )
    pool.add_listener(LoggingListener())
    print(f" Stats: {pool.stats}")

    print("\n=== Submit 10 Tasks ===")
    futures: list[tuple[str, Future]] = []
    for i in range(10):
        name = f"job-{i}"
        delay = round(random.uniform(0.3, 0.8), 2)

        # Default arguments bind the current name and delay to this task.
        def work(n=name, d=delay):
            time.sleep(d)
            return f"{n} done after {d}s"

        try:
            f = pool.submit(work, name=name)
            futures.append((name, f))
        except RuntimeError as e:
            print(f" REJECTED: {e}")

    print(f"\n Stats after submission: {pool.stats}")

    print("\n=== Collecting Results ===")
    for name, future in futures:
        try:
            result = future.get(timeout=5.0)
            print(f" {name} -> {result}")
        except Exception as e:
            print(f" {name} -> ERROR: {e}")

    print("\n=== CallerRunsPolicy Demo ===")
    pool2 = ThreadPool(
        core_size=1,
        max_size=1,
        queue_capacity=1,
        rejection_policy=CallerRunsPolicy(),
    )
    # Fill the single worker and single queue slot
    pool2.submit(lambda: time.sleep(1.0), name="blocking-task")
    pool2.submit(lambda: time.sleep(0.5), name="queued-task")
    # This one should trigger CallerRunsPolicy
    f_caller = pool2.submit(lambda: "ran on caller thread", name="overflow-task")
    print(f" CallerRuns result: {f_caller.get(timeout=2.0)}")
    pool2.shutdown()

    print("\n=== Shutdown ===")
    pool.shutdown()
    print(f" Final stats: {pool.stats}")
    print("\nAll operations completed successfully.")
Common Mistakes
- ✗ Using an unbounded queue: tasks pile up during bursts and the process runs out of memory before you ever notice the overload
- ✗ Busy-waiting on an empty queue instead of blocking with a Condition: wastes CPU and defeats the purpose of pooling
- ✗ Not handling task exceptions inside workers: an unhandled exception kills the worker thread silently and the pool slowly shrinks to zero
- ✗ Creating threads without enforcing max pool size: under sustained load the system spawns thousands of threads and context-switching tanks throughput
Key Points
- ✓ Core pool threads stay alive even when idle. Max pool threads are created only when the queue is full and torn down when demand drops. Getting that split right is essential for balancing resource use and throughput.
- ✓ A bounded queue puts a hard cap on how many tasks can pile up in memory. An unbounded queue sounds convenient until it eats all your heap during a traffic spike.
- ✓ Rejection policies are the Strategy pattern in action. When the pool and queue are both full, AbortPolicy throws, DiscardPolicy drops the task, and CallerRunsPolicy forces the submitting thread to run the task itself, which naturally slows down producers.
- ✓ Worker threads block on the queue when it is empty using a Condition variable. No busy-waiting, no tight polling loops, no wasted CPU cycles.