Event Sourcing System
Immutable event log with Command-based changes, Mediator coordinating sources and sinks, Composite transformation pipelines, and swappable delivery strategies.
Key Abstractions
Command: immutable data change with type, payload, timestamp, sequence number
EventStore: append-only log of events
Mediator: coordinates between producers, transformers, and consumers
Composite: Filter, Map, and Batch chained into a pipeline; complex transforms are trees of simpler ones
Strategy: at-least-once, at-most-once, exactly-once semantics
Observer: consumes events, tracks offset
How It Works
Event sourcing treats every state change as an immutable event appended to a log. Instead of overwriting a row in a database, you record "user created", "order placed", "payment processed" as discrete facts. The current state of any entity is derived by replaying its events from the beginning. The append-only model gives you a complete audit trail, the ability to rebuild state at any point in time, and natural decoupling between writers and readers.
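As a minimal sketch of deriving state by replay, assume a hypothetical account aggregate whose events are plain `(event_type, payload)` tuples. The event names and the `apply_event` and `replay` helpers are illustrative assumptions, not part of the implementation later in this document.

```python
from functools import reduce
from typing import Any

# Hypothetical event shape for illustration: (event_type, payload).
EventTuple = tuple[str, dict[str, Any]]


def apply_event(state: dict[str, Any], event: EventTuple) -> dict[str, Any]:
    """Pure transition function: returns a new state, never mutates the old one."""
    event_type, payload = event
    if event_type == "ACCOUNT_OPENED":
        return {"owner": payload["owner"], "balance": 0}
    if event_type == "DEPOSITED":
        return {**state, "balance": state["balance"] + payload["amount"]}
    if event_type == "WITHDRAWN":
        return {**state, "balance": state["balance"] - payload["amount"]}
    return state  # unknown event types are ignored in this sketch


def replay(events: list[EventTuple]) -> dict[str, Any]:
    """Current state is a left fold of the transition function over the log."""
    return reduce(apply_event, events, {})


log: list[EventTuple] = [
    ("ACCOUNT_OPENED", {"owner": "alice"}),
    ("DEPOSITED", {"amount": 100}),
    ("WITHDRAWN", {"amount": 30}),
]

current = replay(log)           # state after every event
as_of_second = replay(log[:2])  # state as it was after the second event
print(current, as_of_second)
```

Replaying a prefix of the log is what gives the "state at any point in time" property: nothing is overwritten, so earlier states stay reachable.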
The EventStore is an ordered, append-only sequence of events. Each event gets a monotonically increasing sequence number. Producers write to the store through an EventMediator that acts as the single coordination point. Producers never know about consumers, and consumers never know about producers. The mediator handles routing, buffering, and back-pressure management.
Change Data Capture (CDC) layers cleanly on top of event sourcing. Instead of polling a database for changes, consumers subscribe to the event stream and receive changes as they happen. A CompositePipeline sits between the raw event stream and subscribers, filtering out irrelevant events, enriching payloads, and batching for efficiency. Because the pipeline is a Composite, you build complex transformations by composing simple ones: a FilterTransform feeding a MapTransform feeding a BatchTransform.
Different consumers need different delivery guarantees. An audit log needs at-least-once delivery with retries so no event is ever lost. An analytics engine needs exactly-once delivery to avoid double-counting. The DeliveryStrategy abstraction makes this configurable per subscriber: the same event stream can serve both consumers with different semantics.
Requirements
Functional
- Append immutable events to an ordered, durable event store with sequence numbers
- Replay events from any sequence number to rebuild state
- Route events from producers to consumers through a mediator without direct coupling
- Support composable transformation pipelines: filter, map, batch
- Deliver events with configurable guarantees: at-least-once, exactly-once
Non-Functional
- Thread-safe event store supporting concurrent appends
- Sequence numbers must be gap-free so that consumers can detect missing events
- Transformation pipelines must be composable and extensible without modifying existing transforms
- Delivery strategies must be swappable per subscriber without affecting the routing layer
- Back-pressure must prevent fast producers from overwhelming slow consumers
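The back-pressure requirement is not exercised by the implementation later in this document, so here is one possible sketch using a bounded `queue.Queue` from the standard library. The `try_publish` helper, the buffer size, and the timeout are assumptions chosen for illustration.

```python
import queue

# Bounded buffer between producer and consumer; maxsize=2 is an arbitrary
# illustrative limit, not a value taken from the design above.
buffer = queue.Queue(maxsize=2)


def try_publish(item: str, timeout: float = 0.01) -> bool:
    """Wait up to `timeout` seconds for free space, then give up.

    Returning False pushes the decision (retry, shed, slow down) back to the
    producer instead of letting the buffer grow without bound.
    """
    try:
        buffer.put(item, timeout=timeout)
        return True
    except queue.Full:
        return False


# The producer outruns the consumer: the third publish finds the buffer full.
results = [try_publish(f"event-{i}") for i in range(3)]

# The consumer drains one item, which frees a slot for the producer.
drained = buffer.get()
accepted_after_drain = try_publish("event-3")

print(results, drained, accepted_after_drain)
```

Blocking with a timeout is only one policy. Dropping, sampling, or spilling to disk are alternatives, and which one fits depends on whether the producer can tolerate being slowed down.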
Design Decisions
What do you lose by just updating rows in place?
When you overwrite a row, the previous value is gone. Immutable events preserve the full timeline: what changed, when, and in what order. You can rebuild state at any point by replaying events up to that moment. That's invaluable for debugging, auditing, and retroactive analytics. The trade-off is storage and replay cost, which snapshots mitigate.
Couldn't producers just publish directly to topics?
Direct pub/sub couples producers to topic structures and consumers to subscription management. A Mediator centralizes routing logic, so you can add buffering, back-pressure, transform pipelines, and dead-letter handling in one place. Producers call publish() and walk away. The mediator decides how, when, and to whom events are delivered. For CDC, the same event often needs different transformations for different downstream systems, and the mediator is where that logic lives.
Why not just hardcode the filter-map-batch sequence?
Each transformation (filter, map, batch) is simple on its own. Real pipelines combine them: filter out irrelevant event types, enrich the payload with metadata, batch for bulk insertion. Composite lets you build these by stacking transforms. Need a new enrichment step? Write a MapTransform and add it. The pipeline doesn't care about the internal structure of its children.
How do you pick between at-least-once and exactly-once?
At-least-once is the simplest: retry on failure, but the consumer must be idempotent because it may see duplicates. Exactly-once requires tracking which events have been delivered to each subscriber, adding storage and coordination overhead. At-most-once delivers once with no retry: events can be lost but there's no duplication. Strategy lets each subscriber choose independently based on its tolerance for duplicates versus data loss.
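The implementation later in this document covers at-least-once and exactly-once only. For comparison, an at-most-once strategy could look roughly like the sketch below. `AtMostOnceStrategy`, the `dropped` counter, and `FlakySubscriber` are hypothetical names introduced here, and the subscriber is reduced to a single `on_event` method so the block stays self-contained.

```python
from typing import Any, Protocol


class Subscriber(Protocol):
    def on_event(self, event: Any) -> bool: ...


class AtMostOnceStrategy:
    """Single delivery attempt: no retry, so a failure means the event is lost."""

    def __init__(self) -> None:
        self.dropped = 0  # kept only so that losses are observable

    def deliver(self, event: Any, subscriber: Subscriber) -> bool:
        try:
            ok = subscriber.on_event(event)
        except Exception:
            ok = False  # a crash in the consumer counts as a failed attempt
        if not ok:
            self.dropped += 1
        return ok


class FlakySubscriber:
    """Hypothetical consumer that rejects its first event, then accepts the rest."""

    def __init__(self) -> None:
        self.calls = 0
        self.seen: list[Any] = []

    def on_event(self, event: Any) -> bool:
        self.calls += 1
        if self.calls == 1:
            return False
        self.seen.append(event)
        return True


strategy = AtMostOnceStrategy()
sub = FlakySubscriber()
first = strategy.deliver("e1", sub)   # fails and is not retried
second = strategy.deliver("e2", sub)  # succeeds
print(first, second, sub.seen, strategy.dropped)
```

The first event is gone for good, which is the trade the strategy makes in exchange for never producing a duplicate.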
Interview Follow-ups
- "How would you add snapshotting for fast replay?" Periodically serialize the current state derived from events into a snapshot. On replay, load the latest snapshot and replay only events after the snapshot's sequence number. This bounds replay time regardless of total event count. Schedule snapshots based on event count thresholds or time intervals.
- "How does CQRS fit with event sourcing?" CQRS separates the write model (command side appending events) from the read model (query side built by projecting events into denormalized views). The event store is the write side. Subscribers maintain read-optimized projections: one might build a search index, another a materialized view for dashboards. Each projection consumes the same event stream but structures data differently.
- "How do you handle schema evolution?" Version your events. An ORDER_PLACED_V2 event might add a currency field. Write upcasters that transform V1 events to V2 format during replay. Never modify existing event schemas in place, because that breaks immutability. Consumer-side upcasting keeps the event store stable while allowing the domain model to evolve.
- "How do you order events in a distributed system?" Use a global sequence number from a centralized sequencer (e.g., a database auto-increment column). A Snowflake-style ID generator is an alternative that yields unique, roughly time-ordered IDs without central coordination, but those IDs are not gap-free. For partitioned event stores, maintain per-partition ordering and use vector clocks or hybrid logical clocks for cross-partition causal ordering. Accept that total ordering across partitions is expensive and often unnecessary. Causal ordering within an aggregate is usually sufficient.
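The snapshotting follow-up above can be sketched as follows. This is a minimal illustration under stated assumptions: events are reduced to `(sequence, event_type)` pairs, the projected state is a simple count per event type, and `Snapshot`, `fold`, `take_snapshot`, and `restore` are hypothetical names.

```python
from dataclasses import dataclass

# Assumed event shape for this sketch: (sequence, event_type).
LogEntry = tuple[int, str]


@dataclass(frozen=True)
class Snapshot:
    sequence: int          # last sequence number folded into `state`
    state: dict[str, int]  # projected state: count of events per type


def fold(state: dict[str, int], events: list[LogEntry]) -> dict[str, int]:
    """Apply events to a copy of `state` and return the new state."""
    new_state = dict(state)
    for _, event_type in events:
        new_state[event_type] = new_state.get(event_type, 0) + 1
    return new_state


def take_snapshot(events: list[LogEntry]) -> Snapshot:
    last_sequence = events[-1][0] if events else 0
    return Snapshot(sequence=last_sequence, state=fold({}, events))


def restore(snapshot: Snapshot, log: list[LogEntry]) -> tuple[dict[str, int], int]:
    """Load the snapshot, then replay only events after its sequence number."""
    tail = [entry for entry in log if entry[0] > snapshot.sequence]
    return fold(snapshot.state, tail), len(tail)


log: list[LogEntry] = [
    (1, "USER_CREATED"),
    (2, "ORDER_PLACED"),
    (3, "ORDER_PLACED"),
    (4, "PAYMENT_PROCESSED"),
    (5, "ORDER_PLACED"),
]

snapshot = take_snapshot(log[:3])          # snapshot taken at sequence 3
state, replayed = restore(snapshot, log)   # replays sequences 4 and 5 only
print(state, replayed)
```

The restored state matches a full replay from the start of the log, but the work done is proportional to the events since the snapshot rather than to the whole history.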
45-Minute LLD Playbook
Each phase is what the interviewer expects you to do and say. Concrete steps, not topic hints. Diagrams are what you would sketch on the board.
- Phase 1 (5 min)
Clarify the scope and lock the event model
Goal: Pin down what an event is, what the store guarantees, and what delivery semantics consumers need. End with the diagram-vs-code question.
Do & Say
- ASK·1 Open with: An event in this system is immutable: event_type, payload, timestamp, and a sequence number assigned by the store on append. The payload is opaque to the mediator. Is that the contract you have in mind? Lock immutability and gap-free sequence numbers as hard invariants.
- SAY·2 Confirm the store semantics: Append-only log, no updates, no deletes. Replay from any sequence number rebuilds state. I will treat snapshotting as a v2 optimisation, not v1. Park snapshots, compaction, and schema versioning as deferred.
- SAY·3 Pin the delivery menu: I will support at-least-once with retries and exactly-once with per-subscriber dedup. At-most-once and exactly-once-across-partitions are out of scope.
- SAY·4 Confirm the CDC angle: Consumers can subscribe with a transform pipeline (filter, map, batch). The pipeline is composable so SearchIndexer can have a different stack from AuditLog. Both read the same source events.
- ASK·5 Ask the process question: Do you want a class diagram first, or jump into code with the structure showing through Event, EventStore, Transform, DeliveryStrategy, EventSubscriber, EventMediator? Either way I want to budget the 40 minutes deliberately.
Interviewer is grading: You name immutability and gap-free sequence numbers as invariants before any code. You park snapshots and schema evolution as v2 instead of letting them creep in. You ask the diagram-vs-code question unprompted.
- Phase 2 (5-10 min)
Sketch the six clusters and the data flow
Goal: Lock the six conceptual clusters (Event, EventStore, Transform, DeliveryStrategy, EventSubscriber, EventMediator) and trace one publish-to-deliver path before coding.
Do & Say
- SAY·1 Name the six clusters: Event (Command, immutable), EventStore (append-only log), Transform interface plus FilterTransform/MapTransform/BatchTransform/CompositePipeline (Composite), DeliveryStrategy interface plus AtLeastOnceStrategy/ExactlyOnceStrategy (Strategy), EventSubscriber interface plus LoggingSubscriber (Observer), EventMediator (the coordinator).
- SAY·2 Trace one publish path out loud, part one: UserService calls mediator.publish. The mediator appends to EventStore, which assigns seq=1. The mediator buffers the returned event.
- SAY·3 Publish path part two: When route(stream) fires, the mediator pulls the buffer. Optionally runs it through a CompositePipeline. Then for each subscriber looks up its DeliveryStrategy and calls strategy.deliver(event, sub).
- WRITE·4 Write the EventMediator signatures: register_producer(name), register_consumer(stream, subscriber, strategy), set_pipeline(stream, pipeline), publish(producer, event_type, payload) -> Event, route(stream). Say: Producers and consumers never reference each other, only the mediator. That is the whole point.
- SAY·5 Pin the pipeline composition: CompositePipeline.add returns self so you can chain pipeline.add(FilterTransform(...)).add(MapTransform(...)).add(BatchTransform(...)). The mediator just calls pipeline.apply(events).
- SAY·6 If a diagram was requested, draw it as a vertical pipe: producers on the left feeding the mediator, mediator wrapping EventStore at the bottom, pipeline sitting between buffer and subscribers, strategies as the last hop before each consumer. Otherwise verbalise the same shape in 90 seconds.
Interviewer is grading: Method signatures on the mediator are complete before code starts. You commit to the buffer-then-route model rather than fire-and-forget direct delivery. You name the pipeline as a Composite that the mediator does not look inside.
- Phase 3 (25 min)
Code in this sequence (bottom-up)
Goal: Type the code in dependency order: Event, EventStore, Transform hierarchy, DeliveryStrategy hierarchy, EventSubscriber, then EventMediator last. Match the order of the Code Implementation section exactly.
Do & Say
- SAY·1 Start with Event as a frozen dataclass. Fields: event_type, payload (dict), timestamp (default factory time.time), sequence (default 0). Add __str__ for nice logging. Say: frozen=True is the Command Pattern's immutability invariant. Once the store assigns a sequence, the event never changes. (~2 min)
- SAY·2 Code EventStore fields: _events list, _current_sequence int, _lock for thread safety. append(event_type, payload) increments _current_sequence inside the lock, constructs the Event with that sequence, appends, returns. (~2 min)
- SAY·3 Add replay(from_sequence=0) returning events with seq > from_sequence, plus get_latest_sequence and size. Say: The lock around append is the gap-free monotonic sequence invariant. Without it two concurrent appends could both read 5 and both assign 6. (~2 min)
- SAY·4 Code the Transform leaves. Abstract Transform with apply(events). FilterTransform takes a predicate, MapTransform takes a mapper, BatchTransform takes batch_size and logs the grouping (passes through flat for downstream compatibility). (~2 min)
- SAY·5 Code CompositePipeline. Holds a transforms list. add(transform) returns self for chaining. apply iterates each transform feeding the previous output, breaks early if a stage returns empty. (~2 min)
- SAY·6 Say: CompositePipeline implements Transform itself. That is Composite. From the mediator's view there is no difference between one transform and a pipeline of ten. (~1 min)
- SAY·7 Code DeliveryStrategy abstract base with deliver(event, subscriber). Then AtLeastOnceStrategy: takes max_retries, loops calling subscriber.on_event up to max_retries times, returns True on first success or False after exhaustion. (~2 min)
- SAY·8 Code ExactlyOnceStrategy: holds a per-subscriber set of delivered sequence numbers, skips if already delivered, otherwise calls on_event and records the sequence on success. (~2 min)
- SAY·9 Say: ExactlyOnce keeps per-subscriber state, not global. AuditLog and AnalyticsEngine can be at different offsets in the same stream. (~1 min)
- SAY·10 Code EventSubscriber abstract with on_event, get_id, get_offset. Then LoggingSubscriber: holds name, offset (int), and a received list. on_event appends the event, updates offset to event.sequence, prints, returns True. (~2 min)
- SAY·11 Say: The subscriber owns its own offset. The mediator never tells it where it is. That is what makes replay-from-offset trivial. (~1 min)
- SAY·12 Code EventMediator fields: _store, _producers set, _subscribers dict stream -> list, _pipelines dict stream -> CompositePipeline, _strategies dict subscriber_id -> DeliveryStrategy, _default_strategy (AtLeastOnceStrategy), _buffer list, _lock. (~1 min)
- SAY·13 Code register_producer, register_consumer, set_pipeline as short setters. Then publish: validate the producer is registered, call store.append, buffer the returned event. (~2 min)
- SAY·14 Code route(stream): drain the buffer under the lock, run through the pipeline if configured, then for each event and each subscriber on that stream look up the strategy and call deliver. (~1 min)
- SAY·15 Say: publish and route are decoupled. Producers fire and walk away. route is what the consumer-side scheduler calls. (~1 min)
- SAY·16 Walk-through, publish: UserService.publish(USER_CREATED, {user_id: u-101}). Store assigns seq=1, mediator buffers. OrderService.publish(ORDER_PLACED, {amount: 99.99}). Store assigns seq=2. (~1 min)
- SAY·17 Walk-through, route: mediator.route(filtered-events) drains both, runs FilterTransform (both pass), MapTransform adds enriched=True, BatchTransform logs 2 events in 1 batch. For SearchIndexer, ExactlyOnceStrategy.deliver checks the empty delivered set, calls on_event for seq=1 then seq=2.
- SAY·18 Walk-through, idempotence check: If route fires again with no new events, the pipeline runs on an empty list and nothing happens. Self-test passes.
Interviewer is grading: Code compiles as you type. You name the store's lock as the gap-free-sequence invariant. You volunteer that ExactlyOnceStrategy holds per-subscriber state without prompting. You walk through one publish-to-deliver path as a self-check before declaring done.
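One way to back up the claim about the lock is a small stress check. The sketch below uses a stripped-down stand-in for the store (`SequencedLog`, a hypothetical name) that keeps only the sequencing logic, and has several threads append concurrently. The thread and iteration counts are arbitrary.

```python
import threading


class SequencedLog:
    """Stand-in for the store: only the sequence assignment is modelled."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._current = 0
        self.sequences: list[int] = []

    def append(self) -> int:
        # Increment and record inside one critical section, so no two callers
        # can observe the same counter value or record out of order.
        with self._lock:
            self._current += 1
            self.sequences.append(self._current)
            return self._current


def hammer(log: SequencedLog, count: int) -> None:
    for _ in range(count):
        log.append()


THREADS, PER_THREAD = 8, 1000
log = SequencedLog()
workers = [threading.Thread(target=hammer, args=(log, PER_THREAD)) for _ in range(THREADS)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

expected = list(range(1, THREADS * PER_THREAD + 1))
gap_free = log.sequences == expected
print(f"appended={len(log.sequences)} gap_free={gap_free}")
```

Because the increment and the list append share one critical section, the recorded sequence is exactly 1..N in order, with no duplicates and no gaps.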
- Phase 4 (5 min)
Trade-offs and extensions
Goal: Defend the mediator over direct pub/sub, defend Composite over a hardcoded filter-map-batch sequence, volunteer snapshotting and CQRS, close with a one-sentence summary.
Do & Say
- SAY·1 Trade-off one, Mediator over direct pub/sub: Without a mediator, producers know topic names and consumers know producer schemas. Every routing change touches both sides.
- SAY·2 Defend the mediator: It centralises buffering, back-pressure, transform pipelines, and dead-letter handling. Producers just call publish. One more class in the hot path, worth it for the routing flexibility.
- SAY·3 Trade-off two, Composite over hardcoded pipeline: A hardcoded filter-map-batch sequence works for the first use case. The second pipeline that needs map-filter-batch in a different order forces a copy of the routing logic. CompositePipeline lets each stream configure its own stack at registration time. The mediator stays oblivious.
- SAY·4 Extension to volunteer, snapshotting: Periodically serialise the projected state at sequence N. On replay, load the snapshot and replay from N+1. Bounds replay time regardless of total event count.
- SAY·5 Defend snapshot cadence: Schedule by event count or time. More frequent snapshots cost storage but shorten worst-case replay.
- SAY·6 Extension to volunteer, CQRS: The write side is the EventStore. Each consumer maintains a read-optimised projection: SearchIndexer builds an Elasticsearch index, AnalyticsEngine populates a materialised view. Same events, different projections. The mediator is the natural fan-out point.
- WATCH·7 Be ready for schema evolution: Version events (ORDER_PLACED_V1, ORDER_PLACED_V2). Write upcasters that lift V1 events to V2 during replay. Never edit existing event payloads in place. Immutability is the whole foundation.
- SAY·8 Close with one sentence: Command for immutable events. Mediator for producer-consumer decoupling. Composite for transform pipelines. Strategy for at-least-once versus exactly-once per subscriber. Observer for offset-tracking consumers. Six clusters, gap-free sequence numbers, replayable from any offset.
Interviewer is grading: You name two specific trade-offs and defend them with the failure mode the alternative produces. You volunteer snapshotting and CQRS unprompted. You can summarise the six clusters in one breath.
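If the schema-evolution question comes up, an upcaster can be sketched in a few lines. This is an illustration under assumptions: the version is carried as a separate `version` field rather than in the event type name, the `UPCASTERS` registry and `upcast` helper are hypothetical, and the USD default is invented for the example.

```python
from typing import Any, Callable

Payload = dict[str, Any]


def order_placed_v1_to_v2(payload: Payload) -> Payload:
    # Assumption for illustration: V1 orders were implicitly priced in USD.
    return {**payload, "currency": "USD"}


# Keyed by (event type, version the upcaster lifts *from*).
UPCASTERS: dict[tuple[str, int], Callable[[Payload], Payload]] = {
    ("ORDER_PLACED", 1): order_placed_v1_to_v2,
}
LATEST_VERSION = {"ORDER_PLACED": 2}


def upcast(event: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of `event` lifted to the latest version.

    The stored event is never modified; upcasting happens on the read path.
    """
    event_type, version, payload = event["type"], event["version"], event["payload"]
    while version < LATEST_VERSION.get(event_type, version):
        payload = UPCASTERS[(event_type, version)](payload)
        version += 1
    return {"type": event_type, "version": version, "payload": payload}


stored_v1 = {
    "type": "ORDER_PLACED",
    "version": 1,
    "payload": {"order_id": "o-501", "amount": 99.99},
}
current = upcast(stored_v1)
print(current)
```

Chaining one upcaster per version step means a V1 event can reach any later version without a dedicated V1-to-VN converter.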
Code Implementation
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable
import threading
import time

# --------------- Command Pattern: immutable events ---------------

@dataclass(frozen=True)
class Event:
    """Immutable event: the fundamental unit of state change."""
    event_type: str
    payload: dict[str, Any]
    timestamp: float = field(default_factory=time.time)
    sequence: int = 0

    def __str__(self) -> str:
        return f"Event(seq={self.sequence}, type={self.event_type}, payload={self.payload})"

# --------------- Event Store: append-only log ---------------

class EventStore:
    """Append-only log. Events are never mutated or deleted."""

    def __init__(self) -> None:
        self._events: list[Event] = []
        self._current_sequence = 0
        self._lock = threading.Lock()

    def append(self, event_type: str, payload: dict[str, Any]) -> Event:
        # Assign the sequence and append under one lock so numbers stay gap-free.
        with self._lock:
            self._current_sequence += 1
            event = Event(
                event_type=event_type,
                payload=payload,
                sequence=self._current_sequence,
            )
            self._events.append(event)
            return event

    def replay(self, from_sequence: int = 0) -> list[Event]:
        """Return events with a sequence number strictly greater than from_sequence."""
        with self._lock:
            return [e for e in self._events if e.sequence > from_sequence]

    def get_latest_sequence(self) -> int:
        with self._lock:
            return self._current_sequence

    def size(self) -> int:
        with self._lock:
            return len(self._events)

# --------------- Composite Pattern: transform pipeline ---------------

class Transform(ABC):
    """Base interface for event transformations."""

    @abstractmethod
    def apply(self, events: list[Event]) -> list[Event]: ...


class FilterTransform(Transform):
    """Filters events based on a predicate."""

    def __init__(self, predicate: Callable[[Event], bool], name: str = "Filter"):
        self._predicate = predicate
        self._name = name

    def apply(self, events: list[Event]) -> list[Event]:
        result = [e for e in events if self._predicate(e)]
        print(f" [{self._name}] {len(events)} in -> {len(result)} out")
        return result


class MapTransform(Transform):
    """Maps events by transforming their payloads."""

    def __init__(self, mapper: Callable[[Event], Event], name: str = "Map"):
        self._mapper = mapper
        self._name = name

    def apply(self, events: list[Event]) -> list[Event]:
        result = [self._mapper(e) for e in events]
        print(f" [{self._name}] mapped {len(result)} events")
        return result


class BatchTransform(Transform):
    """Groups events into batches of a given size."""

    def __init__(self, batch_size: int):
        self._batch_size = batch_size

    def apply(self, events: list[Event]) -> list[Event]:
        # Flat pass-through; in production this would yield batches
        batches = [events[i:i + self._batch_size]
                   for i in range(0, len(events), self._batch_size)]
        print(f" [Batch] {len(events)} events -> {len(batches)} batches of <={self._batch_size}")
        # Return flat list for downstream compatibility
        return events


class CompositePipeline(Transform):
    """Composite: chains multiple transforms. Each transform's output feeds the next."""

    def __init__(self, name: str = "Pipeline") -> None:
        self._transforms: list[Transform] = []
        self._name = name

    def add(self, transform: Transform) -> CompositePipeline:
        self._transforms.append(transform)
        return self

    def apply(self, events: list[Event]) -> list[Event]:
        print(f" [{self._name}] Starting pipeline with {len(events)} events")
        result = events
        for transform in self._transforms:
            result = transform.apply(result)
            if not result:
                break
        print(f" [{self._name}] Pipeline complete: {len(result)} events out")
        return result

# --------------- Strategy Pattern: delivery guarantees ---------------

class DeliveryStrategy(ABC):
    """Defines how events are delivered to subscribers."""

    @abstractmethod
    def deliver(self, event: Event, subscriber: EventSubscriber) -> bool: ...


class AtLeastOnceStrategy(DeliveryStrategy):
    """Retries on failure. May deliver duplicates."""

    def __init__(self, max_retries: int = 3):
        # max_retries is the total number of delivery attempts.
        self._max_retries = max_retries

    def deliver(self, event: Event, subscriber: EventSubscriber) -> bool:
        for attempt in range(1, self._max_retries + 1):
            success = subscriber.on_event(event)
            if success:
                return True
            print(f" [AtLeastOnce] Attempt {attempt}/{self._max_retries} failed for {subscriber.get_id()}")
        print(f" [AtLeastOnce] Exhausted retries for {subscriber.get_id()}")
        return False


class ExactlyOnceStrategy(DeliveryStrategy):
    """Tracks delivered event sequences per subscriber to prevent duplicates."""

    def __init__(self) -> None:
        self._delivered: dict[str, set[int]] = {}

    def deliver(self, event: Event, subscriber: EventSubscriber) -> bool:
        sub_id = subscriber.get_id()
        if sub_id not in self._delivered:
            self._delivered[sub_id] = set()
        if event.sequence in self._delivered[sub_id]:
            print(f" [ExactlyOnce] Skipping duplicate seq={event.sequence} for {sub_id}")
            return True
        success = subscriber.on_event(event)
        if success:
            self._delivered[sub_id].add(event.sequence)
        return success

# --------------- Observer Pattern: subscribers ---------------

class EventSubscriber(ABC):
    """Observer that consumes events and tracks its own offset."""

    @abstractmethod
    def on_event(self, event: Event) -> bool: ...

    @abstractmethod
    def get_id(self) -> str: ...

    @abstractmethod
    def get_offset(self) -> int: ...


class LoggingSubscriber(EventSubscriber):
    """Prints events and tracks offset."""

    def __init__(self, name: str):
        self._name = name
        self._offset = 0
        self._received: list[Event] = []

    def on_event(self, event: Event) -> bool:
        self._received.append(event)
        self._offset = event.sequence
        print(f" [{self._name}] Consumed: {event}")
        return True

    def get_id(self) -> str:
        return self._name

    def get_offset(self) -> int:
        return self._offset

    @property
    def received_count(self) -> int:
        return len(self._received)

# --------------- Mediator Pattern: event mediator ---------------

class EventMediator:
    """Mediator coordinating producers, transforms, and consumers.

    Producers and consumers never reference each other."""

    def __init__(self, store: EventStore) -> None:
        self._store = store
        self._producers: set[str] = set()
        self._subscribers: dict[str, list[EventSubscriber]] = {}
        self._pipelines: dict[str, CompositePipeline] = {}
        self._strategies: dict[str, DeliveryStrategy] = {}
        self._default_strategy: DeliveryStrategy = AtLeastOnceStrategy()
        self._buffer: list[Event] = []
        self._lock = threading.Lock()

    def register_producer(self, name: str) -> None:
        self._producers.add(name)
        print(f" [Mediator] Producer registered: {name}")

    def register_consumer(
        self,
        stream: str,
        subscriber: EventSubscriber,
        strategy: DeliveryStrategy | None = None,
    ) -> None:
        if stream not in self._subscribers:
            self._subscribers[stream] = []
        self._subscribers[stream].append(subscriber)
        if strategy:
            self._strategies[subscriber.get_id()] = strategy
        print(f" [Mediator] Consumer '{subscriber.get_id()}' registered on stream '{stream}'")

    def set_pipeline(self, stream: str, pipeline: CompositePipeline) -> None:
        self._pipelines[stream] = pipeline

    def publish(self, producer: str, event_type: str, payload: dict[str, Any]) -> Event:
        """Producer fires an event. Mediator appends to store and buffers for routing."""
        if producer not in self._producers:
            raise ValueError(f"Unknown producer: {producer}")
        event = self._store.append(event_type, payload)
        with self._lock:
            self._buffer.append(event)
        return event

    def route(self, stream: str) -> None:
        """Route buffered events through optional pipeline, then deliver to subscribers."""
        # Note: the buffer is shared by all streams, so the first route() call
        # after a publish drains every pending event for that one stream only.
        with self._lock:
            events = list(self._buffer)
            self._buffer.clear()
        # Apply transform pipeline if configured
        pipeline = self._pipelines.get(stream)
        if pipeline:
            events = pipeline.apply(events)
        # Deliver to all subscribers on this stream
        subscribers = self._subscribers.get(stream, [])
        for event in events:
            for sub in subscribers:
                strategy = self._strategies.get(sub.get_id(), self._default_strategy)
                strategy.deliver(event, sub)

# --------------- Demo ---------------

if __name__ == "__main__":
    print("=" * 60)
    print("Event Sourcing / CDC System Demo")
    print("=" * 60)

    # 1) Create the event store
    store = EventStore()
    mediator = EventMediator(store)

    # 2) Register producers
    print("\n--- Registering Producers ---")
    mediator.register_producer("UserService")
    mediator.register_producer("OrderService")
    mediator.register_producer("PaymentService")

    # 3) Create subscribers with different delivery strategies
    print("\n--- Registering Consumers ---")
    audit_log = LoggingSubscriber("AuditLog")
    analytics = LoggingSubscriber("AnalyticsEngine")
    search_indexer = LoggingSubscriber("SearchIndexer")
    at_least_once = AtLeastOnceStrategy(max_retries=3)
    exactly_once = ExactlyOnceStrategy()
    mediator.register_consumer("all-events", audit_log, strategy=at_least_once)
    mediator.register_consumer("all-events", analytics, strategy=exactly_once)
    mediator.register_consumer("filtered-events", search_indexer, strategy=exactly_once)

    # 4) Build a composite transform pipeline for the filtered stream
    print("\n--- Building Transform Pipeline ---")
    pipeline = CompositePipeline("CDC-Pipeline")
    pipeline.add(FilterTransform(
        predicate=lambda e: e.event_type in ("USER_CREATED", "ORDER_PLACED"),
        name="TypeFilter",
    ))
    pipeline.add(MapTransform(
        mapper=lambda e: Event(
            event_type=e.event_type,
            payload={**e.payload, "enriched": True},
            timestamp=e.timestamp,
            sequence=e.sequence,
        ),
        name="EnrichMapper",
    ))
    pipeline.add(BatchTransform(batch_size=2))
    mediator.set_pipeline("filtered-events", pipeline)

    # 5) Publish events from different producers
    print("\n--- Publishing Events ---")
    e1 = mediator.publish("UserService", "USER_CREATED", {"user_id": "u-101", "name": "Alice"})
    print(f" Published: {e1}")
    e2 = mediator.publish("OrderService", "ORDER_PLACED", {"order_id": "o-501", "amount": 99.99})
    print(f" Published: {e2}")
    e3 = mediator.publish("PaymentService", "PAYMENT_PROCESSED", {"payment_id": "p-301", "status": "success"})
    print(f" Published: {e3}")

    # 6) Route through the "all-events" stream (no pipeline, raw delivery)
    print("\n--- Routing: all-events (raw, no transforms) ---")
    mediator.route("all-events")

    # 7) Publish again: step 6 drained the shared buffer, so the filtered stream needs new events
    e4 = mediator.publish("UserService", "USER_CREATED", {"user_id": "u-102", "name": "Bob"})
    e5 = mediator.publish("OrderService", "ORDER_PLACED", {"order_id": "o-502", "amount": 49.50})
    e6 = mediator.publish("PaymentService", "PAYMENT_PROCESSED", {"payment_id": "p-302", "status": "failed"})
    print("\n--- Routing: filtered-events (with pipeline) ---")
    mediator.route("filtered-events")

    # 8) Demonstrate exactly-once: attempt duplicate delivery
    print("\n--- Exactly-Once Dedup Test ---")
    exactly_once_strategy = ExactlyOnceStrategy()
    dup_sub = LoggingSubscriber("DedupTest")
    test_event = store.replay(0)[0]
    exactly_once_strategy.deliver(test_event, dup_sub)
    exactly_once_strategy.deliver(test_event, dup_sub)  # should be skipped

    # 9) Replay from event store
    print("\n--- Replaying Events from Store ---")
    print(f" Store contains {store.size()} events")
    replayed = store.replay(from_sequence=3)
    print(f" Replaying after sequence 3: {len(replayed)} events")
    for ev in replayed:
        print(f" {ev}")

    # 10) Summary
    print("\n--- Delivery Summary ---")
    print(f" AuditLog consumed: {audit_log.received_count} events (at-least-once)")
    print(f" AnalyticsEngine consumed: {analytics.received_count} events (exactly-once)")
    print(f" SearchIndexer consumed: {search_indexer.received_count} events (exactly-once, filtered)")
    print(f" Event store total: {store.size()} events")
    print(f" Latest sequence: {store.get_latest_sequence()}")
    print("\n=== Demo Complete ===")

Interview Grading by Level
What an interviewer at each level expects to see in your answer. Use this to calibrate, not to perform.
Junior Engineer (L3)
Builds an event list and a publish-subscribe loop, but couples producers to consumers and treats the event log as an after-the-fact append.
Strengths
- Implements Event as a frozen dataclass with event_type and payload.
- Builds an EventStore that appends events and assigns sequence numbers.
- Recognises that replay from a sequence number rebuilds state.
- Writes a basic subscriber loop that calls on_event for each new event.
- When prompted, can explain why mutating events would break replay.
Gaps
- Lets producers call subscribers directly without a mediator, coupling the two sides.
- Forgets the lock around store.append, so concurrent publishes can produce duplicate or gapped sequence numbers.
- Treats the filter-map-batch sequence as one hardcoded function instead of three composable Transforms.
- Implements one delivery semantics (usually at-most-once silently) and assumes consumers handle dedup.
- Stores subscriber offsets on the mediator rather than on the subscriber itself.
Mid-Level Engineer (L4)
Drives the design with a real Mediator, gap-free sequencing, composable transforms, and per-subscriber DeliveryStrategy.
Strengths
- Event is frozen, EventStore holds the lock around append for monotonic sequencing.
- Mediator buffers events on publish and drains on route, so producers fire-and-forget.
- FilterTransform, MapTransform, BatchTransform, and CompositePipeline all implement the Transform interface, and CompositePipeline.add chains.
- Two DeliveryStrategy concretes: AtLeastOnceStrategy with retry count, ExactlyOnceStrategy with per-subscriber sequence dedup.
- EventSubscriber owns its own offset; mediator never tracks subscriber position.
- Handles the dedup test (delivering the same event twice through ExactlyOnceStrategy) correctly.
Gaps
- Does not volunteer snapshotting until prompted, even though replay-time is the obvious concern at scale.
- Misses CQRS as the natural framing for multiple projections off the same event stream.
- Treats schema evolution as a runtime concern instead of a versioning-plus-upcaster design choice.
Senior Engineer (L5+)
Volunteers snapshotting, CQRS, and schema versioning unprompted, names the immutability invariant as the foundation, and frames each pattern around the failure it prevents.
- Volunteers snapshotting with the snapshot-at-sequence-N plus replay-from-N+1 strategy and discusses snapshot frequency trade-offs.
- Volunteers CQRS framing: EventStore is the write side, each subscriber is a read-side projection (search index, materialised view, dashboard cache).
- Names the gap-free monotonic sequence number as the invariant that makes dedup, replay, and offset tracking all work; defends the lock around append as the only way to guarantee it.
- Frames each pattern around its failure: Command for replay correctness, Mediator for producer-consumer churn, Composite for pipeline variation per stream, Strategy for differing delivery tolerances per subscriber.
- Proposes upcasters for schema evolution and explicitly forbids editing existing event payloads.
- Suggests hybrid logical clocks or per-partition ordering for the distributed extension, with the caveat that total cross-partition ordering is usually unnecessary.
- Closes with a one-sentence summary that names all five patterns plus the gap-free-sequence invariant in under 20 seconds.
Common Mistakes
- ✗ Mutable events: defeats the entire event sourcing model. Events must be immutable and the log append-only.
- ✗ No sequence numbers: you can't detect gaps or duplicates without ordering.
- ✗ Coupling producers to consumers: producers should fire-and-forget to the mediator.
- ✗ Ignoring back-pressure: fast producers can overwhelm slow consumers.
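The sequence-number point in the list above is easy to make concrete. Assuming a consumer that remembers the last sequence number it processed, a gap-free numbering lets it classify each incoming event with two comparisons. The `classify` and `scan` helpers below are hypothetical names used only for this sketch.

```python
def classify(last_seen: int, incoming: int) -> str:
    """Compare an incoming sequence number with the consumer's last processed one."""
    if incoming <= last_seen:
        return "duplicate"   # already processed, safe to skip
    if incoming == last_seen + 1:
        return "in_order"    # exactly the next expected event
    return "gap"             # at least one event in between was missed


def scan(sequences: list[int]) -> list[tuple[int, str]]:
    """Classify a stream of sequence numbers, advancing the offset as it goes."""
    last_seen = 0
    report: list[tuple[int, str]] = []
    for seq in sequences:
        verdict = classify(last_seen, seq)
        report.append((seq, verdict))
        if verdict != "duplicate":
            last_seen = seq  # in this sketch a gap is reported, then skipped over
    return report


report = scan([1, 2, 2, 4, 5])
print(report)
```

In a real consumer a "gap" verdict would typically trigger a replay from the store starting at the last processed sequence number rather than being skipped.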
Key Points
- ✓ Command: every state change is an immutable event object (InsertEvent, UpdateEvent, DeleteEvent). Events are replayable.
- ✓ Mediator: producers and consumers never know about each other. The mediator routes, buffers, and manages back-pressure.
- ✓ Composite: transformation pipelines compose. A FilterTransform feeding a MapTransform feeding a BatchTransform.
- ✓ Strategy: delivery guarantee is swappable. The same event stream can be consumed at-least-once by one subscriber and exactly-once by another.