ORM Data Mapper
Map objects to database rows with lazy-loading proxies, an identity map to avoid duplicates, a unit of work for batched writes, and a fluent query builder. A mini SQLAlchemy/Hibernate.
Key Abstractions
- EntityMapper: maps class fields to table columns, handles row-to-object and object-to-row conversion
- IdentityMap: cache ensuring one object per primary key, so the same row never becomes two different objects
- UnitOfWork: tracks new, dirty, and deleted objects, flushes all changes as a single batch
- QueryBuilder: fluent API where query(User).where("age", ">", 21).order_by("name") produces SQL without string concatenation
- LazyProxy: proxy that loads related objects on first access, deferring expensive JOINs until actually needed
- Session: facade combining IdentityMap, UnitOfWork, and QueryBuilder into one entry point
Class Diagram
How It Works
Every application talks to a database, but objects and rows are very different things. A User object has methods, relationships, and behavior. A database row is flat key-value pairs in a table. The Data Mapper pattern bridges that gap: it sits between your domain objects and the database, translating back and forth so neither side knows about the other.
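That translation can be sketched in a few lines. The `COLUMNS` mapping and the column names here are illustrative stand-ins for the explicit field metadata the full implementation uses:

```python
from dataclasses import dataclass

@dataclass
class User:
    id: int
    name: str
    email: str

# Hypothetical field-to-column mapping for this sketch only.
COLUMNS = {"id": "id", "name": "full_name", "email": "email_address"}

def to_row(user: User) -> dict:
    # Object -> flat row: each field lands in its mapped column.
    return {col: getattr(user, field) for field, col in COLUMNS.items()}

def from_row(row: dict) -> User:
    # Row -> object: the inverse mapping rebuilds the domain object.
    return User(**{field: row[col] for field, col in COLUMNS.items()})

row = to_row(User(7, "Alice", "alice@example.com"))
assert row == {"id": 7, "full_name": "Alice", "email_address": "alice@example.com"}
assert from_row(row) == User(7, "Alice", "alice@example.com")
```

Neither side knows about the other: `User` has no SQL in it, and the row is a plain dict the database layer can handle.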
The Identity Map is the first layer of smarts. When you load User #7, the map stores that object. Ask for User #7 again and you get the exact same object back, not a fresh copy. Why does this matter? Because if two parts of your code load the same user and one changes the email, the other should see that change. Without an identity map, you get two separate objects, two separate mutations, and the last one to save wins. The other change vanishes.
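A stripped-down identity map is enough to demonstrate the one-object-per-key guarantee; the `get_or_load` helper is this sketch's invention, not part of the full implementation:

```python
class IdentityMap:
    """One object per (class, primary key): repeated lookups return the same instance."""
    def __init__(self):
        self._cache = {}

    def get_or_load(self, cls, entity_id, loader):
        key = (cls.__name__, entity_id)
        if key not in self._cache:
            self._cache[key] = loader()  # hit the "database" only on a cache miss
        return self._cache[key]

class User:
    def __init__(self, user_id, email):
        self.id, self.email = user_id, email

imap = IdentityMap()
a = imap.get_or_load(User, 7, lambda: User(7, "old@example.com"))
b = imap.get_or_load(User, 7, lambda: User(7, "old@example.com"))

a.email = "new@example.com"
# a and b are the very same object, so the change is visible through both.
assert a is b
assert b.email == "new@example.com"
```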
The Unit of Work pattern tracks everything you have done during a session. It maintains three internal sets: new objects (registered via register_new()), dirty objects (modified since last flush), and deleted objects (marked for removal). Nothing touches the database while you work. Your code creates users, updates emails, deletes orders, all in memory.
When you call flush(), the Unit of Work writes all accumulated changes in one batch. Inserts first, then updates, then deletes. All within a single transaction boundary. If any write fails, the entire batch rolls back. You never end up with half your changes in the database and half lost.
Compare that to flushing after every single setter call. With 50 changes, that is 50 network round-trips versus one. And each individual write is its own transaction, so a failure on write #37 leaves writes 1-36 committed and 37-50 lost. Unit of Work eliminates both problems: fewer round-trips and atomic all-or-nothing semantics.
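A toy illustration of that trade-off, with a fake database that counts round-trips (all names here are hypothetical):

```python
class FakeDB:
    def __init__(self):
        self.round_trips = 0
        self.rows = {}

    def execute_batch(self, statements):
        self.round_trips += 1  # one network round-trip for the whole batch
        for op, key, value in statements:
            if op == "upsert":
                self.rows[key] = value
            elif op == "delete":
                self.rows.pop(key, None)

class UnitOfWork:
    def __init__(self, db):
        self.db, self.pending = db, []

    def register(self, op, key, value=None):
        self.pending.append((op, key, value))  # nothing touches the DB yet

    def flush(self):
        self.db.execute_batch(self.pending)    # all accumulated changes, one round-trip
        self.pending.clear()

db = FakeDB()
uow = UnitOfWork(db)
for i in range(50):
    uow.register("upsert", i, f"user-{i}")
uow.flush()

assert db.round_trips == 1   # 50 changes, one trip -- not 50
assert len(db.rows) == 50
```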
Lazy loading uses the Proxy pattern to defer expensive work. When you load a User, their orders are not fetched immediately. Instead, the orders field holds a LazyProxy. The first time you actually access the orders, the proxy fires a query and loads them. If you never touch the orders, no query runs. Simple and efficient, but watch out for the N+1 problem.
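The deferral can be shown with a minimal proxy that records when its loader actually runs; this is a sketch, while the full `LazyProxy` in the implementation also supports iteration and repr:

```python
class LazyProxy:
    def __init__(self, loader):
        self._loader, self._value, self._loaded = loader, None, False

    def get(self):
        if not self._loaded:  # the query fires only on first access
            self._value, self._loaded = self._loader(), True
        return self._value

queries = []
orders = LazyProxy(lambda: queries.append("SELECT * FROM orders") or ["Laptop", "Mouse"])

assert queries == []                        # nothing fetched yet
assert orders.get() == ["Laptop", "Mouse"]  # first access runs the query
orders.get()                                # second access is a cache hit
assert len(queries) == 1                    # exactly one query ever ran
```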
The Query Builder gives you a fluent API for constructing queries. Instead of concatenating SQL strings (and risking injection), you chain method calls: query(User).where("age", ">", 21).order_by("name"). Each call returns self, so the chain reads like a sentence. Under the hood, it produces clean parameterized SQL.
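A cut-down builder showing how chaining yields parameterized SQL, with `?` placeholders and the values kept separate; the `to_sql` method is this sketch's invention, while the full `QueryBuilder` below executes against an in-memory store instead:

```python
class QueryBuilder:
    def __init__(self, table):
        self._table, self._wheres, self._order = table, [], None

    def where(self, column, op, value):
        self._wheres.append((column, op, value))
        return self  # returning self is what enables chaining

    def order_by(self, column):
        self._order = column
        return self

    def to_sql(self):
        sql, params = f"SELECT * FROM {self._table}", []
        if self._wheres:
            clauses = [f"{col} {op} ?" for col, op, _ in self._wheres]
            sql += " WHERE " + " AND ".join(clauses)
            params = [v for _, _, v in self._wheres]  # values travel separately: no injection
        if self._order:
            sql += f" ORDER BY {self._order}"
        return sql, params

sql, params = QueryBuilder("users").where("age", ">", 21).order_by("name").to_sql()
assert sql == "SELECT * FROM users WHERE age > ? ORDER BY name"
assert params == [21]
```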
Requirements
Functional
- Map entity fields to table columns with explicit metadata
- Identity Map returns the same object for the same primary key within a session
- Unit of Work tracks inserts, updates, and deletes, flushing all in one batch
- Query Builder supports where, order_by, and limit with fluent chaining
- Lazy loading defers related-object queries until first access
- Session facade ties everything together with add, get, delete, query, flush
Non-Functional
- No duplicate objects for the same row within a session
- Batch writes minimize database round-trips
- Query construction prevents SQL injection by avoiding string concatenation
- Lazy proxies are transparent to callers
Design Decisions
Why not just write raw SQL everywhere?
Raw SQL works fine for small projects. But as the codebase grows, you end up with SQL strings scattered across dozens of files. Change a column name and you are grep-searching the entire project. Worse, string concatenation for query parameters is the number one cause of SQL injection vulnerabilities. A Data Mapper centralizes the mapping logic. Column renames happen in one place. Query parameters are always escaped. And your domain objects stay clean of database concerns.
Why does the Identity Map matter so much?
Picture this: you load a User in your authentication layer. You load the same User in your billing layer. Without an identity map, those are two separate objects in memory. The auth layer sets user.verified = True. The billing layer still sees verified = False because it has a stale copy. When billing saves its copy, the verification is overwritten. The Identity Map prevents this by guaranteeing one object per primary key. Both layers get a reference to the same object. Mutations are immediately visible everywhere.
Why batch writes instead of immediate persistence?
Every database write involves a network round-trip: send the SQL, wait for acknowledgment. On a local database, that might be 0.5ms. Over a network, it could be 5ms. If you have 100 changes and flush each one individually, you spend 500ms just on network latency. Batching them into a single flush means one round-trip. It also means one transaction. If something fails halfway through, you roll back everything. With immediate persistence, the first 50 changes are already committed and you are stuck with a half-updated database.
When does lazy loading hurt?
Lazy loading is great for one-off access. Load a user, check their profile, done. But the moment you loop over users and access their orders, you hit the N+1 problem: 1 query for users, then N queries for each user's orders. If you have 1000 users, that is 1001 queries instead of 2. The fix is eager loading (a JOIN that fetches users and orders together) or batch fetching (load all orders for all visible user IDs in one query). SQLAlchemy calls these joinedload and subqueryload. Hibernate calls them FetchType.EAGER and @BatchSize.
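The batch-fetching fix can be sketched as one `IN (...)`-style query plus in-memory grouping; the `run_query` callback here stands in for a real database call:

```python
from collections import defaultdict

def fetch_orders_batched(user_ids, run_query):
    """One query for all users' orders, then group by user_id in memory."""
    rows = run_query(user_ids)  # e.g. WHERE user_id IN (...) -- a single round-trip
    by_user = defaultdict(list)
    for row in rows:
        by_user[row["user_id"]].append(row["product"])
    return by_user

ORDERS = [
    {"user_id": 1, "product": "Laptop"},
    {"user_id": 1, "product": "Mouse"},
    {"user_id": 2, "product": "Keyboard"},
]

queries_run = []
def run_query(ids):
    queries_run.append(ids)
    return [o for o in ORDERS if o["user_id"] in ids]

grouped = fetch_orders_batched([1, 2, 3], run_query)
assert len(queries_run) == 1  # 1 users query + 1 orders query, not 1 + N
assert grouped[1] == ["Laptop", "Mouse"]
assert grouped[2] == ["Keyboard"]
```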
Interview Follow-ups
- "How would you handle transactions?" Wrap the Unit of Work flush in a database transaction. Start the transaction before the first statement, commit after the last one succeeds, rollback on any failure. The Session becomes the transaction boundary. For nested transactions, use savepoints so inner failures do not kill the outer transaction.
- "How would you handle schema migrations?" Track the current schema version in a metadata table. Each migration is a numbered script (001_create_users.sql, 002_add_email_column.sql). On startup, the ORM checks which migrations have run and applies any new ones in order. Tools like Alembic (Python) and Flyway (Java) do exactly this. The key is making migrations idempotent and reversible.
- "How would you add eager loading to fix N+1?" Add an
eager_load("orders")method to QueryBuilder. When present, the builder generates a JOIN query that fetches both the parent and child rows. The mapper then splits the flat result set back into nested objects. Alternatively, use a subquery approach: after loading users, collect all user IDs and run one query for all orders matching those IDs, then stitch them together in memory. - "How does connection pooling interact with sessions?" Each session borrows a connection from the pool when you call
flush()and returns it immediately after. Sessions are short-lived (one per request in a web app). The pool handles the expensive part (keeping TCP connections alive), while the session handles the logical part (identity map, change tracking). Never hold a session open for the lifetime of the application. That pins a connection and starves other requests.
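The transaction wrapping described in the first follow-up can be sketched with a snapshot-based toy database; `begin`/`commit`/`rollback` here are crude stand-ins for real transaction statements:

```python
FAIL = object()  # sentinel used to simulate a failing write

class TransactionalDB:
    """Toy database with snapshot-based begin/commit/rollback."""
    def __init__(self):
        self.rows = {}
        self._snapshot = None

    def begin(self):
        self._snapshot = dict(self.rows)  # cheap stand-in for a real BEGIN

    def commit(self):
        self._snapshot = None

    def rollback(self):
        self.rows = self._snapshot        # restore pre-transaction state
        self._snapshot = None

def flush_in_transaction(db, writes):
    """Apply all writes inside one transaction: all succeed or none do."""
    db.begin()
    try:
        for key, value in writes:
            if value is FAIL:
                raise RuntimeError("write failed")
            db.rows[key] = value
        db.commit()
    except RuntimeError:
        db.rollback()

db = TransactionalDB()
flush_in_transaction(db, [(1, "a"), (2, "b")])
assert db.rows == {1: "a", 2: "b"}

flush_in_transaction(db, [(3, "c"), (4, FAIL)])  # fails mid-batch
assert db.rows == {1: "a", 2: "b"}  # the earlier write in the failed batch rolled back too
```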
Code Implementation
```python
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Callable, Type


# --------------- Field Mapping ---------------

@dataclass
class FieldMapping:
    field_name: str
    column_name: str
    field_type: type


# --------------- Entity Base ---------------

class Entity:
    """Base class for all mapped entities."""
    _table_name: str = ""
    _field_mappings: list[FieldMapping] = []

    def __init__(self):
        self.id: int | None = None
        self._dirty: bool = False

    def __repr__(self):
        attrs = {m.field_name: getattr(self, m.field_name, None)
                 for m in self.__class__._field_mappings}
        return f"{self.__class__.__name__}(id={self.id}, {attrs})"

    def mark_dirty(self):
        self._dirty = True


# --------------- Entity Mapper ---------------

class EntityMapper:
    """Converts between entity objects and database rows (dicts)."""

    def __init__(self, entity_class: Type[Entity]):
        self._entity_class = entity_class
        self._table_name = entity_class._table_name
        self._mappings = entity_class._field_mappings

    @property
    def table_name(self) -> str:
        return self._table_name

    def to_row(self, entity: Entity) -> dict[str, Any]:
        row: dict[str, Any] = {"id": entity.id}
        for m in self._mappings:
            val = getattr(entity, m.field_name, None)
            # Don't store proxy objects as raw values
            if isinstance(val, LazyProxy):
                val = val._target if val._loaded else None
            row[m.column_name] = val
        return row

    def from_row(self, row: dict[str, Any]) -> Entity:
        entity = self._entity_class.__new__(self._entity_class)
        Entity.__init__(entity)
        entity.id = row["id"]
        for m in self._mappings:
            setattr(entity, m.field_name, row.get(m.column_name))
        entity._dirty = False
        return entity


# --------------- Identity Map ---------------

class IdentityMap:
    """
    One object per primary key. If the map already contains User#7,
    you get that exact object back instead of a new copy.
    """

    def __init__(self):
        self._cache: dict[str, Entity] = {}

    def _key(self, cls: Type[Entity], entity_id: int) -> str:
        return f"{cls.__name__}:{entity_id}"

    def get(self, cls: Type[Entity], entity_id: int) -> Entity | None:
        return self._cache.get(self._key(cls, entity_id))

    def put(self, entity: Entity) -> None:
        key = self._key(entity.__class__, entity.id)
        self._cache[key] = entity

    def contains(self, cls: Type[Entity], entity_id: int) -> bool:
        return self._key(cls, entity_id) in self._cache

    def clear(self) -> None:
        self._cache.clear()


# --------------- Unit of Work ---------------

class UnitOfWork:
    """
    Tracks new, dirty, and deleted entities. Flush writes everything
    to the database in one batch instead of one-at-a-time.
    """

    def __init__(self, database: "InMemoryDatabase"):
        self._new: list[Entity] = []
        self._dirty: list[Entity] = []
        self._deleted: list[Entity] = []
        self._db = database

    def register_new(self, entity: Entity) -> None:
        if entity not in self._new:
            self._new.append(entity)

    def register_dirty(self, entity: Entity) -> None:
        if entity not in self._dirty and entity not in self._new:
            self._dirty.append(entity)

    def register_deleted(self, entity: Entity) -> None:
        self._deleted.append(entity)
        # Remove from new/dirty if present
        if entity in self._new:
            self._new.remove(entity)
        if entity in self._dirty:
            self._dirty.remove(entity)

    def flush(self) -> None:
        mapper_cache: dict[str, EntityMapper] = {}

        def get_mapper(entity: Entity) -> EntityMapper:
            cls_name = entity.__class__.__name__
            if cls_name not in mapper_cache:
                mapper_cache[cls_name] = EntityMapper(entity.__class__)
            return mapper_cache[cls_name]

        # Inserts
        for entity in self._new:
            mapper = get_mapper(entity)
            table = mapper.table_name
            if table not in self._db.tables:
                self._db.tables[table] = {}
            # Auto-generate ID
            if entity.id is None:
                existing_ids = self._db.tables[table].keys()
                entity.id = max(existing_ids, default=0) + 1
            row = mapper.to_row(entity)
            self._db.tables[table][entity.id] = row
            entity._dirty = False
            print(f"  [UoW] INSERT into {table}: {row}")

        # Updates
        for entity in self._dirty:
            mapper = get_mapper(entity)
            table = mapper.table_name
            row = mapper.to_row(entity)
            self._db.tables[table][entity.id] = row
            entity._dirty = False
            print(f"  [UoW] UPDATE {table} SET {row}")

        # Deletes
        for entity in self._deleted:
            mapper = get_mapper(entity)
            table = mapper.table_name
            if entity.id in self._db.tables.get(table, {}):
                del self._db.tables[table][entity.id]
                print(f"  [UoW] DELETE from {table} WHERE id={entity.id}")

        self._new.clear()
        self._dirty.clear()
        self._deleted.clear()
        print("  [UoW] Flush complete")


# --------------- Lazy Proxy ---------------

class LazyProxy:
    """
    Proxy that defers loading related objects until first access.
    The caller does not know they are holding a proxy. On first touch,
    the loader function runs and the real data appears.
    """

    def __init__(self, loader: Callable[[], Any]):
        self._loader = loader
        self._target: Any = None
        self._loaded = False

    def load(self) -> Any:
        if not self._loaded:
            print("  [LazyProxy] Loading related data...")
            self._target = self._loader()
            self._loaded = True
        return self._target

    def __repr__(self):
        if self._loaded:
            return repr(self._target)
        return "<LazyProxy: not loaded>"

    def __iter__(self):
        return iter(self.load())

    def __len__(self):
        return len(self.load())


# --------------- Query Builder ---------------

class QueryBuilder:
    """
    Fluent API for building queries. Each method returns self so
    you can chain: query(User).where("age", ">", 21).order_by("name")
    """

    def __init__(self, entity_class: Type[Entity], db: "InMemoryDatabase",
                 identity_map: IdentityMap):
        self._entity_class = entity_class
        self._mapper = EntityMapper(entity_class)
        self._db = db
        self._identity_map = identity_map
        self._conditions: list[tuple[str, str, Any]] = []
        self._order_field: str | None = None
        self._limit_val: int | None = None

    def where(self, field: str, op: str, value: Any) -> "QueryBuilder":
        self._conditions.append((field, op, value))
        return self

    def order_by(self, field: str) -> "QueryBuilder":
        self._order_field = field
        return self

    def limit(self, n: int) -> "QueryBuilder":
        self._limit_val = n
        return self

    def build_sql(self) -> str:
        """Show what the SQL would look like."""
        table = self._mapper.table_name
        sql = f"SELECT * FROM {table}"
        if self._conditions:
            clauses = [f"{f} {op} {repr(v)}" for f, op, v in self._conditions]
            sql += " WHERE " + " AND ".join(clauses)
        if self._order_field:
            sql += f" ORDER BY {self._order_field}"
        if self._limit_val:
            sql += f" LIMIT {self._limit_val}"
        return sql

    def _matches(self, row: dict, field: str, op: str, value: Any) -> bool:
        # Map field name to column name
        col = field
        for m in self._entity_class._field_mappings:
            if m.field_name == field:
                col = m.column_name
                break
        row_val = row.get(col)
        if row_val is None:
            return False
        if op == "=": return row_val == value
        if op == "!=": return row_val != value
        if op == ">": return row_val > value
        if op == "<": return row_val < value
        if op == ">=": return row_val >= value
        if op == "<=": return row_val <= value
        return False

    def execute(self) -> list[Entity]:
        table = self._mapper.table_name
        rows = self._db.tables.get(table, {})
        print(f"  [QueryBuilder] {self.build_sql()}")

        results: list[Entity] = []
        for row_id, row in rows.items():
            match = all(
                self._matches(row, f, op, v)
                for f, op, v in self._conditions
            )
            if match:
                # Check identity map first
                cached = self._identity_map.get(self._entity_class, row_id)
                if cached:
                    results.append(cached)
                else:
                    entity = self._mapper.from_row(row)
                    self._identity_map.put(entity)
                    results.append(entity)

        if self._order_field:
            results.sort(key=lambda e: getattr(e, self._order_field, ""))

        if self._limit_val:
            results = results[:self._limit_val]

        return results


# --------------- In-Memory Database ---------------

class InMemoryDatabase:
    """Simulates a relational database as nested dicts."""

    def __init__(self):
        self.tables: dict[str, dict[int, dict[str, Any]]] = {}

    def dump(self) -> None:
        for table_name, rows in self.tables.items():
            print(f"\n  Table '{table_name}':")
            for row_id, row in rows.items():
                print(f"    {row}")


# --------------- Session (Facade) ---------------

class Session:
    """
    Facade combining IdentityMap, UnitOfWork, and QueryBuilder
    into one clean entry point. You open a session, do your work,
    and flush.
    """

    def __init__(self, db: InMemoryDatabase):
        self._db = db
        self._identity_map = IdentityMap()
        self._uow = UnitOfWork(db)

    def add(self, entity: Entity) -> None:
        self._uow.register_new(entity)
        if entity.id is not None:
            self._identity_map.put(entity)

    def get(self, cls: Type[Entity], entity_id: int) -> Entity | None:
        # Check identity map first
        cached = self._identity_map.get(cls, entity_id)
        if cached:
            print(f"  [Session] Cache hit for {cls.__name__}#{entity_id}")
            return cached

        # Load from database
        mapper = EntityMapper(cls)
        table = mapper.table_name
        row = self._db.tables.get(table, {}).get(entity_id)
        if row is None:
            return None

        entity = mapper.from_row(row)
        self._identity_map.put(entity)
        print(f"  [Session] Loaded {cls.__name__}#{entity_id} from DB")
        return entity

    def delete(self, entity: Entity) -> None:
        self._uow.register_deleted(entity)

    def mark_dirty(self, entity: Entity) -> None:
        entity.mark_dirty()
        self._uow.register_dirty(entity)

    def query(self, cls: Type[Entity]) -> QueryBuilder:
        return QueryBuilder(cls, self._db, self._identity_map)

    def flush(self) -> None:
        self._uow.flush()

    def lazy_load(self, loader: Callable[[], Any]) -> LazyProxy:
        return LazyProxy(loader)

    def clear(self) -> None:
        self._identity_map.clear()


# --------------- Domain Entities ---------------

class User(Entity):
    _table_name = "users"
    _field_mappings = [
        FieldMapping("name", "name", str),
        FieldMapping("age", "age", int),
        FieldMapping("email", "email", str),
    ]

    def __init__(self, name: str = "", age: int = 0, email: str = ""):
        super().__init__()
        self.name = name
        self.age = age
        self.email = email
        self.orders: Any = None  # Will hold LazyProxy


class Order(Entity):
    _table_name = "orders"
    _field_mappings = [
        FieldMapping("user_id", "user_id", int),
        FieldMapping("product", "product", str),
        FieldMapping("amount", "amount", float),
    ]

    def __init__(self, user_id: int = 0, product: str = "", amount: float = 0.0):
        super().__init__()
        self.user_id = user_id
        self.product = product
        self.amount = amount


# --------------- Demo ---------------

if __name__ == "__main__":
    db = InMemoryDatabase()
    session = Session(db)

    print("=== Add Users via Unit of Work ===")
    alice = User("Alice", 30, "alice@example.com")
    bob = User("Bob", 25, "bob@example.com")
    carol = User("Carol", 35, "carol@example.com")
    dave = User("Dave", 22, "dave@example.com")

    session.add(alice)
    session.add(bob)
    session.add(carol)
    session.add(dave)

    print("\n=== Flush (Batch Insert) ===")
    session.flush()
    db.dump()

    print("\n=== Add Orders ===")
    o1 = Order(alice.id, "Laptop", 1200.00)
    o2 = Order(alice.id, "Mouse", 25.00)
    o3 = Order(bob.id, "Keyboard", 75.00)

    session.add(o1)
    session.add(o2)
    session.add(o3)
    session.flush()

    print("\n=== Identity Map: Same Object Guarantee ===")
    user_a = session.get(User, alice.id)
    user_b = session.get(User, alice.id)
    print(f"  user_a is user_b? {user_a is user_b}")
    print(f"  Same id()? {id(user_a) == id(user_b)}")

    print("\n=== Mutate Through One Reference, Visible in the Other ===")
    user_a.email = "alice.new@example.com"
    print(f"  user_b.email: {user_b.email}")  # Same object, so reflects change

    print("\n=== Query Builder: Fluent API ===")
    print("\n  -- Users older than 24, ordered by name --")
    results = (session.query(User)
               .where("age", ">", 24)
               .order_by("name")
               .execute())
    for u in results:
        print(f"    {u}")

    print("\n  -- Users exactly 22 years old --")
    young = session.query(User).where("age", "=", 22).execute()
    for u in young:
        print(f"    {u}")

    print("\n=== Lazy Loading: Orders for Alice ===")
    alice_fresh = session.get(User, alice.id)
    alice_fresh.orders = session.lazy_load(
        lambda: session.query(Order).where("user_id", "=", alice.id).execute()
    )
    print(f"  Before access: {repr(alice_fresh.orders)}")
    print("  Accessing orders...")
    for order in alice_fresh.orders:
        print(f"    {order}")

    print("  Second access (already loaded, no query):")
    for order in alice_fresh.orders:
        print(f"    {order}")

    print("\n=== Update via Unit of Work ===")
    bob_entity = session.get(User, bob.id)
    bob_entity.age = 26
    session.mark_dirty(bob_entity)
    session.flush()

    print("\n=== Delete via Unit of Work ===")
    session.delete(dave)
    session.flush()
    db.dump()

    print("\n=== N+1 Problem Demo ===")
    print("  BAD: loading orders one user at a time in a loop")
    session.clear()
    all_users = session.query(User).execute()
    query_count = 0
    for u in all_users:
        orders = session.query(Order).where("user_id", "=", u.id).execute()
        query_count += 1
    print(f"  Total queries: 1 (users) + {query_count} (orders) = {1 + query_count}")
    print("  FIX: load all orders in one query, then group by user_id in memory")

    print("\nAll operations completed successfully.")
```

Common Mistakes
- ✗ N+1 query problem: lazy loading in a loop loads N related objects one by one. If you iterate 100 users and access each user's orders, you get 1 query for users plus 100 queries for orders. Use eager loading or batch fetching when you know you will need the relation.
- ✗ Skipping the identity map: two queries for the same primary key return different objects. You mutate one and the other still has stale data. Updates fight each other and the last write wins silently.
- ✗ Flushing after every single change instead of batching: turns a 10-statement transaction into 10 separate round-trips. Latency adds up fast, especially over a network.
- ✗ Exposing raw SQL strings built with concatenation: opens the door to SQL injection. Always use parameterized queries or a builder that escapes values.
Key Points
- ✓ The Identity Map guarantees one object per primary key per session. Two queries for the same row return the same Python/Java object. Mutate it in one place, and you see the change everywhere. Without this, you get inconsistent copies and lost updates.
- ✓ Unit of Work collects all inserts, updates, and deletes and flushes them in one batch. Nothing hits the database until you call flush(). If any write fails, you roll back the entire batch. One transaction boundary, one round-trip, and no partial writes corrupting your data.
- ✓ Lazy loading defers expensive JOINs until you actually access the related field. If you load a User and never touch their orders, those orders are never fetched. The Proxy pattern makes this invisible to the caller.
- ✓ The Query Builder produces SQL through method chaining instead of string concatenation. No f-strings, no format calls, no injection risk. Each method returns self, so you compose queries fluently.
- ✓ Unit of Work acts like a local transaction log. You register_new(), register_dirty(), register_deleted() throughout your business logic. The session accumulates changes in memory. When you flush, it writes all changes inside a single database transaction. If anything fails, the session state stays dirty and you can retry or roll back cleanly.