Pub/Sub System
Topic-based message broker with subscriber management. Observer pattern at its core, Strategy pattern for delivery semantics. Producers and consumers never reference each other; they share only a topic name and the message format.
Key Abstractions
MessageBroker: Facade coordinating topic creation, subscriptions, and message routing
Topic: Named channel that hands each message and its current subscriber set to the delivery strategy
Subscriber: Interface with get_id and on_message methods. Any class can subscribe by implementing it.
Message: Immutable payload with topic name, content, timestamp, and a unique ID for deduplication
SubscriptionManager: Tracks topic-to-subscriber mappings and handles the subscribe/unsubscribe lifecycle
Why Topic-Based Routing
Content-based pub/sub sounds appealing on paper. Subscribers write filter expressions, and the broker evaluates each message against every filter. Flexible? Sure. But it creates a coupling between message structure and subscription logic that gets painful fast. Change a field name and half your subscriptions silently stop matching.
Topic-based routing is simpler and more predictable. Publishers send to a named topic. Subscribers attach to that topic. No parsing, no filter evaluation, no surprises. When you need finer granularity, you create more specific topics (e.g., "orders.created" vs "orders.shipped") rather than writing filter predicates. Kafka, Amazon SNS, and Google Cloud Pub/Sub all use the named topic as their primary routing unit; SNS and Cloud Pub/Sub offer message filtering only as an optional layer on top of topics.
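A minimal sketch of the contrast, assuming subscriptions are plain callables and messages are dicts (both simplifications for illustration, not the design used later). Topic routing is one dictionary lookup that never inspects the payload; content-based routing evaluates every registered predicate, and a field renamed upstream makes a filter quietly stop matching.

```python
from __future__ import annotations
from collections import defaultdict
from typing import Callable

Handler = Callable[[dict], None]


def publish_topic(subs: dict[str, list[Handler]], topic: str, msg: dict) -> int:
    """Topic-based: one dict lookup, no inspection of the payload."""
    handlers = subs.get(topic, [])
    for handle in handlers:
        handle(msg)
    return len(handlers)


def publish_content(subs: list[tuple[Callable[[dict], bool], Handler]], msg: dict) -> int:
    """Content-based: every filter is evaluated against every message."""
    delivered = 0
    for matches, handle in subs:
        if matches(msg):
            handle(msg)
            delivered += 1
    return delivered


received: list[dict] = []
topic_subs: dict[str, list[Handler]] = defaultdict(list)
topic_subs["orders.created"].append(received.append)

# A content filter written against the field name "status".
content_subs = [(lambda m: m.get("status") == "created", received.append)]

old_shape = {"order_id": 1001, "status": "created"}
new_shape = {"order_id": 1002, "state": "created"}  # field renamed upstream

print(publish_topic(topic_subs, "orders.created", new_shape))  # 1: payload shape is irrelevant
print(publish_content(content_subs, old_shape))  # 1
print(publish_content(content_subs, new_shape))  # 0: filter silently stops matching
```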
Requirements
Functional
- Create named topics for message routing
- Publishers send messages to specific topics
- Subscribers register callbacks on topics and receive all messages published there
- Unsubscribe removes a subscriber from future deliveries
- Each message carries a unique ID, topic name, content, and timestamp
Non-Functional
- Thread-safe for concurrent publish and subscribe operations
- Delivery strategy must be pluggable (sync vs async) without modifying broker internals
- Message deduplication prevents duplicate processing on retries
- One failing subscriber must not block delivery to others
Design Decisions
Why Subscriber as an interface, not a concrete class?
Every system that consumes messages looks different. An email service processes messages one way, a fraud detector another. If Subscriber were concrete, you would either stuff all variations into one class or maintain an inheritance tree that grows with every integration. An interface lets any class participate by implementing a single method. Add Slack notifications next month? Write a SlackSubscriber, subscribe it, done.
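A sketch of that SlackSubscriber, with trimmed copies of Message and Subscriber so it runs on its own. The Slack side is hypothetical: `post_to_channel` only records the formatted text in a list and does not call any real Slack API.

```python
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass(frozen=True)
class Message:
    topic_name: str
    content: str


class Subscriber(ABC):
    @abstractmethod
    def get_id(self) -> str: ...

    @abstractmethod
    def on_message(self, message: Message) -> None: ...


class SlackSubscriber(Subscriber):
    """Hypothetical integration that formats messages for a Slack channel."""

    def __init__(self, channel: str):
        self.channel = channel
        self.posted: list[str] = []  # stand-in for real Slack calls

    def get_id(self) -> str:
        return f"slack:{self.channel}"

    def on_message(self, message: Message) -> None:
        self.post_to_channel(f"*{message.topic_name}*: {message.content}")

    def post_to_channel(self, text: str) -> None:
        # Hypothetical: a real version would call Slack's API here.
        self.posted.append(text)


slack = SlackSubscriber("#orders")
slack.on_message(Message("order-events", "Order #1001 created"))
print(slack.posted)  # ['*order-events*: Order #1001 created']
```

Wiring it into the broker is a single `broker.subscribe("order-events", SlackSubscriber("#orders"))` call; nothing in the broker changes.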
Why separate SubscriptionManager from Topic?
Topics know how to dispatch. Subscription management involves tracking mappings, handling lifecycle events, and potentially supporting patterns like wildcard subscriptions later. Mixing these into Topic means every new subscription feature requires editing the dispatch code. Separation lets each evolve independently.
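One way the separation could pay off, sketched under assumptions: a hypothetical `WildcardSubscriptionManager` where `*` matches exactly one dot-separated segment, resolved at publish time. It stores subscriber IDs as strings to stay self-contained; Topic's dispatch code would not need to change.

```python
from __future__ import annotations
from threading import Lock


class WildcardSubscriptionManager:
    """Hypothetical extension: '*' matches exactly one dot-separated segment."""

    def __init__(self):
        self._patterns: dict[str, set[str]] = {}  # pattern -> subscriber IDs
        self._lock = Lock()

    def subscribe(self, pattern: str, subscriber_id: str) -> None:
        with self._lock:
            self._patterns.setdefault(pattern, set()).add(subscriber_id)

    def get_subscribers(self, topic_name: str) -> set[str]:
        with self._lock:
            matched: set[str] = set()
            for pattern, subs in self._patterns.items():
                if self._matches(pattern, topic_name):
                    matched |= subs
            return matched

    @staticmethod
    def _matches(pattern: str, topic: str) -> bool:
        p_parts, t_parts = pattern.split("."), topic.split(".")
        return len(p_parts) == len(t_parts) and all(
            p == "*" or p == t for p, t in zip(p_parts, t_parts)
        )


mgr = WildcardSubscriptionManager()
mgr.subscribe("orders.*", "Analytics")
mgr.subscribe("orders.shipped", "EmailService")
print(sorted(mgr.get_subscribers("orders.shipped")))    # ['Analytics', 'EmailService']
print(sorted(mgr.get_subscribers("orders.created")))    # ['Analytics']
print(sorted(mgr.get_subscribers("payments.created")))  # []
```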
Why message IDs for deduplication?
Network partitions, retries, and at-least-once delivery guarantees all produce duplicate messages. Without unique IDs, the broker has no way to recognize a redelivered message. Idempotent subscribers help, but deduplication at the broker layer catches duplicates before they ever reach subscriber code.
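The broker's `_seen_ids` set in the implementation never shrinks. A sketch of one bounded alternative, assuming it is acceptable to forget an ID once a fixed number of newer IDs have arrived (the capacity is an arbitrary illustrative value, and `BoundedDedup` is a hypothetical name): an insertion-ordered `OrderedDict` evicts the oldest ID first.

```python
from __future__ import annotations
from collections import OrderedDict
from threading import Lock


class BoundedDedup:
    """Remembers the most recent `capacity` message IDs; the oldest are evicted first."""

    def __init__(self, capacity: int = 10_000):
        self._capacity = capacity
        self._seen: OrderedDict[str, None] = OrderedDict()
        self._lock = Lock()

    def first_time(self, message_id: str) -> bool:
        """Return True if this ID is new (and record it), False if it is a duplicate."""
        with self._lock:
            if message_id in self._seen:
                return False
            self._seen[message_id] = None
            if len(self._seen) > self._capacity:
                self._seen.popitem(last=False)  # evict the oldest ID
            return True


dedup = BoundedDedup(capacity=2)
print(dedup.first_time("a"))  # True
print(dedup.first_time("a"))  # False: duplicate within the window
dedup.first_time("b")
dedup.first_time("c")         # evicts "a"
print(dedup.first_time("a"))  # True: "a" fell out of the window
```

The trade-off is explicit: a duplicate that arrives after its ID has been evicted gets through, so the window should comfortably exceed the longest expected retry delay.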
Why Strategy pattern for delivery?
Synchronous delivery is ideal for testing and debugging. You publish, subscribers process, everything happens in order on one thread. But in production, one slow subscriber (say, an email service waiting on SMTP) blocks every other subscriber on that topic. Async delivery through a thread pool isolates subscribers from each other. Making this a strategy means you switch between the two with a constructor argument, no code changes.
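A small demonstration of the isolation claim, assuming two hypothetical subscribers: one that blocks until released (standing in for a slow SMTP round trip) and one that finishes immediately. With a thread pool, the fast subscriber completes while the slow one is still blocked, even though the slow one was submitted first; a sequential loop would have made it wait.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Event

release_slow = Event()  # set when the simulated SMTP call "finishes"
fast_done = Event()
slow_done = Event()


def slow_subscriber(msg: str) -> None:
    release_slow.wait(timeout=5)  # blocks until released (or 5 s pass)
    slow_done.set()


def fast_subscriber(msg: str) -> None:
    fast_done.set()


executor = ThreadPoolExecutor(max_workers=2)
for handler in (slow_subscriber, fast_subscriber):  # slow one submitted first
    executor.submit(handler, "Order #1001 created")

print(fast_done.wait(timeout=2))  # True: fast subscriber ran despite the slow one
print(slow_done.is_set())         # False: slow subscriber is still blocked
release_slow.set()
executor.shutdown(wait=True)
print(slow_done.is_set())         # True once released
```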
Interview Follow-ups
- "How would you add message persistence?" Store messages in a durable log (like Kafka's commit log) before delivery. Subscribers track their offset and can replay from any point.
- "What about dead letter queues?" After N failed delivery attempts, move the message to a DLQ topic. A separate consumer handles retries or alerts.
- "How would you support wildcard subscriptions?" Extend SubscriptionManager to match topic patterns (e.g., "orders.*"). On publish, resolve all matching subscriptions.
- "How do you scale this horizontally?" Partition topics across broker instances using consistent hashing. Each partition is an independent unit with its own subscriber set.
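A minimal in-memory sketch of the commit-log idea from the persistence follow-up. The append-only list stands in for durable storage (an assumption; real persistence would write to disk or use a system like Kafka), and `CommitLog`, `OffsetConsumer`, `poll`, and `seek` are hypothetical names. Each consumer owns its offset, so replay is just moving that offset back.

```python
from __future__ import annotations
from threading import Lock


class CommitLog:
    """Append-only log; a message's offset is its index in the list."""

    def __init__(self):
        self._entries: list[str] = []
        self._lock = Lock()

    def append(self, content: str) -> int:
        with self._lock:
            self._entries.append(content)
            return len(self._entries) - 1  # offset of the new entry

    def read_from(self, offset: int) -> list[str]:
        with self._lock:
            return self._entries[offset:]


class OffsetConsumer:
    """Tracks how far it has read; replay means moving the offset back."""

    def __init__(self, log: CommitLog):
        self._log = log
        self.offset = 0

    def poll(self) -> list[str]:
        batch = self._log.read_from(self.offset)
        self.offset += len(batch)  # advance past everything returned
        return batch

    def seek(self, offset: int) -> None:
        self.offset = offset


log = CommitLog()
log.append("Order #1001 created")
log.append("Order #1002 shipped")
consumer = OffsetConsumer(log)
print(consumer.poll())  # ['Order #1001 created', 'Order #1002 shipped']
print(consumer.poll())  # []
consumer.seek(1)
print(consumer.poll())  # ['Order #1002 shipped']
```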
Code Implementation
```python
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from threading import Lock
from concurrent.futures import ThreadPoolExecutor
import uuid
from typing import Protocol


@dataclass(frozen=True)
class Message:
    """Immutable message with unique ID for deduplication."""
    topic_name: str
    content: str
    # Full UUID: truncated IDs collide often enough that deduplication
    # would eventually drop legitimate messages.
    id: str = field(default_factory=lambda: uuid.uuid4().hex)
    timestamp: datetime = field(default_factory=datetime.now)

    def __str__(self) -> str:
        return f"[{self.topic_name}] {self.content} (id={self.id})"


class Subscriber(ABC):
    """Any component that wants to receive messages implements this."""

    @abstractmethod
    def get_id(self) -> str: ...

    @abstractmethod
    def on_message(self, message: Message) -> None: ...

    def __hash__(self) -> int:
        return hash(self.get_id())

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Subscriber):
            return NotImplemented
        return self.get_id() == other.get_id()


class DeliveryStrategy(Protocol):
    """Controls how messages reach subscribers. Sync for testing, async for production."""
    def deliver(self, message: Message, subscribers: set[Subscriber]) -> None: ...


class SyncDeliveryStrategy:
    """Delivers to each subscriber sequentially. Simple, predictable, good for testing."""

    def deliver(self, message: Message, subscribers: set[Subscriber]) -> None:
        for sub in subscribers:
            try:
                sub.on_message(message)
            except Exception as e:
                # One failing subscriber must not stop delivery to the rest.
                print(f"Delivery failed for {sub.get_id()}: {e}")


class AsyncDeliveryStrategy:
    """Delivers via thread pool. One slow subscriber won't block the others."""

    def __init__(self, max_workers: int = 4):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    def deliver(self, message: Message, subscribers: set[Subscriber]) -> None:
        for sub in subscribers:
            self._executor.submit(self._safe_deliver, sub, message)

    def shutdown(self, wait: bool = True) -> None:
        """Stop accepting work; optionally wait for in-flight deliveries."""
        self._executor.shutdown(wait=wait)

    @staticmethod
    def _safe_deliver(sub: Subscriber, message: Message) -> None:
        try:
            sub.on_message(message)
        except Exception as e:
            print(f"Async delivery failed for {sub.get_id()}: {e}")


class SubscriptionManager:
    """Tracks who is subscribed to what. Separated from Topic so subscription
    logic can evolve independently (e.g., adding wildcard subscriptions later)."""

    def __init__(self):
        self._topic_subs: dict[str, set[Subscriber]] = {}
        self._lock = Lock()

    def subscribe(self, topic_name: str, subscriber: Subscriber) -> None:
        with self._lock:
            if topic_name not in self._topic_subs:
                self._topic_subs[topic_name] = set()
            self._topic_subs[topic_name].add(subscriber)

    def unsubscribe(self, topic_name: str, subscriber: Subscriber) -> None:
        with self._lock:
            subs = self._topic_subs.get(topic_name)
            if subs:
                subs.discard(subscriber)

    def get_subscribers(self, topic_name: str) -> set[Subscriber]:
        # Return a snapshot so delivery never iterates a set that is being mutated.
        with self._lock:
            return set(self._topic_subs.get(topic_name, set()))


class Topic:
    """Named channel. Doesn't own subscribers directly. Delegates to SubscriptionManager
    so that cross-cutting concerns like wildcard matching live in one place."""

    def __init__(self, name: str):
        self.name = name

    def publish(self, message: Message, subscribers: set[Subscriber],
                strategy: DeliveryStrategy) -> None:
        strategy.deliver(message, subscribers)


class MessageBroker:
    """Facade that ties everything together. Clients interact with this only."""

    def __init__(self, strategy: DeliveryStrategy | None = None):
        self._topics: dict[str, Topic] = {}
        self._sub_manager = SubscriptionManager()
        self._strategy = strategy or SyncDeliveryStrategy()
        self._lock = Lock()
        # Grows without bound here; a production broker would expire old IDs.
        self._seen_ids: set[str] = set()

    def create_topic(self, name: str) -> Topic:
        with self._lock:
            if name in self._topics:
                return self._topics[name]
            topic = Topic(name)
            self._topics[name] = topic
            return topic

    def subscribe(self, topic_name: str, subscriber: Subscriber) -> None:
        with self._lock:
            if topic_name not in self._topics:
                raise ValueError(f"Topic '{topic_name}' does not exist")
        self._sub_manager.subscribe(topic_name, subscriber)

    def unsubscribe(self, topic_name: str, subscriber: Subscriber) -> None:
        self._sub_manager.unsubscribe(topic_name, subscriber)

    def publish(self, topic_name: str, content: str,
                message_id: str | None = None) -> Message:
        """Publish content to a topic. A retry should pass the original
        message_id so the broker can recognize and drop the duplicate."""
        if message_id is None:
            message = Message(topic_name=topic_name, content=content)
        else:
            message = Message(topic_name=topic_name, content=content, id=message_id)

        with self._lock:
            topic = self._topics.get(topic_name)
            if topic is None:
                raise ValueError(f"Topic '{topic_name}' does not exist")
            # Deduplication: an ID seen before means this is a redelivery.
            if message.id in self._seen_ids:
                return message
            self._seen_ids.add(message.id)

        subscribers = self._sub_manager.get_subscribers(topic_name)
        topic.publish(message, subscribers, self._strategy)
        return message


# --- Concrete subscriber for demonstration ---

class LoggingSubscriber(Subscriber):
    """Prints every message it receives. Useful for debugging and demos."""

    def __init__(self, name: str):
        self._name = name
        self.received: list[Message] = []

    def get_id(self) -> str:
        return self._name

    def on_message(self, message: Message) -> None:
        self.received.append(message)
        print(f"  [{self._name}] received: {message}")


if __name__ == "__main__":
    broker = MessageBroker(strategy=SyncDeliveryStrategy())

    # Create topics
    broker.create_topic("order-events")
    broker.create_topic("payment-events")
    broker.create_topic("user-events")

    # Create subscribers
    email_service = LoggingSubscriber("EmailService")
    analytics = LoggingSubscriber("Analytics")
    audit_log = LoggingSubscriber("AuditLog")
    fraud_detector = LoggingSubscriber("FraudDetector")

    # Wire up subscriptions
    broker.subscribe("order-events", email_service)
    broker.subscribe("order-events", analytics)
    broker.subscribe("order-events", audit_log)
    broker.subscribe("payment-events", fraud_detector)
    broker.subscribe("payment-events", audit_log)
    broker.subscribe("user-events", analytics)

    # Publish messages
    print("Publishing to order-events:")
    broker.publish("order-events", "Order #1001 created")

    print("\nPublishing to payment-events:")
    broker.publish("payment-events", "Payment $99.99 processed for Order #1001")

    print("\nPublishing to user-events:")
    broker.publish("user-events", "User john@example.com signed up")

    # Unsubscribe and publish again
    broker.unsubscribe("order-events", analytics)
    print("\nAfter unsubscribing Analytics from order-events:")
    shipped = broker.publish("order-events", "Order #1002 shipped")

    # A retry that reuses the message ID is dropped by deduplication
    print("\nRetrying Order #1002 with the same message ID (no deliveries expected):")
    broker.publish("order-events", "Order #1002 shipped", message_id=shipped.id)

    # Verify delivery counts
    print(f"\nEmail service received {len(email_service.received)} messages")
    print(f"Analytics received {len(analytics.received)} messages")
    print(f"Audit log received {len(audit_log.received)} messages")
    print(f"Fraud detector received {len(fraud_detector.received)} messages")

    assert len(email_service.received) == 2
    assert len(analytics.received) == 2  # 1 order + 1 user (unsubscribed before 2nd order)
    assert len(audit_log.received) == 3  # 2 orders + 1 payment
    assert len(fraud_detector.received) == 1
    print("All assertions passed.")
```
Common Mistakes
- ✗Making subscribers concrete classes instead of interfaces. Every new integration requires modifying the broker.
- ✗No message IDs. Retries and redeliveries silently duplicate work without deduplication support.
- ✗Synchronous-only delivery. One slow subscriber blocks all others on the same topic.
- ✗Forgetting thread safety on the subscriber list. A subscribe that lands while a publish is iterating mutates the set mid-loop, which raises RuntimeError in Python (ConcurrentModificationException in Java).
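A single-threaded sketch makes the Python flavor of this bug deterministic: adding to a set while iterating over it raises `RuntimeError`, which is exactly what a concurrent subscribe can trigger mid-delivery. Iterating over a snapshot copy, as `SubscriptionManager.get_subscribers` does, avoids it. The `-late` suffix is just an illustrative stand-in for a new subscriber.

```python
from __future__ import annotations

subscribers = {"EmailService", "Analytics"}


def deliver_live(subs: set[str]) -> None:
    for sub in subs:
        subs.add(sub + "-late")  # simulates a subscribe landing mid-delivery


def deliver_snapshot(subs: set[str]) -> list[str]:
    delivered = []
    for sub in set(subs):  # iterate over a copy; the live set may change freely
        subs.add(sub + "-late")
        delivered.append(sub)
    return delivered


try:
    deliver_live(set(subscribers))
    error_message = None
except RuntimeError as e:
    error_message = str(e)

print(error_message)  # Set changed size during iteration
print(sorted(deliver_snapshot(set(subscribers))))  # ['Analytics', 'EmailService']
```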
Key Points
- ✓Observer pattern is the backbone. The broker tracks each topic's subscribers (via SubscriptionManager) and notifies all of them on publish.
- ✓Strategy pattern controls delivery semantics: synchronous vs asynchronous in this implementation, and the same seam could host retry policies such as at-least-once delivery.
- ✓Message IDs enable deduplication. Without them, network retries cause duplicate processing.
- ✓Topic-based routing chosen over content-based for predictability. Subscribers know exactly what they signed up for.