ORM Data Mapper
Map objects to database rows with lazy-loading proxies, identity map to avoid duplicates, unit of work for batched writes, and a fluent query builder. A mini SQLAlchemy/Hibernate.
Key Abstractions
EntityMapper: Maps class fields to table columns, handles row-to-object and object-to-row conversion
IdentityMap: Cache ensuring one object per primary key, so the same row never becomes two different objects
UnitOfWork: Tracks new, dirty, and deleted objects, flushes all changes as a single batch
QueryBuilder: Fluent API in which query(User).where("age", ">", 21).order_by("name") produces SQL without string concatenation
LazyProxy: Proxy that loads related objects on first access, deferring expensive JOINs until actually needed
Session: Facade combining IdentityMap, UnitOfWork, and QueryBuilder into one entry point
Class Diagram
How It Works
Every application talks to a database, but objects and rows are very different things. A User object has methods, relationships, and behavior. A database row is flat key-value pairs in a table. The Data Mapper pattern bridges that gap: it sits between your domain objects and the database, translating back and forth so neither side knows about the other.
The Identity Map is the first layer of smarts. When you load User #7, the map stores that object. Ask for User #7 again and you get the exact same object back, not a fresh copy. Why does this matter? Because if two parts of your code load the same user and one changes the email, the other should see that change. Without an identity map, you get two separate objects, two separate mutations, and the last one to save wins. The other change vanishes.
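The lost-update scenario can be reproduced in a few lines. This is an illustrative sketch only: `FAKE_DB`, `load_without_map`, `load_with_map`, and `save` are hypothetical names, and a plain dict stands in for the database.

```python
# Hypothetical stand-in for a users table: primary key -> row.
FAKE_DB = {7: {"email": "old@example.com", "verified": False}}


class User:
    def __init__(self, user_id, email, verified):
        self.id = user_id
        self.email = email
        self.verified = verified


def load_without_map(user_id):
    """Every call builds a fresh object from the row."""
    row = FAKE_DB[user_id]
    return User(user_id, row["email"], row["verified"])


_identity_map = {}


def load_with_map(user_id):
    """The first call builds the object; later calls return that same object."""
    if user_id not in _identity_map:
        _identity_map[user_id] = load_without_map(user_id)
    return _identity_map[user_id]


def save(user):
    """Write the whole object back, as a naive mapper would."""
    FAKE_DB[user.id] = {"email": user.email, "verified": user.verified}


# Without the map: two copies, and the second save overwrites the first.
a, b = load_without_map(7), load_without_map(7)
a.email = "new@example.com"
b.verified = True
save(a)
save(b)
lost_update_row = dict(FAKE_DB[7])  # the email change has vanished

# Reset the row, then repeat with the map: both names refer to one object.
FAKE_DB[7] = {"email": "old@example.com", "verified": False}
c, d = load_with_map(7), load_with_map(7)
c.email = "new@example.com"
d.verified = True
save(c)
save(d)
merged_row = dict(FAKE_DB[7])  # both changes survive
```

In the first half, `b` still carries the old email, so saving it silently undoes the change made through `a`. In the second half there is only one object, so there is nothing to overwrite.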
The Unit of Work pattern tracks everything you have done during a session. It maintains three internal sets: new objects (registered via register_new()), dirty objects (modified since last flush), and deleted objects (marked for removal). Nothing touches the database while you work. Your code creates users, updates emails, deletes orders, all in memory.
When you call flush(), the Unit of Work writes all accumulated changes in one batch. Inserts first, then updates, then deletes. All within a single transaction boundary. If any write fails, the entire batch rolls back. You never end up with half your changes in the database and half lost.
Compare that to flushing after every single setter call. With 50 changes, that is 50 network round-trips versus one. And each individual write is its own transaction, so a failure on write #37 leaves writes 1-36 committed and 37-50 lost. Unit of Work eliminates both problems: fewer round-trips and atomic all-or-nothing semantics.
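The all-or-nothing behavior can be sketched against a real database with Python's built-in `sqlite3` module. `TinyUnitOfWork` and its `register` method are hypothetical names for this illustration, not part of the implementation later in this article. The sketch relies on the documented behavior of a `sqlite3` connection used as a context manager: it commits if the block succeeds and rolls back if the block raises.

```python
import sqlite3


class TinyUnitOfWork:
    """Hypothetical, stripped-down unit of work that queues SQL statements."""

    def __init__(self, conn):
        self._conn = conn
        self._pending = []  # list of (sql, params) waiting for flush

    def register(self, sql, params=()):
        # Nothing touches the database here; the statement is only queued.
        self._pending.append((sql, params))

    def flush(self):
        # "with conn" commits on success and rolls back on any exception.
        with self._conn:
            for sql, params in self._pending:
                self._conn.execute(sql, params)
        # Reached only if every statement succeeded.
        self._pending.clear()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE)")

uow = TinyUnitOfWork(conn)
insert = "INSERT INTO users (id, email) VALUES (?, ?)"
uow.register(insert, (1, "a@example.com"))
uow.register(insert, (2, "b@example.com"))
uow.register(insert, (3, "a@example.com"))  # violates the UNIQUE constraint

try:
    uow.flush()
    flush_failed = False
except sqlite3.IntegrityError:
    flush_failed = True

# The first two inserts were rolled back along with the failing one.
rows_after_failure = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
pending_after_failure = len(uow._pending)
```

Because the queue is cleared only after a successful commit, a failed flush leaves the pending changes in place, so the caller can correct the problem and flush again.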
Lazy loading uses the Proxy pattern to defer expensive work. When you load a User, their orders are not fetched immediately. Instead, the orders field holds a LazyProxy. The first time you actually access the orders, the proxy fires a query and loads them. If you never touch the orders, no query runs. Simple and efficient, but watch out for the N+1 problem.
The Query Builder gives you a fluent API for constructing queries. Instead of concatenating SQL strings (and risking injection), you chain method calls: query(User).where("age", ">", 21).order_by("name"). Each call returns self, so the chain reads like a sentence. Under the hood, it produces clean parameterized SQL.
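To make "parameterized" concrete, here is a minimal sketch of a builder that returns the SQL text and the bind values separately and runs them through Python's built-in `sqlite3` module. The `SqlQuery` class, its `build` method, and the `ALLOWED_OPS` set are assumptions made for this example. Values travel as `?` parameters. Column names and operators cannot be bound that way, so the sketch checks them against a whitelist instead.

```python
import sqlite3

ALLOWED_OPS = {"=", "!=", ">", "<", ">=", "<="}


class SqlQuery:
    """Hypothetical builder that emits SQL text plus separate bind values."""

    def __init__(self, table, columns):
        self._table = table
        self._columns = set(columns)  # whitelist of known column names
        self._where = []
        self._params = []
        self._order = None

    def where(self, column, op, value):
        if column not in self._columns or op not in ALLOWED_OPS:
            raise ValueError(f"unsupported filter: {column!r} {op!r}")
        self._where.append(f"{column} {op} ?")  # the value is never inlined
        self._params.append(value)
        return self

    def order_by(self, column):
        if column not in self._columns:
            raise ValueError(f"unknown column: {column!r}")
        self._order = column
        return self

    def build(self):
        sql = f"SELECT * FROM {self._table}"
        if self._where:
            sql += " WHERE " + " AND ".join(self._where)
        if self._order:
            sql += f" ORDER BY {self._order}"
        return sql, tuple(self._params)


COLUMNS = ["id", "name", "age"]
sql, params = SqlQuery("users", COLUMNS).where("age", ">", 21).order_by("name").build()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)")
conn.executemany(
    "INSERT INTO users (name, age) VALUES (?, ?)",
    [("Carol", 35), ("Dave", 20), ("Alice", 30)],
)
names = [row[1] for row in conn.execute(sql, params)]

# A hostile value is treated as data, not as SQL, so it matches no rows.
hostile = SqlQuery("users", COLUMNS).where("name", "=", "x' OR '1'='1").build()
hostile_rows = conn.execute(*hostile).fetchall()
```

The injection attempt fails because the driver receives the string as a value to compare against, not as text to parse.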
Requirements
Functional
- Map entity fields to table columns with explicit metadata
- Identity Map returns the same object for the same primary key within a session
- Unit of Work tracks inserts, updates, and deletes, flushing all in one batch
- Query Builder supports where, order_by, and limit with fluent chaining
- Lazy loading defers related-object queries until first access
- Session facade ties everything together with add, get, delete, query, flush
Non-Functional
- No duplicate objects for the same row within a session
- Batch writes minimize database round-trips
- Query construction prevents SQL injection by avoiding string concatenation
- Lazy proxies are transparent to callers
Design Decisions
Why not just write raw SQL everywhere?
Raw SQL works fine for small projects. But as the codebase grows, you end up with SQL strings scattered across dozens of files. Change a column name and you are grep-searching the entire project. Worse, splicing user-supplied values into SQL strings is the classic cause of SQL injection vulnerabilities. A Data Mapper centralizes the mapping logic. Column renames happen in one place. Query values are passed as bound parameters instead of being concatenated into the SQL text. And your domain objects stay clean of database concerns.
Why does the Identity Map matter so much?
Picture this: you load a User in your authentication layer. You load the same User in your billing layer. Without an identity map, those are two separate objects in memory. The auth layer sets user.verified = True. The billing layer still sees verified = False because it has a stale copy. When billing saves its copy, the verification is overwritten. The Identity Map prevents this by guaranteeing one object per primary key. Both layers get a reference to the same object. Mutations are immediately visible everywhere.
Why batch writes instead of immediate persistence?
Every database write involves a network round-trip: send the SQL, wait for acknowledgment. On a local database, that might be 0.5ms. Over a network, it could be 5ms. If you have 100 changes and flush each one individually over that network, you spend 100 × 5ms = 500ms on latency alone. Batching them into a single flush lets the statements travel together, ideally in one round-trip. It also means one transaction. If something fails halfway through, you roll back everything. With immediate persistence, the first 50 changes are already committed and you are stuck with a half-updated database.
When does lazy loading hurt?
Lazy loading is great for one-off access. Load a user, check their profile, done. But the moment you loop over users and access their orders, you hit the N+1 problem: 1 query for the users, then one more query per user for that user's orders. If you have 1000 users, that is 1001 queries instead of one or two. The fix is eager loading (a JOIN that fetches users and orders together in one query) or batch fetching (load all orders for all visible user IDs in a second query). SQLAlchemy offers joinedload for the JOIN approach and selectinload (or the older subqueryload) for the second-query approach. Hibernate offers JOIN FETCH or FetchType.EAGER for the first and @BatchSize for the second.
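The difference in query counts is easy to measure. The sketch below uses Python's built-in `sqlite3` module with a small hypothetical schema. The `run` helper and the two loader functions are names invented for this example, and the counter simply tallies calls to `run`.

```python
import sqlite3
from collections import defaultdict

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, product TEXT)")
conn.executemany("INSERT INTO users (id, name) VALUES (?, ?)",
                 [(1, "Alice"), (2, "Bob"), (3, "Carol")])
conn.executemany("INSERT INTO orders (user_id, product) VALUES (?, ?)",
                 [(1, "Laptop"), (1, "Mouse"), (2, "Keyboard")])

query_count = 0


def run(sql, params=()):
    """Execute a SELECT and count it, so the two strategies can be compared."""
    global query_count
    query_count += 1
    return conn.execute(sql, params).fetchall()


def orders_n_plus_one():
    """One query for the users, then one more query per user."""
    users = run("SELECT id, name FROM users ORDER BY id")
    result = {}
    for user_id, name in users:
        rows = run("SELECT product FROM orders WHERE user_id = ? ORDER BY id", (user_id,))
        result[name] = [product for (product,) in rows]
    return result


def orders_batched():
    """One query for the users, one query for all of their orders."""
    users = run("SELECT id, name FROM users ORDER BY id")
    ids = [user_id for user_id, _ in users]
    if not ids:
        return {}  # "IN ()" is not valid SQL, so skip the second query
    placeholders = ", ".join("?" for _ in ids)
    rows = run(
        f"SELECT user_id, product FROM orders WHERE user_id IN ({placeholders}) ORDER BY id",
        ids,
    )
    by_user = defaultdict(list)
    for user_id, product in rows:  # stitch children to parents in memory
        by_user[user_id].append(product)
    return {name: by_user[user_id] for user_id, name in users}


query_count = 0
slow = orders_n_plus_one()
n_plus_one_queries = query_count  # 1 + number of users

query_count = 0
fast = orders_batched()
batched_queries = query_count  # always 2, regardless of user count
```

Both functions return the same mapping of user name to products. Only the number of queries differs, and the gap grows linearly with the number of users.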
Interview Follow-ups
- "How would you handle transactions?" Wrap the Unit of Work flush in a database transaction. Start the transaction before the first statement, commit after the last one succeeds, and roll back on any failure. The Session becomes the transaction boundary. For nested transactions, use savepoints so inner failures do not kill the outer transaction.
- "How would you handle schema migrations?" Track the current schema version in a metadata table. Each migration is a numbered script (001_create_users.sql, 002_add_email_column.sql). On startup, the ORM checks which migrations have run and applies any new ones in order. Tools like Alembic (Python) and Flyway (Java) work along these lines. The key is making migrations idempotent and reversible.
- "How would you add eager loading to fix N+1?" Add an eager_load("orders") method to QueryBuilder. When present, the builder generates a JOIN query that fetches both the parent and child rows. The mapper then splits the flat result set back into nested objects. Alternatively, use a second-query approach: after loading users, collect all user IDs and run one query for all orders matching those IDs, then stitch them together in memory.
- "How does connection pooling interact with sessions?" Each session borrows a connection from the pool when you call flush() and returns it immediately after. Sessions are short-lived (one per request in a web app). The pool handles the expensive part (keeping TCP connections alive), while the session handles the logical part (identity map, change tracking). Never hold a session open for the lifetime of the application: its identity map keeps growing, and any connection it keeps checked out is one that other requests cannot use.
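The savepoint idea from the transactions answer can be sketched with Python's built-in `sqlite3` module. The table and the savepoint name `inner_step` are made up for this example. The connection is opened with `isolation_level=None` so that the transaction statements are issued explicitly rather than by the driver.

```python
import sqlite3

# isolation_level=None puts the connection in autocommit mode, so the
# BEGIN / SAVEPOINT / COMMIT statements below are the only transaction control.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT UNIQUE)")

insert = "INSERT INTO users (email) VALUES (?)"

conn.execute("BEGIN")  # outer transaction
conn.execute(insert, ("alice@example.com",))

conn.execute("SAVEPOINT inner_step")  # start of the nested unit of work
try:
    conn.execute(insert, ("bob@example.com",))
    conn.execute(insert, ("alice@example.com",))  # duplicate email, fails
    conn.execute("RELEASE SAVEPOINT inner_step")
    inner_failed = False
except sqlite3.IntegrityError:
    # Undo only the work done since the savepoint; the outer transaction survives.
    conn.execute("ROLLBACK TO SAVEPOINT inner_step")
    conn.execute("RELEASE SAVEPOINT inner_step")
    inner_failed = True

conn.execute(insert, ("carol@example.com",))  # outer work continues
conn.execute("COMMIT")

emails = [row[0] for row in conn.execute("SELECT email FROM users ORDER BY id")]
```

The inner failure discards Bob's insert along with the duplicate, while Alice's earlier insert and Carol's later one are committed together by the outer transaction.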
Code Implementation
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable as TypingCallable, Type

# --------------- Field Mapping ---------------
@dataclass
class FieldMapping:
    field_name: str
    column_name: str
    field_type: type

# --------------- Entity Base ---------------
class Entity:
    """Base class for all mapped entities."""
    _table_name: str = ""
    _field_mappings: list[FieldMapping] = []

    def __init__(self):
        self.id: int | None = None
        self._dirty: bool = False

    def __repr__(self):
        attrs = {m.field_name: getattr(self, m.field_name, None)
                 for m in self.__class__._field_mappings}
        return f"{self.__class__.__name__}(id={self.id}, {attrs})"

    def mark_dirty(self):
        self._dirty = True

# --------------- Entity Mapper ---------------
class EntityMapper:
    """Converts between entity objects and database rows (dicts)."""

    def __init__(self, entity_class: Type[Entity]):
        self._entity_class = entity_class
        self._table_name = entity_class._table_name
        self._mappings = entity_class._field_mappings

    @property
    def table_name(self) -> str:
        return self._table_name

    def to_row(self, entity: Entity) -> dict[str, Any]:
        row: dict[str, Any] = {"id": entity.id}
        for m in self._mappings:
            val = getattr(entity, m.field_name, None)
            # Don't store proxy objects as raw values
            if isinstance(val, LazyProxy):
                val = val._target if val._loaded else None
            row[m.column_name] = val
        return row

    def from_row(self, row: dict[str, Any]) -> Entity:
        # Bypass the subclass constructor, then run the base initializer
        entity = self._entity_class.__new__(self._entity_class)
        Entity.__init__(entity)
        entity.id = row["id"]
        for m in self._mappings:
            setattr(entity, m.field_name, row.get(m.column_name))
        entity._dirty = False
        return entity

# --------------- Identity Map ---------------
class IdentityMap:
    """
    One object per primary key. If the map already contains User#7,
    you get that exact object back instead of a new copy.
    """

    def __init__(self):
        self._cache: dict[str, Entity] = {}

    def _key(self, cls: Type[Entity], entity_id: int) -> str:
        return f"{cls.__name__}:{entity_id}"

    def get(self, cls: Type[Entity], entity_id: int) -> Entity | None:
        return self._cache.get(self._key(cls, entity_id))

    def put(self, entity: Entity) -> None:
        key = self._key(entity.__class__, entity.id)
        self._cache[key] = entity

    def contains(self, cls: Type[Entity], entity_id: int) -> bool:
        return self._key(cls, entity_id) in self._cache

    def clear(self) -> None:
        self._cache.clear()

# --------------- Unit of Work ---------------
class UnitOfWork:
    """
    Tracks new, dirty, and deleted entities. Flush writes everything
    to the database in one batch instead of one-at-a-time.
    """

    def __init__(self, database: "InMemoryDatabase"):
        self._new: list[Entity] = []
        self._dirty: list[Entity] = []
        self._deleted: list[Entity] = []
        self._db = database

    def register_new(self, entity: Entity) -> None:
        if entity not in self._new:
            self._new.append(entity)

    def register_dirty(self, entity: Entity) -> None:
        if entity not in self._dirty and entity not in self._new:
            self._dirty.append(entity)

    def register_deleted(self, entity: Entity) -> None:
        # An entity that was never flushed has no row to delete: just forget it
        if entity in self._new:
            self._new.remove(entity)
            return
        if entity in self._dirty:
            self._dirty.remove(entity)
        if entity not in self._deleted:
            self._deleted.append(entity)

    def flush(self) -> None:
        mapper_cache: dict[str, EntityMapper] = {}

        def get_mapper(entity: Entity) -> EntityMapper:
            cls_name = entity.__class__.__name__
            if cls_name not in mapper_cache:
                mapper_cache[cls_name] = EntityMapper(entity.__class__)
            return mapper_cache[cls_name]

        # Inserts
        for entity in self._new:
            mapper = get_mapper(entity)
            table = mapper.table_name
            if table not in self._db.tables:
                self._db.tables[table] = {}
            # Auto-generate ID
            if entity.id is None:
                existing_ids = self._db.tables[table].keys()
                entity.id = max(existing_ids, default=0) + 1
            row = mapper.to_row(entity)
            self._db.tables[table][entity.id] = row
            entity._dirty = False
            print(f" [UoW] INSERT into {table}: {row}")

        # Updates
        for entity in self._dirty:
            mapper = get_mapper(entity)
            table = mapper.table_name
            row = mapper.to_row(entity)
            self._db.tables[table][entity.id] = row
            entity._dirty = False
            print(f" [UoW] UPDATE {table} SET {row}")

        # Deletes
        for entity in self._deleted:
            mapper = get_mapper(entity)
            table = mapper.table_name
            if entity.id in self._db.tables.get(table, {}):
                del self._db.tables[table][entity.id]
            print(f" [UoW] DELETE from {table} WHERE id={entity.id}")

        self._new.clear()
        self._dirty.clear()
        self._deleted.clear()
        print(f" [UoW] Flush complete")

# --------------- Lazy Proxy ---------------
class LazyProxy:
    """
    Proxy that defers loading related objects until first access.
    The caller does not know they are holding a proxy. On first touch,
    the loader function runs and the real data appears.
    """

    def __init__(self, loader: TypingCallable[[], Any]):
        self._loader = loader
        self._target: Any = None
        self._loaded = False

    def load(self) -> Any:
        if not self._loaded:
            print(f" [LazyProxy] Loading related data...")
            self._target = self._loader()
            self._loaded = True
        return self._target

    def __repr__(self):
        if self._loaded:
            return repr(self._target)
        return "<LazyProxy: not loaded>"

    def __iter__(self):
        return iter(self.load())

    def __len__(self):
        return len(self.load())

# --------------- Query Builder ---------------
class QueryBuilder:
    """
    Fluent API for building queries. Each method returns self so
    you can chain: query(User).where("age", ">", 21).order_by("name")
    """
    _OPERATORS = ("=", "!=", ">", "<", ">=", "<=")

    def __init__(self, entity_class: Type[Entity], db: "InMemoryDatabase",
                 identity_map: IdentityMap):
        self._entity_class = entity_class
        self._mapper = EntityMapper(entity_class)
        self._db = db
        self._identity_map = identity_map
        self._conditions: list[tuple[str, str, Any]] = []
        self._order_field: str | None = None
        self._limit_val: int | None = None

    def where(self, field: str, op: str, value: Any) -> "QueryBuilder":
        # Reject unknown operators up front instead of silently matching nothing
        if op not in self._OPERATORS:
            raise ValueError(f"Unsupported operator: {op!r}")
        self._conditions.append((field, op, value))
        return self

    def order_by(self, field: str) -> "QueryBuilder":
        self._order_field = field
        return self

    def limit(self, n: int) -> "QueryBuilder":
        self._limit_val = n
        return self

    def build_sql(self) -> str:
        """Show what the parameterized SQL would look like.

        Values appear as ? placeholders and are listed separately,
        the way a database driver would receive them.
        """
        table = self._mapper.table_name
        sql = f"SELECT * FROM {table}"
        params: list[Any] = []
        if self._conditions:
            clauses = []
            for f, op, v in self._conditions:
                clauses.append(f"{f} {op} ?")
                params.append(v)
            sql += " WHERE " + " AND ".join(clauses)
        if self._order_field:
            sql += f" ORDER BY {self._order_field}"
        if self._limit_val is not None:
            sql += f" LIMIT {self._limit_val}"
        return f"{sql} -- params: {params}"

    def _matches(self, row: dict, field: str, op: str, value: Any) -> bool:
        # Map field name to column name
        col = field
        for m in self._entity_class._field_mappings:
            if m.field_name == field:
                col = m.column_name
                break
        row_val = row.get(col)
        if row_val is None:
            return False
        if op == "=": return row_val == value
        if op == "!=": return row_val != value
        if op == ">": return row_val > value
        if op == "<": return row_val < value
        if op == ">=": return row_val >= value
        if op == "<=": return row_val <= value
        return False

    def execute(self) -> list[Entity]:
        table = self._mapper.table_name
        rows = self._db.tables.get(table, {})
        print(f" [QueryBuilder] {self.build_sql()}")
        results: list[Entity] = []
        for row_id, row in rows.items():
            match = all(
                self._matches(row, f, op, v)
                for f, op, v in self._conditions
            )
            if match:
                # Check identity map first
                cached = self._identity_map.get(self._entity_class, row_id)
                if cached is not None:
                    results.append(cached)
                else:
                    entity = self._mapper.from_row(row)
                    self._identity_map.put(entity)
                    results.append(entity)
        if self._order_field:
            results.sort(key=lambda e: getattr(e, self._order_field, ""))
        # "is not None" so that limit(0) is honored rather than ignored
        if self._limit_val is not None:
            results = results[:self._limit_val]
        return results

# --------------- In-Memory Database ---------------
class InMemoryDatabase:
    """Simulates a relational database as nested dicts."""

    def __init__(self):
        self.tables: dict[str, dict[int, dict[str, Any]]] = {}

    def dump(self) -> None:
        for table_name, rows in self.tables.items():
            print(f"\n Table '{table_name}':")
            for row_id, row in rows.items():
                print(f" {row}")

# --------------- Session (Facade) ---------------
class Session:
    """
    Facade combining IdentityMap, UnitOfWork, and QueryBuilder
    into one clean entry point. You open a session, do your work,
    and flush.
    """

    def __init__(self, db: InMemoryDatabase):
        self._db = db
        self._identity_map = IdentityMap()
        self._uow = UnitOfWork(db)

    def add(self, entity: Entity) -> None:
        self._uow.register_new(entity)
        if entity.id is not None:
            self._identity_map.put(entity)

    def get(self, cls: Type[Entity], entity_id: int) -> Entity | None:
        # Check identity map first
        cached = self._identity_map.get(cls, entity_id)
        if cached is not None:
            print(f" [Session] Cache hit for {cls.__name__}#{entity_id}")
            return cached
        # Load from database
        mapper = EntityMapper(cls)
        table = mapper.table_name
        row = self._db.tables.get(table, {}).get(entity_id)
        if row is None:
            return None
        entity = mapper.from_row(row)
        self._identity_map.put(entity)
        print(f" [Session] Loaded {cls.__name__}#{entity_id} from DB")
        return entity

    def delete(self, entity: Entity) -> None:
        self._uow.register_deleted(entity)

    def mark_dirty(self, entity: Entity) -> None:
        entity.mark_dirty()
        self._uow.register_dirty(entity)

    def query(self, cls: Type[Entity]) -> QueryBuilder:
        return QueryBuilder(cls, self._db, self._identity_map)

    def flush(self) -> None:
        # IDs are assigned during flush, so register the newly inserted
        # entities in the identity map afterwards. Otherwise a later get()
        # would build a second object for the same row.
        pending_new = list(self._uow._new)
        self._uow.flush()
        for entity in pending_new:
            self._identity_map.put(entity)

    def lazy_load(self, loader: TypingCallable[[], Any]) -> LazyProxy:
        return LazyProxy(loader)

    def clear(self) -> None:
        self._identity_map.clear()

# --------------- Domain Entities ---------------
class User(Entity):
    _table_name = "users"
    _field_mappings = [
        FieldMapping("name", "name", str),
        FieldMapping("age", "age", int),
        FieldMapping("email", "email", str),
    ]

    def __init__(self, name: str = "", age: int = 0, email: str = ""):
        super().__init__()
        self.name = name
        self.age = age
        self.email = email
        self.orders: Any = None  # Will hold LazyProxy


class Order(Entity):
    _table_name = "orders"
    _field_mappings = [
        FieldMapping("user_id", "user_id", int),
        FieldMapping("product", "product", str),
        FieldMapping("amount", "amount", float),
    ]

    def __init__(self, user_id: int = 0, product: str = "", amount: float = 0.0):
        super().__init__()
        self.user_id = user_id
        self.product = product
        self.amount = amount

# --------------- Demo ---------------
if __name__ == "__main__":
    db = InMemoryDatabase()
    session = Session(db)

    print("=== Add Users via Unit of Work ===")
    alice = User("Alice", 30, "alice@example.com")
    bob = User("Bob", 25, "bob@example.com")
    carol = User("Carol", 35, "carol@example.com")
    dave = User("Dave", 22, "dave@example.com")
    session.add(alice)
    session.add(bob)
    session.add(carol)
    session.add(dave)

    print("\n=== Flush (Batch Insert) ===")
    session.flush()
    db.dump()

    print("\n=== Add Orders ===")
    o1 = Order(alice.id, "Laptop", 1200.00)
    o2 = Order(alice.id, "Mouse", 25.00)
    o3 = Order(bob.id, "Keyboard", 75.00)
    session.add(o1)
    session.add(o2)
    session.add(o3)
    session.flush()

    print("\n=== Identity Map: Same Object Guarantee ===")
    user_a = session.get(User, alice.id)
    user_b = session.get(User, alice.id)
    print(f" user_a is user_b? {user_a is user_b}")
    print(f" Same id()? {id(user_a) == id(user_b)}")

    print("\n=== Mutate Through One Reference, Visible in the Other ===")
    user_a.email = "alice.new@example.com"
    print(f" user_b.email: {user_b.email}")  # Same object, so reflects change

    print("\n=== Query Builder: Fluent API ===")
    print("\n -- Users older than 24, ordered by name --")
    results = (session.query(User)
               .where("age", ">", 24)
               .order_by("name")
               .execute())
    for u in results:
        print(f" {u}")

    print("\n -- Users exactly 22 years old --")
    young = session.query(User).where("age", "=", 22).execute()
    for u in young:
        print(f" {u}")

    print("\n=== Lazy Loading: Orders for Alice ===")
    alice_fresh = session.get(User, alice.id)
    alice_fresh.orders = session.lazy_load(
        lambda: session.query(Order).where("user_id", "=", alice.id).execute()
    )
    print(f" Before access: {repr(alice_fresh.orders)}")
    print(f" Accessing orders...")
    for order in alice_fresh.orders:
        print(f" {order}")
    print(f" Second access (already loaded, no query):")
    for order in alice_fresh.orders:
        print(f" {order}")

    print("\n=== Update via Unit of Work ===")
    bob_entity = session.get(User, bob.id)
    bob_entity.age = 26
    session.mark_dirty(bob_entity)
    session.flush()

    print("\n=== Delete via Unit of Work ===")
    session.delete(dave)
    session.flush()
    db.dump()

    print("\n=== N+1 Problem Demo ===")
    print(" BAD: loading orders one user at a time in a loop")
    session.clear()
    all_users = session.query(User).execute()
    query_count = 0
    for u in all_users:
        orders = session.query(Order).where("user_id", "=", u.id).execute()
        query_count += 1
    print(f" Total queries: 1 (users) + {query_count} (orders) = {1 + query_count}")
    print(" FIX: load all orders in one query, then group by user_id in memory")

    print("\nAll operations completed successfully.")

Common Mistakes
- ✗ N+1 query problem: lazy loading in a loop loads N related objects one by one. If you iterate 100 users and access each user's orders, you get 1 query for users plus 100 queries for orders. Use eager loading or batch fetching when you know you will need the relation.
- ✗ Skipping the identity map: two queries for the same primary key return different objects. You mutate one and the other still has stale data. Updates fight each other and the last write wins silently.
- ✗ Flushing after every single change instead of batching: turns a 10-statement transaction into 10 separate round-trips. Latency adds up fast, especially over a network.
- ✗ Exposing raw SQL strings built with concatenation: opens the door to SQL injection. Always use parameterized queries or a builder that passes values as bound parameters.
Key Points
- ✓ The Identity Map guarantees one object per primary key per session. Two queries for the same row return the same Python/Java object. Mutate it in one place, and you see the change everywhere. Without this, you get inconsistent copies and lost updates.
- ✓ Unit of Work collects all inserts, updates, and deletes and flushes them in one batch. Nothing hits the database until you call flush(). If any write fails, you roll back the entire batch. One transaction boundary, far fewer round-trips, and no partial writes corrupting your data.
- ✓ Lazy loading defers expensive JOINs until you actually access the related field. If you load a User and never touch their orders, those orders are never fetched. The Proxy pattern makes this invisible to the caller.
- ✓ The Query Builder produces SQL through method chaining instead of ad hoc string concatenation. Values are passed as bound parameters rather than formatted into the SQL text, which closes off the usual injection route. Each method returns self, so you compose queries fluently.
- ✓ Unit of Work acts like a local transaction log. You call register_new(), register_dirty(), and register_deleted() throughout your business logic. The session accumulates changes in memory. When you flush, it writes all changes inside a single database transaction. If anything fails, the session state stays dirty and you can retry or roll back cleanly.