Thread Pool Executor
A configurable thread pool that accepts tasks, queues overflow, and rejects when full. Core pool, max pool, bounded queue, and rejection policies built from scratch.
Key Abstractions
- ThreadPool: Manages worker threads, task submission, and shutdown lifecycle
- Worker: Persistent thread that pulls tasks from the queue and executes them
- Task: Command wrapping a submitted unit of work together with its Future
- Future: Caller-side handle to retrieve a Task result or exception asynchronously
- RejectionPolicy: Strategy for handling tasks when both pool and queue are full: abort, discard, caller-runs
- TaskListener: Observer notified via callbacks on task completion or failure
Class Diagram
How It Works
You have a bunch of tasks to run concurrently, but spinning up a new thread for each one is wasteful. Threads are not free. Each one takes memory for its stack (typically 512KB to 1MB), and the OS pays a real cost for context switching when you have too many. A thread pool solves this by keeping a fixed set of worker threads alive and feeding them tasks through a queue.
Here is the flow. You call submit(task) on the pool. If a core worker is idle, it picks up the task immediately from the queue. If all core workers are busy, the task goes into the bounded queue and waits. If the queue fills up, the pool creates extra workers up to max_size. And if even that is not enough, the rejection policy kicks in. AbortPolicy throws an error. DiscardPolicy drops the task silently. CallerRunsPolicy makes the thread that called submit() run the task itself, which creates natural back-pressure on the producer.
Workers are simple loops. Each one blocks on the queue waiting for a task. When a task arrives, the worker runs it, notifies any listeners about success or failure, and goes back to waiting. No busy-waiting. No polling. The Command pattern wraps each unit of work into a Task object that workers can execute without knowing what the work actually is.
The Observer pattern shows up in TaskListeners. You register callbacks that fire after every task completion or failure. Logging, metrics, alerting: all decoupled from the pool itself.
Requirements
Functional
- submit(task) accepts work and returns a Future for the result
- Core workers (always alive) and overflow workers (created on demand, torn down when idle)
- Bounded queue with configurable capacity between core workers and rejection
- Pluggable rejection policies: abort, discard, caller-runs
- shutdown() stops accepting new tasks and drains in-flight work
- TaskListener callbacks for completion and failure events
Non-Functional
- Thread-safe: multiple threads submit tasks concurrently
- Workers block on empty queue without wasting CPU
- Overflow workers are reclaimed when demand drops, keeping resource use proportional to load
- Futures provide a safe way to retrieve results or propagate exceptions back to the submitter
Design Decisions
Why a bounded queue instead of unbounded?
An unbounded queue never pushes back. During a traffic spike your producer can shove millions of tasks into the queue faster than workers can drain it. Memory climbs. GC pressure builds. Eventually the process dies with an out-of-memory error and you lose everything in that queue anyway. A bounded queue forces you to deal with overflow explicitly through rejection policies. It is a circuit breaker for your thread pool.
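The difference is easy to see with Python's standard queue module. A minimal sketch: the bounded queue pushes back immediately, while the unbounded one buffers everything without complaint.

```python
from queue import Queue, Full

bounded = Queue(maxsize=2)        # holds at most 2 tasks
bounded.put_nowait("task-1")
bounded.put_nowait("task-2")

try:
    bounded.put_nowait("task-3")  # third put overflows right away
except Full:
    print("queue full -> apply rejection policy")

unbounded = Queue()               # maxsize=0 means no limit
for i in range(100_000):          # every put succeeds until memory runs out
    unbounded.put_nowait(i)
print(unbounded.qsize())          # 100000 tasks silently buffered
```

The Full exception is the hook the rejection policies plug into: it is the only signal that the system is overloaded, and an unbounded queue never produces it.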
Why three rejection policies?
Different situations call for different trade-offs. AbortPolicy is the safe default: it fails loudly so you know something is wrong. DiscardPolicy is useful when dropping tasks is acceptable (think sampling metrics or heartbeat pings). CallerRunsPolicy is the clever one. By forcing the submitting thread to do the work, it naturally slows down the producer. If your web server's request-handling thread has to run the task itself, it cannot accept new requests until it finishes. That is back-pressure without any explicit rate limiting.
How do workers avoid busy-waiting?
Workers call a blocking get() on the queue with a timeout. If the queue is empty, the thread parks on a Condition variable and the OS does not schedule it. When a new task arrives, the Condition is signaled and one parked worker wakes up. This is fundamentally different from a spin loop that checks queue.empty() in a tight while-true. The spin loop burns CPU doing nothing. The Condition approach uses zero CPU while waiting.
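A stripped-down blocking queue makes the mechanism concrete. This is a sketch of the idea, not the stdlib's actual code, though queue.Queue works on the same principle internally.

```python
from collections import deque
from threading import Condition

class BlockingQueue:
    """Minimal blocking FIFO: consumers park on a Condition when empty."""

    def __init__(self):
        self._items = deque()
        self._cond = Condition()

    def put(self, item):
        with self._cond:
            self._items.append(item)
            self._cond.notify()  # wake exactly one parked consumer

    def get(self, timeout=None):
        with self._cond:
            # wait_for() parks the thread; the OS does not schedule it
            # again until notify() fires or the timeout expires.
            if not self._cond.wait_for(lambda: self._items, timeout=timeout):
                raise TimeoutError("queue empty")
            return self._items.popleft()
```

Contrast this with `while q.empty(): pass`, which pins a core at 100% doing nothing. The Condition version consumes no CPU between tasks.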
Why separate core and max pool sizes?
Core workers are your baseline capacity. They stay alive even during quiet periods so you do not pay thread creation cost when the next request arrives. Max workers handle burst traffic. When a spike passes and the queue empties, overflow workers time out on the queue poll and exit. You get elastic scaling without manual intervention. Java's ThreadPoolExecutor uses this exact split, and tuning these two numbers is one of the most important configuration decisions in any JVM application.
Interview Follow-ups
- "How would you handle task priorities?" Replace the FIFO queue with a priority queue. Each task carries a priority value, and the queue orders by that value. One catch: low-priority tasks can starve if high-priority tasks keep arriving. A common fix is priority aging, where a task's priority increases the longer it waits in the queue.
- "How would you implement task cancellation?" Add a cancel() method to Future that sets a cancelled flag. Workers check this flag before running the task and skip it if set. For tasks already running, you can use thread interruption, but the task code must cooperate by checking Thread.interrupted() at reasonable intervals. You cannot safely force-kill a running thread in Java or Python.
- "How would you add a scheduled/delayed execution feature?" Use a delay queue instead of a plain FIFO queue. Each task carries an execution timestamp. Workers peek at the head of the queue and sleep until that timestamp arrives. Java's ScheduledThreadPoolExecutor does exactly this using a DelayedWorkQueue backed by a min-heap ordered by scheduled time.
- "What metrics would you track in production?" Queue depth (how backed up you are), active thread count (how saturated you are), task completion rate and latency percentiles (p50, p95, p99), rejection count (how often you are overloaded), and task error rate. These feed into dashboards and alerts. A sudden spike in queue depth or rejections is an early warning that capacity needs to increase.
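The priority-aging idea from the first follow-up can be sketched in a few lines. This is a hypothetical helper, not part of the pool below; the effective-priority formula (base priority minus wait time times an aging rate) is one common choice, and the linear scan in get() keeps the sketch simple at the cost of O(n) pops.

```python
import time

class AgingPriorityQueue:
    """Lower number = higher priority; waiting tasks gain priority over time."""

    def __init__(self, aging_rate=1.0):
        self._items = []               # (base_priority, enqueue_time, task)
        self._aging_rate = aging_rate  # priority points gained per second waited

    def put(self, task, priority):
        self._items.append((priority, time.monotonic(), task))

    def get(self):
        # Effective priority = base - age * rate, so a long-waiting
        # low-priority task eventually outranks fresh high-priority work.
        now = time.monotonic()
        idx = min(
            range(len(self._items)),
            key=lambda i: self._items[i][0]
            - (now - self._items[i][1]) * self._aging_rate,
        )
        return self._items.pop(idx)[2]
```

With aging_rate=1.0, a task with base priority 10 overtakes a freshly queued priority-1 task after it has waited about nine seconds, which is exactly the starvation fix the follow-up describes.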
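Cooperative cancellation can likewise be sketched by extending the Future idea with a hypothetical cancel() flag; CancellableFuture and run_task below are illustrations, not part of the implementation that follows. A worker would call is_cancelled() before running the task, matching the check-before-run approach described above.

```python
from threading import Event

class CancellableFuture:
    """Future with a cancelled flag that workers check before running."""

    def __init__(self):
        self._done = Event()
        self._cancelled = Event()
        self._result = None

    def cancel(self) -> bool:
        # Only tasks that have not completed yet can be cancelled.
        if self._done.is_set():
            return False
        self._cancelled.set()
        self._done.set()
        return True

    def is_cancelled(self) -> bool:
        return self._cancelled.is_set()

    def set_result(self, value) -> None:
        self._result = value
        self._done.set()

    def get(self, timeout=None):
        if not self._done.wait(timeout=timeout):
            raise TimeoutError("Future.get() timed out")
        if self._cancelled.is_set():
            raise RuntimeError("task was cancelled")
        return self._result

def run_task(future: CancellableFuture, fn) -> None:
    # Worker-side check: skip tasks cancelled while they sat in the queue.
    if future.is_cancelled():
        return
    future.set_result(fn())
```

Note this only covers queued tasks; interrupting a task that is already running still requires the task body itself to poll a flag, as the follow-up answer explains.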
Code Implementation
from __future__ import annotations
from abc import ABC, abstractmethod
from threading import Thread, Lock, Event, current_thread
from queue import Queue, Full, Empty
from typing import Any, Callable
import time


# --------------- Task & Future ---------------

class Future:
    """Holds the result of an asynchronous task. Callers block on get()."""

    def __init__(self):
        self._result: Any = None
        self._exception: Exception | None = None
        self._done = Event()

    def set_result(self, value: Any) -> None:
        self._result = value
        self._done.set()

    def set_exception(self, exc: Exception) -> None:
        self._exception = exc
        self._done.set()

    def get(self, timeout: float | None = None) -> Any:
        if not self._done.wait(timeout=timeout):
            raise TimeoutError("Future.get() timed out")
        if self._exception:
            raise self._exception
        return self._result

    def is_done(self) -> bool:
        return self._done.is_set()


class Task:
    """Command wrapping a callable plus its Future."""

    def __init__(self, fn: Callable[[], Any], name: str = ""):
        self.fn = fn
        self.name = name or f"task-{id(fn)}"
        self.future = Future()

    def run(self) -> None:
        try:
            result = self.fn()
            self.future.set_result(result)
        except Exception as e:
            self.future.set_exception(e)
            raise  # re-raise so the worker can notify error listeners


# --------------- Rejection Policies (Strategy) ---------------

class RejectionPolicy(ABC):
    @abstractmethod
    def reject(self, task: Task, pool: "ThreadPool") -> None: ...


class AbortPolicy(RejectionPolicy):
    """Raises an exception. The submitter knows immediately."""
    def reject(self, task: Task, pool: "ThreadPool") -> None:
        raise RuntimeError(
            f"Task '{task.name}' rejected: pool and queue are full "
            f"(workers={len(pool._workers)}, queue_size={pool._queue.qsize()})"
        )


class DiscardPolicy(RejectionPolicy):
    """Silently drops the task. Use when losing work is acceptable."""
    def reject(self, task: Task, pool: "ThreadPool") -> None:
        print(f" [DiscardPolicy] Dropped task '{task.name}'")
        task.future.set_exception(RuntimeError("Task discarded"))


class CallerRunsPolicy(RejectionPolicy):
    """The submitting thread runs the task itself. Natural back-pressure."""
    def reject(self, task: Task, pool: "ThreadPool") -> None:
        print(f" [CallerRunsPolicy] Running '{task.name}' on submitter thread")
        task.run()


# --------------- Task Listener (Observer) ---------------

class TaskListener(ABC):
    @abstractmethod
    def on_complete(self, task: Task) -> None: ...

    @abstractmethod
    def on_error(self, task: Task, error: Exception) -> None: ...


class LoggingListener(TaskListener):
    def on_complete(self, task: Task) -> None:
        print(f" [Listener] Task '{task.name}' completed")

    def on_error(self, task: Task, error: Exception) -> None:
        print(f" [Listener] Task '{task.name}' failed: {error}")


# --------------- Worker ---------------

class Worker:
    """
    A persistent thread that loops pulling tasks from the shared queue.
    Blocks on the queue when it is empty. No busy-waiting.
    """

    def __init__(self, pool: "ThreadPool", worker_id: int):
        self._pool = pool
        self._id = worker_id
        self._running = True
        self._thread = Thread(target=self._run, name=f"Worker-{worker_id}", daemon=True)
        self._thread.start()

    def _run(self) -> None:
        while self._running:
            try:
                task = self._pool._queue.get(timeout=0.5)
            except Empty:
                # Check if we should exit (non-core worker scaling down)
                with self._pool._lock:
                    if (not self._running
                            or (len(self._pool._workers) > self._pool._core_size
                                and self._pool._queue.empty())):
                        self._pool._workers.discard(self)
                        return
                continue

            thread_name = current_thread().name
            try:
                print(f" [{thread_name}] Running '{task.name}'")
                task.run()
                for listener in self._pool._listeners:
                    listener.on_complete(task)
            except Exception as e:
                # Catching here keeps the worker alive; an unhandled
                # exception would kill the thread and shrink the pool.
                for listener in self._pool._listeners:
                    listener.on_error(task, e)

    def stop(self) -> None:
        self._running = False

    def join(self, timeout: float = 2.0) -> None:
        self._thread.join(timeout=timeout)


# --------------- Thread Pool ---------------

class ThreadPool:
    """
    Manages a pool of worker threads with core/max sizing, a bounded task
    queue, configurable rejection policy, and observer callbacks.
    """

    def __init__(
        self,
        core_size: int,
        max_size: int,
        queue_capacity: int,
        rejection_policy: RejectionPolicy | None = None,
    ):
        if core_size <= 0:
            raise ValueError("core_size must be positive")
        if max_size < core_size:
            raise ValueError("max_size must be >= core_size")
        if queue_capacity <= 0:
            raise ValueError("queue_capacity must be positive")

        self._core_size = core_size
        self._max_size = max_size
        self._queue: Queue[Task] = Queue(maxsize=queue_capacity)
        self._rejection_policy = rejection_policy or AbortPolicy()
        self._lock = Lock()
        self._workers: set[Worker] = set()
        self._listeners: list[TaskListener] = []
        self._shutdown = False

        # Pre-start core workers
        for i in range(core_size):
            self._workers.add(Worker(self, i))

    def add_listener(self, listener: TaskListener) -> None:
        self._listeners.append(listener)

    def submit(self, fn: Callable[[], Any], name: str = "") -> Future:
        """Submit a task. Returns a Future you can block on."""
        if self._shutdown:
            raise RuntimeError("Pool is shut down")

        task = Task(fn, name)

        # Try to enqueue
        try:
            self._queue.put_nowait(task)
            print(f" Queued '{task.name}' (queue_size={self._queue.qsize()})")
            return task.future
        except Full:
            pass

        # Queue is full. Can we add a worker up to max_size?
        with self._lock:
            if len(self._workers) < self._max_size:
                worker_id = len(self._workers)
                w = Worker(self, worker_id)
                self._workers.add(w)
                print(f" Scaled up: created Worker-{worker_id} "
                      f"(workers={len(self._workers)}/{self._max_size})")
                # Retry with a short timeout so the new consumer has a
                # chance to drain a slot before we give up.
                try:
                    self._queue.put(task, timeout=0.2)
                    print(f" Queued '{task.name}' after scale-up")
                    return task.future
                except Full:
                    pass

        # Both queue and pool are full. Apply rejection policy.
        self._rejection_policy.reject(task, self)
        return task.future

    def shutdown(self, wait: bool = True) -> None:
        """Stop accepting tasks and drain workers."""
        self._shutdown = True
        with self._lock:
            workers = list(self._workers)
        for w in workers:
            w.stop()
        if wait:
            for w in workers:
                w.join()
        print(f" Pool shut down ({len(workers)} workers stopped)")

    @property
    def stats(self) -> dict:
        with self._lock:
            return {
                "workers": len(self._workers),
                "core_size": self._core_size,
                "max_size": self._max_size,
                "queue_size": self._queue.qsize(),
            }


# --------------- Demo ---------------

if __name__ == "__main__":
    import random

    print("=== Create Pool (core=2, max=4, queue=3, AbortPolicy) ===")
    pool = ThreadPool(
        core_size=2,
        max_size=4,
        queue_capacity=3,
        rejection_policy=AbortPolicy(),
    )
    pool.add_listener(LoggingListener())
    print(f" Stats: {pool.stats}")

    print("\n=== Submit 10 Tasks ===")
    futures: list[tuple[str, Future]] = []
    for i in range(10):
        name = f"job-{i}"
        delay = round(random.uniform(0.3, 0.8), 2)

        def work(n=name, d=delay):
            time.sleep(d)
            return f"{n} done after {d}s"

        try:
            f = pool.submit(work, name=name)
            futures.append((name, f))
        except RuntimeError as e:
            print(f" REJECTED: {e}")

    print(f"\n Stats after submission: {pool.stats}")

    print("\n=== Collecting Results ===")
    for name, future in futures:
        try:
            result = future.get(timeout=5.0)
            print(f" {name} -> {result}")
        except Exception as e:
            print(f" {name} -> ERROR: {e}")

    print("\n=== CallerRunsPolicy Demo ===")
    pool2 = ThreadPool(
        core_size=1,
        max_size=1,
        queue_capacity=1,
        rejection_policy=CallerRunsPolicy(),
    )

    # Fill the single worker and single queue slot
    pool2.submit(lambda: time.sleep(1.0), name="blocking-task")
    pool2.submit(lambda: time.sleep(0.5), name="queued-task")

    # This one should trigger CallerRunsPolicy
    f_caller = pool2.submit(lambda: "ran on caller thread", name="overflow-task")
    print(f" CallerRuns result: {f_caller.get(timeout=2.0)}")

    pool2.shutdown()

    print("\n=== Shutdown ===")
    pool.shutdown()
    print(f" Final stats: {pool.stats}")

    print("\nAll operations completed successfully.")
Common Mistakes
- ✗Using an unbounded queue: tasks pile up during bursts and the process runs out of memory before you ever notice the overload
- ✗Busy-waiting on an empty queue instead of blocking with a Condition: wastes CPU and defeats the purpose of pooling
- ✗Not handling task exceptions inside workers: an unhandled exception kills the worker thread silently and the pool slowly shrinks to zero
- ✗Creating threads without enforcing max pool size: under sustained load the system spawns thousands of threads and context-switching tanks throughput
Key Points
- ✓Core pool threads stay alive even when idle. Max pool threads are created only when the queue is full and torn down when demand drops. Getting that split right is essential for balancing resource use and throughput.
- ✓A bounded queue puts a hard cap on how many tasks can pile up in memory. An unbounded queue sounds convenient until it eats all your heap during a traffic spike.
- ✓Rejection policies are the Strategy pattern in action. When the pool and queue are both full, AbortPolicy throws, DiscardPolicy drops silently, CallerRunsPolicy forces the submitting thread to run the task itself and naturally slows down producers.
- ✓Worker threads block on the queue when it is empty using a Condition variable. No busy-waiting, no polling loops, no wasted CPU cycles.