Connection Pool
Object Pool for reusing expensive database connections. Proxy wrappers return connections on close() instead of destroying them. Singleton per data source prevents resource exhaustion.
Key Abstractions
Connection: interface; execute query, close
PooledConnection: proxy; wraps the real connection and overrides close() to return it to the pool
ConnectionFactory: factory; creates driver-specific connections (MySQL, Postgres)
ConnectionState: enum; AVAILABLE, IN_USE, VALIDATING, EXPIRED
ConnectionPool: Object Pool + Singleton; manages the borrow/return lifecycle
PoolConfig: max size, min idle, max wait time, validation query
How It Works
Database connections are expensive to create. Each one requires a TCP handshake, TLS negotiation (if encrypted), authentication, and session setup on the database server. In a typical web application handling hundreds of requests per second, creating and tearing down a connection per request would waste enormous amounts of time and exhaust the database server's connection limit.
A connection pool solves this by keeping a bounded set of connections open, some of them created upfront, and lending them out to callers on demand. When a caller is done, the connection goes back to the pool rather than being destroyed. The next caller gets the same underlying connection without paying the creation cost again. This is the Object Pool pattern, one of the oldest and most practical patterns in systems programming.
The Proxy pattern makes this invisible to callers. A PooledConnection wraps the real connection and implements the same Connection interface. When application code calls close(), it thinks it's closing the connection. In reality, the proxy intercepts that call and returns the connection to the pool. The calling code never knows a pool exists.
Requirements
Functional
- acquire() borrows a connection from the pool, blocking if none are available
- release() / close() returns a connection to the pool for reuse
- Connections are validated before being handed out to detect stale or broken connections
- Pool is bounded: never exceeds maxSize open connections
- Pre-fills minIdle connections on startup to avoid cold-start latency
Non-Functional
- Thread-safe: multiple threads borrow and return connections concurrently
- Bounded blocking: acquire() waits up to maxWaitMs, then throws a timeout, preventing indefinite thread stalls
- Singleton per data source: one pool per database URL to prevent connection count explosion
Design Decisions
Couldn't callers just return connections manually?
Without the Proxy pattern, every caller must remember to call pool.release(conn) explicitly. That's error-prone. If an exception occurs between acquire() and release(), the connection leaks. With the Proxy pattern, callers use conn.close(): the same idiom they already know from non-pooled connections. The Proxy intercepts close() and returns the connection to the pool. It works naturally with try-with-resources (Java) or context managers (Python), making leaks much harder to introduce.
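The difference can be sketched in a few lines. This is a minimal illustration, not the pool from this article: TinyPool and PooledConn are hypothetical stand-ins, and the failure is simulated by any query starting with "BAD". It uses contextlib.closing from the standard library, which calls close() on exit even when the body raises.

```python
from contextlib import closing


class TinyPool:
    """Hypothetical stand-in for a real pool: it only counts idle connections."""

    def __init__(self, size: int):
        self.idle = size

    def acquire(self) -> "PooledConn":
        if self.idle == 0:
            raise RuntimeError("pool exhausted")
        self.idle -= 1
        return PooledConn(self)

    def release(self) -> None:
        self.idle += 1


class PooledConn:
    """Proxy whose close() hands the connection back instead of destroying it."""

    def __init__(self, pool: TinyPool):
        self._pool = pool
        self._closed = False

    def execute(self, query: str) -> str:
        if query.startswith("BAD"):
            raise ValueError("simulated query failure")
        return f"ok: {query}"

    def close(self) -> None:
        if not self._closed:  # idempotent: a second close() releases nothing
            self._closed = True
            self._pool.release()


def leaky(pool: TinyPool) -> None:
    conn = pool.acquire()
    conn.execute("BAD QUERY")  # raises, so the manual release below never runs
    pool.release()


def safe(pool: TinyPool) -> None:
    # closing() calls conn.close() on exit, even when the body raises
    with closing(pool.acquire()) as conn:
        conn.execute("BAD QUERY")


leaky_pool, safe_pool = TinyPool(1), TinyPool(1)
for fn, pool in ((leaky, leaky_pool), (safe, safe_pool)):
    try:
        fn(pool)
    except ValueError:
        pass

print(leaky_pool.idle, safe_pool.idle)  # 0 1
```

After the same failure, the manually managed pool has lost its only connection while the proxy-based one has it back.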
Does creating connections on demand work?
Creating a real connection involves DNS resolution, a TCP three-way handshake, TLS negotiation, and database authentication. This typically takes 5-50 ms depending on the database and network. Under load, creating a connection per request adds that latency to every request and can overwhelm the database with simultaneous connection attempts. The pool amortizes this cost: connections are created once and reused thousands of times. HikariCP, a widely used Java pool, publishes benchmarks in which borrowing a pooled connection costs on the order of a microsecond, versus milliseconds for a fresh connection.
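A back-of-the-envelope calculation shows the scale of the saving. The numbers are assumptions chosen for illustration (20 ms per connect, 10,000 requests, a pool of 10), and the model ignores reconnects after a connection expires.

```python
from __future__ import annotations


def connect_overhead_ms(requests: int, connect_ms: float,
                        pooled_connections: int | None = None) -> float:
    """Total time spent establishing connections.

    Without a pool every request connects; with a pool each pooled
    connection is established once and then reused.
    """
    if pooled_connections is None:
        connects = requests
    else:
        connects = min(requests, pooled_connections)
    return connects * connect_ms


per_request = connect_overhead_ms(10_000, 20.0)
pooled = connect_overhead_ms(10_000, 20.0, pooled_connections=10)
print(per_request, pooled)  # 200000.0 200.0
```

Under these assumptions, connecting per request spends 200 seconds in connection setup in total, against 0.2 seconds with the pool.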
How should you validate connections before handing them out?
Before handing out a connection, the pool should verify it's still alive. Connections can die due to network issues, database restarts, or idle timeouts. Three common strategies: (1) Test on borrow: run a lightweight query like SELECT 1 before each borrow. Adds a round-trip per borrow but catches most stale connections before the caller sees them. (2) Test on return: validate when the connection comes back. Wastes effort if the connection isn't borrowed again soon, and misses connections that die while idle. (3) Background validation: a maintenance thread periodically pings idle connections. Keeps the check off the borrow path but is more complex, and a connection can still die between pings. Production pools commonly validate on borrow and bound the cost: DBCP2 enables testOnBorrow by default, and HikariCP skips the check for connections that were used very recently.
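Test-on-borrow with a "recently used" bypass might look like the sketch below. IdleConn, borrow, and the skip_check_within_sec parameter are hypothetical names introduced for illustration; the 0.5-second default is an assumption, not a value taken from any particular pool. The clock is injectable so the behaviour can be checked deterministically.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class IdleConn:
    """Hypothetical idle-connection record: a liveness probe plus a last-used stamp."""
    ping: Callable[[], bool]
    last_used: float = field(default_factory=time.monotonic)


def borrow(idle: list[IdleConn], skip_check_within_sec: float = 0.5,
           now: Callable[[], float] = time.monotonic) -> IdleConn | None:
    """Pop idle connections until one passes validation.

    Connections used within skip_check_within_sec are trusted without a
    ping, which saves the round-trip on hot connections.
    """
    while idle:
        conn = idle.pop()
        recently_used = now() - conn.last_used < skip_check_within_sec
        if recently_used or conn.ping():
            conn.last_used = now()
            return conn
        # Stale connection: drop it and try the next one.
    return None


pings: list[str] = []


def live() -> bool:
    pings.append("live")
    return True


def dead() -> bool:
    pings.append("dead")
    return False


t = 100.0
idle = [IdleConn(live, last_used=t - 10), IdleConn(dead, last_used=t - 10)]
got = borrow(idle, now=lambda: t)  # dead one is probed and dropped first
hot = borrow([IdleConn(dead, last_used=t - 0.1)], now=lambda: t)  # no probe
print(pings, got is not None, hot is not None)
```

The last call shows the trade-off: a connection that died within the bypass window is handed out unchecked, so the window should stay short.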
What happens if two parts of the app create separate pools?
If two parts of the application independently create pools for the same database, the total connection count can double. Databases have hard limits (max_connections in PostgreSQL defaults to 100), so uncoordinated pools can exhaust them. Singleton-per-DSN ensures one pool per database, giving central control over connection counts. The Singleton is keyed on the data source name (host:port/database), so different databases get separate pools. The sample implementation in this article simplifies the key to host:port.
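A registry keyed on a normalized data source name could be sketched as follows. Pool, pool_key, and get_pool are hypothetical names; the default port of 5432 and the choice to leave credentials out of the key are assumptions (if different users must not share connections, the username belongs in the key too).

```python
from threading import Lock
from urllib.parse import urlsplit


class Pool:
    """Hypothetical placeholder for a real connection pool."""

    def __init__(self, key: str):
        self.key = key


_pools: dict[str, Pool] = {}
_registry_lock = Lock()


def pool_key(url: str) -> str:
    """Normalize a URL to host:port/database so equivalent spellings share a pool."""
    parts = urlsplit(url)
    host = (parts.hostname or "").lower()
    port = parts.port or 5432  # assumption: fall back to PostgreSQL's default port
    database = parts.path.lstrip("/")
    return f"{host}:{port}/{database}"


def get_pool(url: str) -> Pool:
    key = pool_key(url)
    with _registry_lock:  # check-and-create must be atomic across threads
        if key not in _pools:
            _pools[key] = Pool(key)
        return _pools[key]


a = get_pool("postgresql://db.prod:5432/shop")
b = get_pool("postgresql://app:secret@DB.PROD/shop")
c = get_pool("postgresql://db.prod:5432/analytics")
print(a is b, a is c)  # True False
```

Normalizing before the lookup matters: without it, two spellings of the same database would silently produce two pools.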
Interview Follow-ups
- "How would you detect connection leaks?" Track the borrow timestamp for each in-use connection. A background thread scans in-use connections and logs warnings for any held longer than a configurable threshold (e.g., 30 seconds). HikariCP calls this leakDetectionThreshold. Some pools also capture the stack trace at borrow time so you can identify the leaking code path.
- "How does dynamic pool sizing work?" Start with minIdle connections. When demand increases, grow the pool up to maxSize. When demand drops, a maintenance thread can shrink the pool by closing idle connections that exceed minIdle. HikariCP retires connections that have sat idle longer than idleTimeout while the pool is above minimumIdle. The key is hysteresis: don't shrink immediately after a spike or you'll just re-create connections on the next burst.
- "How do health checks work in production pools?" By default HikariCP uses Connection.isValid(timeout) (JDBC 4+) instead of running a validation query, which is typically faster because the driver can use a protocol-level ping. It also runs a background housekeeper task every 30 seconds to evict connections that have been idle longer than idleTimeout (default 10 minutes) and refill to minimumIdle. Connections are retired separately once they reach maxLifetime (default 30 minutes).
- "What makes HikariCP faster than other pools?" HikariCP uses a custom, largely lock-free ConcurrentBag collection instead of a blocking queue. Borrowing threads check their own thread-local list first (avoiding contention), then take from a shared list. It also uses FastList instead of ArrayList to avoid range checks, and its hot methods are kept small at the bytecode level so the JIT can inline them. These micro-optimizations add up to very low acquire times under contention.
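The leak-detection idea from the first follow-up can be sketched as a small tracker. LeakTracker and its method names are hypothetical, and this is not how any specific pool implements it; a real pool would call suspected_leaks() from a maintenance thread and log the results. Timestamps can be passed in explicitly so the logic is testable without sleeping.

```python
from __future__ import annotations

import time
import traceback


class LeakTracker:
    """Records when and where each connection was borrowed."""

    def __init__(self, threshold_sec: float):
        self._threshold = threshold_sec
        # conn_id -> (borrow time, stack trace captured at borrow)
        self._borrowed: dict[str, tuple[float, str]] = {}

    def on_borrow(self, conn_id: str, now: float | None = None) -> None:
        stack = "".join(traceback.format_stack(limit=5))
        borrowed_at = time.monotonic() if now is None else now
        self._borrowed[conn_id] = (borrowed_at, stack)

    def on_return(self, conn_id: str) -> None:
        self._borrowed.pop(conn_id, None)

    def suspected_leaks(self, now: float | None = None) -> list[tuple[str, float, str]]:
        """Return (conn_id, held_seconds, borrow_stack) for connections held too long."""
        current = time.monotonic() if now is None else now
        return [
            (conn_id, current - borrowed_at, stack)
            for conn_id, (borrowed_at, stack) in self._borrowed.items()
            if current - borrowed_at > self._threshold
        ]


tracker = LeakTracker(threshold_sec=30.0)
tracker.on_borrow("conn-a", now=0.0)
tracker.on_borrow("conn-b", now=0.0)
tracker.on_return("conn-b")  # returned promptly, so never reported

leaks = tracker.suspected_leaks(now=45.0)
print([(conn_id, held) for conn_id, held, _ in leaks])  # [('conn-a', 45.0)]
```

Capturing the stack at borrow time costs a little on every acquire, which is one reason pools tend to make leak detection opt-in.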
Code Implementation
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from threading import Condition, Lock, current_thread
import time
import uuid

# --------------- Config ---------------
@dataclass(frozen=True)
class PoolConfig:
    """Immutable pool configuration."""
    max_size: int = 5
    min_idle: int = 2
    max_wait_sec: float = 3.0
    validation_query: str = "SELECT 1"

    def __post_init__(self):
        if self.max_size <= 0:
            raise ValueError("max_size must be positive")
        if self.min_idle < 0 or self.min_idle > self.max_size:
            raise ValueError("min_idle must be between 0 and max_size")

# --------------- State ---------------
class ConnectionState(Enum):
    AVAILABLE = "AVAILABLE"
    IN_USE = "IN_USE"
    VALIDATING = "VALIDATING"
    EXPIRED = "EXPIRED"

# --------------- Connection Interface ---------------
class Connection(ABC):
    @abstractmethod
    def execute(self, query: str) -> str: ...

    @abstractmethod
    def close(self) -> None: ...

# --------------- Real Connection ---------------
class RealConnection(Connection):
    """Simulates an actual database connection with TCP handshake cost."""

    def __init__(self, host: str, port: int):
        self.id = str(uuid.uuid4())[:8]
        self.host = host
        self.port = port
        self.connected = False
        self.created_at = time.monotonic()

    def connect(self) -> None:
        time.sleep(0.05)  # Simulate TCP handshake + auth
        self.connected = True
        print(f" [RealConn-{self.id}] Connected to {self.host}:{self.port}")

    def execute(self, query: str) -> str:
        if not self.connected:
            raise RuntimeError("Connection is not open")
        return f"Result from {self.id} for: {query}"

    def close(self) -> None:
        if self.connected:
            self.connected = False
            print(f" [RealConn-{self.id}] Disconnected")

    def is_valid(self) -> bool:
        return self.connected

# --------------- Proxy ---------------
class PooledConnection(Connection):
    """
    Proxy wrapper. Callers use the standard Connection interface.
    close() returns the underlying connection to the pool instead of
    destroying it.
    """

    def __init__(self, real_conn: RealConnection, pool: "ConnectionPool"):
        self._real = real_conn
        self._pool = pool
        self._closed = False

    def execute(self, query: str) -> str:
        if self._closed:
            raise RuntimeError("Cannot use a closed pooled connection")
        return self._real.execute(query)

    def close(self) -> None:
        # Idempotent: only the first close() returns the connection to the pool
        if not self._closed:
            self._closed = True
            self._pool.release(self._real)

# --------------- Factory ---------------
class ConnectionFactory:
    """Creates driver-specific real connections."""

    def __init__(self, host: str, port: int):
        self._host = host
        self._port = port

    def create(self) -> RealConnection:
        conn = RealConnection(self._host, self._port)
        conn.connect()
        return conn

# --------------- Connection Pool (Object Pool + Singleton) ---------------
class ConnectionPool:
    """
    Object Pool that manages borrow/return of database connections.
    Uses threading.Condition so acquire() can block until a connection
    is returned by another thread or until the timeout expires.
    """
    _instances: dict[str, "ConnectionPool"] = {}
    _class_lock = Lock()

    @classmethod
    def get_instance(cls, dsn: str, config: PoolConfig | None = None) -> "ConnectionPool":
        """Singleton per data source name."""
        with cls._class_lock:
            if dsn not in cls._instances:
                if config is None:
                    config = PoolConfig()
                host, port_str = dsn.split(":")
                port = int(port_str)
                cls._instances[dsn] = ConnectionPool(config, ConnectionFactory(host, port))
            return cls._instances[dsn]

    @classmethod
    def clear_instances(cls) -> None:
        """For testing: remove all singleton instances."""
        with cls._class_lock:
            for pool in cls._instances.values():
                pool.shutdown()
            cls._instances.clear()

    def __init__(self, config: PoolConfig, factory: ConnectionFactory):
        self._config = config
        self._factory = factory
        self._lock = Lock()
        self._not_empty = Condition(self._lock)
        self._available: list[RealConnection] = []
        self._in_use: set[str] = set()  # keyed by conn.id
        self._states: dict[str, ConnectionState] = {}
        self._all: dict[str, RealConnection] = {}
        # Pre-fill min_idle connections
        for _ in range(config.min_idle):
            self._create_connection()

    # ---- Internal helpers ----
    def _create_connection(self) -> RealConnection:
        conn = self._factory.create()
        self._all[conn.id] = conn
        self._states[conn.id] = ConnectionState.AVAILABLE
        self._available.append(conn)
        return conn

    def _total_count(self) -> int:
        return len(self._all)

    def _validate(self, conn: RealConnection) -> bool:
        self._states[conn.id] = ConnectionState.VALIDATING
        valid = conn.is_valid()
        if not valid:
            self._states[conn.id] = ConnectionState.EXPIRED
        return valid

    # ---- Public API ----
    def acquire(self) -> Connection:
        """Borrow a connection from the pool. Blocks up to max_wait_sec."""
        with self._not_empty:
            deadline = time.monotonic() + self._config.max_wait_sec
            while True:
                # Try to grab an available, valid connection
                while self._available:
                    conn = self._available.pop(0)
                    if self._validate(conn):
                        self._states[conn.id] = ConnectionState.IN_USE
                        self._in_use.add(conn.id)
                        thread = current_thread().name
                        print(f" [{thread}] Acquired conn-{conn.id}")
                        return PooledConnection(conn, self)
                    # Connection was stale: discard it
                    print(f" Discarded stale conn-{conn.id}")
                    del self._all[conn.id]
                    del self._states[conn.id]
                # No available connections: can we create a new one?
                if self._total_count() < self._config.max_size:
                    conn = self._factory.create()
                    self._all[conn.id] = conn
                    self._states[conn.id] = ConnectionState.IN_USE
                    self._in_use.add(conn.id)
                    thread = current_thread().name
                    print(f" [{thread}] Acquired NEW conn-{conn.id}")
                    return PooledConnection(conn, self)
                # Pool exhausted: wait until a connection is returned or the deadline passes
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError(
                        f"Could not acquire connection within "
                        f"{self._config.max_wait_sec}s (pool size={self._config.max_size})"
                    )
                print(f" [{current_thread().name}] Waiting for connection...")
                self._not_empty.wait(timeout=remaining)

    def release(self, conn: RealConnection) -> None:
        """Return a connection to the pool."""
        with self._not_empty:
            if conn.id in self._in_use:
                self._in_use.discard(conn.id)
                self._states[conn.id] = ConnectionState.AVAILABLE
                self._available.append(conn)
                print(f" Returned conn-{conn.id} to pool "
                      f"(available={len(self._available)}/{self._config.max_size})")
                # Wake one waiter blocked in acquire()
                self._not_empty.notify()

    def shutdown(self) -> None:
        """Close all connections and clear the pool."""
        with self._lock:
            for conn in self._all.values():
                conn.close()
            self._available.clear()
            self._in_use.clear()
            self._states.clear()
            self._all.clear()
            print(" Pool shut down: all connections closed.")

    @property
    def stats(self) -> dict:
        with self._lock:
            return {
                "total": self._total_count(),
                "available": len(self._available),
                "in_use": len(self._in_use),
            }

# --------------- Demo ---------------
if __name__ == "__main__":
    import threading

    ConnectionPool.clear_instances()
    config = PoolConfig(max_size=3, min_idle=2, max_wait_sec=2.0)
    pool = ConnectionPool.get_instance("localhost:5432", config)
    print("=== Pool Stats After Init ===")
    print(f" {pool.stats}")

    print("\n=== Acquire & Execute ===")
    c1 = pool.acquire()
    c2 = pool.acquire()
    print(f" c1 result: {c1.execute('SELECT * FROM users')}")
    print(f" c2 result: {c2.execute('SELECT * FROM orders')}")
    print(f" Stats: {pool.stats}")

    print("\n=== Return c1: Pool Reuses It ===")
    c1.close()
    print(f" Stats: {pool.stats}")

    print("\n=== Acquire Again: Should Reuse Returned Connection ===")
    c3 = pool.acquire()
    print(f" c3 result: {c3.execute('SELECT count(*) FROM products')}")
    print(f" Stats: {pool.stats}")

    print("\n=== Exhaust Pool & Show Timeout ===")
    # c2 and c3 are in use, pool max=3, acquire one more to fill it
    c4 = pool.acquire()
    print(f" Stats (full): {pool.stats}")

    def try_acquire_blocked():
        try:
            conn = pool.acquire()
            print(f" Blocked thread got conn: {conn.execute('SELECT 1')}")
            conn.close()
        except TimeoutError as e:
            print(f" TimeoutError: {e}")

    t = threading.Thread(target=try_acquire_blocked, name="Thread-Blocked")
    t.start()
    # Let the thread block, then return c2 so it can proceed
    time.sleep(0.5)
    print("\n=== Returning c2 to unblock waiting thread ===")
    c2.close()
    t.join()

    print("\n=== Cleanup ===")
    c3.close()
    c4.close()
    pool.shutdown()

    print("\n=== Singleton Verification ===")
    ConnectionPool.clear_instances()
    p1 = ConnectionPool.get_instance("db.prod:5432", PoolConfig(max_size=3, min_idle=1))
    p2 = ConnectionPool.get_instance("db.prod:5432")
    print(f" Same instance? {p1 is p2}")
    p1.shutdown()
    ConnectionPool.clear_instances()
    print("\nAll operations completed successfully.")

Common Mistakes
- ✗ Not returning connections (connection leak): the pool exhausts and all threads block
- ✗ Skipping validation before borrowing: stale connections throw on first query
- ✗ No max wait timeout: threads block forever when pool is exhausted
- ✗ Creating multiple pools for the same data source: doubles connection count, hits DB max_connections
Key Points
- ✓ Object Pool avoids repeated create/destroy of expensive resources: each connection costs a TCP handshake plus auth
- ✓ Proxy is transparent: calling code uses the same Connection interface and doesn't know about the pool
- ✓ State tracks each connection's lifecycle, preventing use of expired connections
- ✓ Singleton per data source ensures one pool per database, preventing connection count explosion