Event Sourcing System
Immutable event log with Command-based changes, Mediator coordinating sources and sinks, Composite transformation pipelines, and swappable delivery strategies.
Key Abstractions
Command — immutable data change with type, payload, timestamp, sequence number
EventStore — append-only log of events with monotonically increasing sequence numbers
Mediator — coordinates between producers, transformers, and consumers
Composite — Filter inside Map inside Batch; complex transforms are trees of simpler ones
Strategy — at-least-once, at-most-once, exactly-once semantics
Observer — consumes events, tracks offset
Class Diagram
How It Works
Event sourcing treats every state change as an immutable event appended to a log. Instead of overwriting a row in a database, you record "user created", "order placed", "payment processed" as discrete facts. The current state of any entity is derived by replaying its events from the beginning. The append-only model gives you a complete audit trail, the ability to rebuild state at any point in time, and natural decoupling between writers and readers.
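The "replay events to derive state" idea can be shown with a minimal, standalone sketch. The DEPOSITED/WITHDRAWN event types and the balance projection are hypothetical illustrations, not part of the system below:

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class LedgerEvent:
    event_type: str
    payload: dict[str, Any]

def project_balance(events: list[LedgerEvent]) -> int:
    """Derive current state (a balance) by folding over the event history."""
    balance = 0
    for e in events:
        if e.event_type == "DEPOSITED":
            balance += e.payload["amount"]
        elif e.event_type == "WITHDRAWN":
            balance -= e.payload["amount"]
    return balance

history = [
    LedgerEvent("DEPOSITED", {"amount": 100}),
    LedgerEvent("DEPOSITED", {"amount": 50}),
    LedgerEvent("WITHDRAWN", {"amount": 30}),
]
print(project_balance(history))      # 120
print(project_balance(history[:2]))  # state as of the second event: 150
```

Replaying a prefix of the history (`history[:2]`) is exactly how "state at any point in time" falls out of the model for free.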
The EventStore is an ordered, append-only sequence of events. Each event gets a monotonically increasing sequence number. Producers write to the store through an EventMediator that acts as the single coordination point. Producers never know about consumers, and consumers never know about producers. The mediator handles routing, buffering, and back-pressure management.
Change Data Capture (CDC) layers cleanly on top of event sourcing. Instead of polling a database for changes, consumers subscribe to the event stream and receive changes as they happen. A CompositePipeline sits between the raw event stream and subscribers, filtering out irrelevant events, enriching payloads, and batching for efficiency. Because the pipeline is a Composite, you build complex transformations by composing simple ones: a FilterTransform feeding a MapTransform feeding a BatchTransform.
Different consumers need different delivery guarantees. An audit log needs at-least-once delivery with retries so no event is ever lost. An analytics engine needs exactly-once delivery to avoid double-counting. The DeliveryStrategy abstraction makes this configurable per subscriber: the same event stream can serve both consumers with different semantics.
Requirements
Functional
- Append immutable events to an ordered, durable event store with sequence numbers
- Replay events from any sequence number to rebuild state
- Route events from producers to consumers through a mediator without direct coupling
- Support composable transformation pipelines: filter, map, batch
- Deliver events with configurable guarantees: at-least-once, exactly-once
Non-Functional
- Thread-safe event store supporting concurrent appends
- Sequence numbers must be gap-free for consumers to detect missing events
- Transformation pipelines must be composable and extensible without modifying existing transforms
- Delivery strategies must be swappable per subscriber without affecting the routing layer
- Back-pressure must prevent fast producers from overwhelming slow consumers
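The gap-free requirement is what lets a consumer detect missing events locally, with no coordination. A minimal sketch of such a check (the function name is illustrative):

```python
def find_gaps(sequences: list[int]) -> list[int]:
    """Return sequence numbers missing from an otherwise contiguous stream.
    Assumes the input is sorted, as delivered by an ordered log."""
    if not sequences:
        return []
    expected = range(sequences[0], sequences[-1] + 1)
    seen = set(sequences)
    return [s for s in expected if s not in seen]

print(find_gaps([1, 2, 4, 5, 8]))  # [3, 6, 7]
```

If sequence numbers could have legitimate gaps, a consumer could never distinguish "event lost in transit" from "event never existed".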
Design Decisions
What do you lose by just updating rows in place?
When you overwrite a row, the previous value is gone. Immutable events preserve the full timeline: what changed, when, and in what order. You can rebuild state at any point by replaying events up to that moment. That's invaluable for debugging, auditing, and retroactive analytics. The trade-off is storage and replay cost, which snapshots mitigate.
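How a snapshot bounds replay cost can be sketched in a few lines. The EMAIL_CHANGED event type and dict-based state are hypothetical simplifications:

```python
from typing import Any

def rebuild_state(snapshot: dict[str, Any], snapshot_seq: int,
                  events: list[tuple[int, str, dict[str, Any]]]) -> dict[str, Any]:
    """Start from the snapshot, then apply only events after its sequence number."""
    state = dict(snapshot)
    for seq, event_type, payload in events:
        if seq <= snapshot_seq:
            continue  # already folded into the snapshot
        if event_type == "EMAIL_CHANGED":  # hypothetical event type
            state["email"] = payload["email"]
    return state

snapshot = {"email": "a@example.com"}  # state as of sequence 5
events = [
    (5, "EMAIL_CHANGED", {"email": "b@example.com"}),   # skipped: in snapshot
    (12, "EMAIL_CHANGED", {"email": "c@example.com"}),  # replayed
]
print(rebuild_state(snapshot, 5, events))  # {'email': 'c@example.com'}
```

Replay cost is now proportional to events since the last snapshot, not to the total history.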
Couldn't producers just publish directly to topics?
Direct pub/sub couples producers to topic structures and consumers to subscription management. A Mediator centralizes routing logic, so you can add buffering, back-pressure, transform pipelines, and dead-letter handling in one place. Producers call publish() and walk away. The mediator decides how, when, and to whom events are delivered. For CDC, the same event often needs different transformations for different downstream systems, and the mediator is where that logic lives.
Why not just hardcode the filter-map-batch sequence?
Each transformation (filter, map, batch) is simple on its own. Real pipelines combine them: filter out irrelevant event types, enrich the payload with metadata, batch for bulk insertion. Composite lets you build these by stacking transforms. Need a new enrichment step? Write a MapTransform and add it. The pipeline doesn't care about the internal structure of its children.
How do you pick between at-least-once and exactly-once?
At-least-once is the simplest: retry on failure, but the consumer must be idempotent because it may see duplicates. Exactly-once requires tracking which events have been delivered to each subscriber, adding storage and coordination overhead. At-most-once delivers once with no retry: events can be lost but there's no duplication. Strategy lets each subscriber choose independently based on its tolerance for duplicates versus data loss.
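The "consumer must be idempotent" requirement for at-least-once delivery can be met by remembering processed sequence numbers, so a redelivered duplicate becomes a no-op. A minimal sketch (class and field names are illustrative):

```python
class IdempotentConsumer:
    """A consumer that is safe under at-least-once delivery."""

    def __init__(self) -> None:
        self._seen: set[int] = set()  # sequence numbers already processed
        self.total = 0

    def on_event(self, sequence: int, amount: int) -> None:
        if sequence in self._seen:
            return  # duplicate redelivery: ignore, don't double-count
        self._seen.add(sequence)
        self.total += amount

c = IdempotentConsumer()
c.on_event(1, 10)
c.on_event(1, 10)  # the at-least-once retry redelivers event 1
c.on_event(2, 5)
print(c.total)  # 15, not 25
```

This is the same dedup bookkeeping the ExactlyOnceStrategy below performs, just moved to the consumer side so the delivery layer can stay simple.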
Interview Follow-ups
- "How would you add snapshotting for fast replay?" Periodically serialize the current state derived from events into a snapshot. On replay, load the latest snapshot and replay only events after the snapshot's sequence number. This bounds replay time regardless of total event count. Schedule snapshots based on event count thresholds or time intervals.
- "How does CQRS fit with event sourcing?" CQRS separates the write model (command side appending events) from the read model (query side built by projecting events into denormalized views). The event store is the write side. Subscribers maintain read-optimized projections: one might build a search index, another a materialized view for dashboards. Each projection consumes the same event stream but structures data differently.
- "How do you handle schema evolution?" Version your events. An
ORDER_PLACED_V2event might add acurrencyfield. Write upcasters that transform V1 events to V2 format during replay. Never modify existing event schemas in place. That breaks immutability. Consumer-side upcasting keeps the event store stable while allowing the domain model to evolve. - "How do you order events in a distributed system?" Use a global sequence number from a centralized sequencer (e.g., a database auto-increment or a Snowflake ID generator). For partitioned event stores, maintain per-partition ordering and use vector clocks or hybrid logical clocks for cross-partition causal ordering. Accept that total ordering across partitions is expensive and often unnecessary. Causal ordering within an aggregate is usually sufficient.
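An upcaster for the schema-evolution answer can be sketched as a pure function applied at read time. The dict-based event shape and the USD default are assumptions for illustration:

```python
from typing import Any

def upcast_order_placed(event: dict[str, Any]) -> dict[str, Any]:
    """Upcast a V1 ORDER_PLACED event to the V2 shape during replay.
    The stored V1 event is never modified; assumes USD was the implicit
    currency before V2 made it explicit."""
    if event.get("version", 1) >= 2:
        return event  # already current
    return {
        **event,
        "version": 2,
        "payload": {**event["payload"], "currency": "USD"},
    }

v1 = {"type": "ORDER_PLACED", "version": 1, "payload": {"amount": 99.99}}
print(upcast_order_placed(v1)["payload"])  # {'amount': 99.99, 'currency': 'USD'}
print(v1["payload"])                       # {'amount': 99.99} -- original untouched
```

Because the upcaster returns a new dict instead of mutating its input, the log itself stays immutable while consumers always see the latest schema.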
Code Implementation
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable
import threading
import time

9
# --------------- Command Pattern: immutable events ---------------

@dataclass(frozen=True)
class Event:
    """Immutable event: the fundamental unit of state change."""
    event_type: str
    payload: dict[str, Any]
    timestamp: float = field(default_factory=time.time)
    sequence: int = 0

    def __str__(self) -> str:
        return f"Event(seq={self.sequence}, type={self.event_type}, payload={self.payload})"


# --------------- Event Store: append-only log ---------------

class EventStore:
    """Append-only log. Events are never mutated or deleted."""

    def __init__(self) -> None:
        self._events: list[Event] = []
        self._current_sequence = 0
        self._lock = threading.Lock()

    def append(self, event_type: str, payload: dict[str, Any]) -> Event:
        with self._lock:
            self._current_sequence += 1
            # Create a new event with the assigned sequence number
            event = Event(
                event_type=event_type,
                payload=payload,
                sequence=self._current_sequence,
            )
            self._events.append(event)
            return event

    def replay(self, from_sequence: int = 0) -> list[Event]:
        """Replay events with sequence numbers greater than from_sequence."""
        with self._lock:
            return [e for e in self._events if e.sequence > from_sequence]

    def get_latest_sequence(self) -> int:
        with self._lock:
            return self._current_sequence

    def size(self) -> int:
        with self._lock:
            return len(self._events)

59
# --------------- Composite Pattern: transform pipeline ---------------

class Transform(ABC):
    """Base interface for event transformations."""

    @abstractmethod
    def apply(self, events: list[Event]) -> list[Event]: ...


class FilterTransform(Transform):
    """Filters events based on a predicate."""

    def __init__(self, predicate: Callable[[Event], bool], name: str = "Filter"):
        self._predicate = predicate
        self._name = name

    def apply(self, events: list[Event]) -> list[Event]:
        result = [e for e in events if self._predicate(e)]
        print(f" [{self._name}] {len(events)} in -> {len(result)} out")
        return result


class MapTransform(Transform):
    """Maps events by transforming their payloads."""

    def __init__(self, mapper: Callable[[Event], Event], name: str = "Map"):
        self._mapper = mapper
        self._name = name

    def apply(self, events: list[Event]) -> list[Event]:
        result = [self._mapper(e) for e in events]
        print(f" [{self._name}] mapped {len(result)} events")
        return result


class BatchTransform(Transform):
    """Groups events into batches of a given size."""

    def __init__(self, batch_size: int):
        self._batch_size = batch_size

    def apply(self, events: list[Event]) -> list[Event]:
        # Demo simplification: report the batch boundaries, then return the
        # flat list so downstream transforms keep receiving Event objects.
        # In production this would yield the batches themselves.
        batches = [events[i:i + self._batch_size]
                   for i in range(0, len(events), self._batch_size)]
        print(f" [Batch] {len(events)} events -> {len(batches)} batches of <={self._batch_size}")
        return events


class CompositePipeline(Transform):
    """Composite: chains multiple transforms. Each transform's output feeds the next."""

    def __init__(self, name: str = "Pipeline") -> None:
        self._transforms: list[Transform] = []
        self._name = name

    def add(self, transform: Transform) -> CompositePipeline:
        self._transforms.append(transform)
        return self

    def apply(self, events: list[Event]) -> list[Event]:
        print(f" [{self._name}] Starting pipeline with {len(events)} events")
        result = events
        for transform in self._transforms:
            result = transform.apply(result)
            if not result:
                break
        print(f" [{self._name}] Pipeline complete: {len(result)} events out")
        return result

131
# --------------- Strategy Pattern: delivery guarantees ---------------

class DeliveryStrategy(ABC):
    """Defines how events are delivered to subscribers."""

    @abstractmethod
    def deliver(self, event: Event, subscriber: EventSubscriber) -> bool: ...


class AtLeastOnceStrategy(DeliveryStrategy):
    """Retries on failure. May deliver duplicates."""

    def __init__(self, max_retries: int = 3):
        self._max_retries = max_retries

    def deliver(self, event: Event, subscriber: EventSubscriber) -> bool:
        for attempt in range(1, self._max_retries + 1):
            success = subscriber.on_event(event)
            if success:
                return True
            print(f" [AtLeastOnce] Retry {attempt}/{self._max_retries} for {subscriber.get_id()}")
        print(f" [AtLeastOnce] Exhausted retries for {subscriber.get_id()}")
        return False


class ExactlyOnceStrategy(DeliveryStrategy):
    """Tracks delivered event sequences per subscriber to prevent duplicates."""

    def __init__(self) -> None:
        self._delivered: dict[str, set[int]] = {}

    def deliver(self, event: Event, subscriber: EventSubscriber) -> bool:
        sub_id = subscriber.get_id()
        if sub_id not in self._delivered:
            self._delivered[sub_id] = set()
        if event.sequence in self._delivered[sub_id]:
            print(f" [ExactlyOnce] Skipping duplicate seq={event.sequence} for {sub_id}")
            return True
        success = subscriber.on_event(event)
        if success:
            self._delivered[sub_id].add(event.sequence)
        return success

175
# --------------- Observer Pattern: subscribers ---------------

class EventSubscriber(ABC):
    """Observer that consumes events and tracks its own offset."""

    @abstractmethod
    def on_event(self, event: Event) -> bool: ...

    @abstractmethod
    def get_id(self) -> str: ...

    @abstractmethod
    def get_offset(self) -> int: ...


class LoggingSubscriber(EventSubscriber):
    """Prints events and tracks offset."""

    def __init__(self, name: str):
        self._name = name
        self._offset = 0
        self._received: list[Event] = []

    def on_event(self, event: Event) -> bool:
        self._received.append(event)
        self._offset = event.sequence
        print(f" [{self._name}] Consumed: {event}")
        return True

    def get_id(self) -> str:
        return self._name

    def get_offset(self) -> int:
        return self._offset

    @property
    def received_count(self) -> int:
        return len(self._received)

215
# --------------- Mediator Pattern: event mediator ---------------

class EventMediator:
    """Mediator coordinating producers, transforms, and consumers.
    Producers and consumers never reference each other."""

    def __init__(self, store: EventStore) -> None:
        self._store = store
        self._producers: set[str] = set()
        self._subscribers: dict[str, list[EventSubscriber]] = {}
        self._pipelines: dict[str, CompositePipeline] = {}
        self._strategies: dict[str, DeliveryStrategy] = {}
        self._default_strategy: DeliveryStrategy = AtLeastOnceStrategy()
        self._buffer: list[Event] = []
        self._lock = threading.Lock()

    def register_producer(self, name: str) -> None:
        self._producers.add(name)
        print(f" [Mediator] Producer registered: {name}")

    def register_consumer(
        self,
        stream: str,
        subscriber: EventSubscriber,
        strategy: DeliveryStrategy | None = None,
    ) -> None:
        if stream not in self._subscribers:
            self._subscribers[stream] = []
        self._subscribers[stream].append(subscriber)
        if strategy:
            self._strategies[subscriber.get_id()] = strategy
        print(f" [Mediator] Consumer '{subscriber.get_id()}' registered on stream '{stream}'")

    def set_pipeline(self, stream: str, pipeline: CompositePipeline) -> None:
        self._pipelines[stream] = pipeline

    def publish(self, producer: str, event_type: str, payload: dict[str, Any]) -> Event:
        """Producer fires an event. Mediator appends to store and buffers for routing."""
        if producer not in self._producers:
            raise ValueError(f"Unknown producer: {producer}")
        event = self._store.append(event_type, payload)
        with self._lock:
            self._buffer.append(event)
        return event

    def route(self, stream: str) -> None:
        """Route buffered events through an optional pipeline, then deliver to subscribers.
        Demo simplification: the single shared buffer is drained by whichever
        stream routes first; production code would buffer per stream."""
        with self._lock:
            events = list(self._buffer)
            self._buffer.clear()

        # Apply the transform pipeline if one is configured for this stream
        pipeline = self._pipelines.get(stream)
        if pipeline:
            events = pipeline.apply(events)

        # Deliver to all subscribers on this stream
        subscribers = self._subscribers.get(stream, [])
        for event in events:
            for sub in subscribers:
                strategy = self._strategies.get(sub.get_id(), self._default_strategy)
                strategy.deliver(event, sub)

279
# --------------- Demo ---------------

if __name__ == "__main__":
    print("=" * 60)
    print("Event Sourcing / CDC System Demo")
    print("=" * 60)

    # 1) Create the event store
    store = EventStore()
    mediator = EventMediator(store)

    # 2) Register producers
    print("\n--- Registering Producers ---")
    mediator.register_producer("UserService")
    mediator.register_producer("OrderService")
    mediator.register_producer("PaymentService")

    # 3) Create subscribers with different delivery strategies
    print("\n--- Registering Consumers ---")
    audit_log = LoggingSubscriber("AuditLog")
    analytics = LoggingSubscriber("AnalyticsEngine")
    search_indexer = LoggingSubscriber("SearchIndexer")

    at_least_once = AtLeastOnceStrategy(max_retries=3)
    exactly_once = ExactlyOnceStrategy()

    mediator.register_consumer("all-events", audit_log, strategy=at_least_once)
    mediator.register_consumer("all-events", analytics, strategy=exactly_once)
    mediator.register_consumer("filtered-events", search_indexer, strategy=exactly_once)

    # 4) Build a composite transform pipeline for the filtered stream
    print("\n--- Building Transform Pipeline ---")
    pipeline = CompositePipeline("CDC-Pipeline")
    pipeline.add(FilterTransform(
        predicate=lambda e: e.event_type in ("USER_CREATED", "ORDER_PLACED"),
        name="TypeFilter",
    ))
    pipeline.add(MapTransform(
        mapper=lambda e: Event(
            event_type=e.event_type,
            payload={**e.payload, "enriched": True},
            timestamp=e.timestamp,
            sequence=e.sequence,
        ),
        name="EnrichMapper",
    ))
    pipeline.add(BatchTransform(batch_size=2))
    mediator.set_pipeline("filtered-events", pipeline)

    # 5) Publish events from different producers
    print("\n--- Publishing Events ---")
    e1 = mediator.publish("UserService", "USER_CREATED", {"user_id": "u-101", "name": "Alice"})
    print(f" Published: {e1}")

    e2 = mediator.publish("OrderService", "ORDER_PLACED", {"order_id": "o-501", "amount": 99.99})
    print(f" Published: {e2}")

    e3 = mediator.publish("PaymentService", "PAYMENT_PROCESSED", {"payment_id": "p-301", "status": "success"})
    print(f" Published: {e3}")

    # 6) Route through the "all-events" stream (no pipeline, raw delivery)
    print("\n--- Routing: all-events (raw, no transforms) ---")
    mediator.route("all-events")

    # 7) Re-publish so the buffer has events for the filtered stream
    e4 = mediator.publish("UserService", "USER_CREATED", {"user_id": "u-102", "name": "Bob"})
    e5 = mediator.publish("OrderService", "ORDER_PLACED", {"order_id": "o-502", "amount": 49.50})
    e6 = mediator.publish("PaymentService", "PAYMENT_PROCESSED", {"payment_id": "p-302", "status": "failed"})

    print("\n--- Routing: filtered-events (with pipeline) ---")
    mediator.route("filtered-events")

    # 8) Demonstrate exactly-once: attempt duplicate delivery
    print("\n--- Exactly-Once Dedup Test ---")
    exactly_once_strategy = ExactlyOnceStrategy()
    dup_sub = LoggingSubscriber("DedupTest")
    test_event = store.replay(0)[0]
    exactly_once_strategy.deliver(test_event, dup_sub)
    exactly_once_strategy.deliver(test_event, dup_sub)  # should be skipped

    # 9) Replay from event store
    print("\n--- Replaying Events from Store ---")
    print(f" Store contains {store.size()} events")
    replayed = store.replay(from_sequence=3)
    print(f" Replaying from sequence 3: {len(replayed)} events")
    for ev in replayed:
        print(f"   {ev}")

    # 10) Summary
    print("\n--- Delivery Summary ---")
    print(f" AuditLog consumed: {audit_log.received_count} events (at-least-once)")
    print(f" AnalyticsEngine consumed: {analytics.received_count} events (exactly-once)")
    print(f" SearchIndexer consumed: {search_indexer.received_count} events (exactly-once, filtered)")
    print(f" Event store total: {store.size()} events")
    print(f" Latest sequence: {store.get_latest_sequence()}")

376 print("\n=== Demo Complete ===")Common Mistakes
- ✗ Mutable events: defeats the entire event sourcing model. Events must be append-only.
- ✗ No sequence numbers: you can't detect gaps or duplicates without ordering.
- ✗ Coupling producers to consumers: producers should fire-and-forget to the mediator.
- ✗ Ignoring back-pressure: fast producers can overwhelm slow consumers.
Key Points
- ✓ Command: every state change is an immutable event object (InsertEvent, UpdateEvent, DeleteEvent). Events are replayable.
- ✓ Mediator: producers and consumers never know about each other. The mediator routes, buffers, and manages back-pressure.
- ✓ Composite: transformation pipelines compose: a FilterTransform wrapping a MapTransform wrapping a BatchTransform.
- ✓ Strategy: delivery guarantees are swappable. The same event stream can be consumed at-least-once by one subscriber and exactly-once by another.