Notification System
Multi-channel notification delivery with user preferences and templated messages. Strategy pattern selects the channel, Template Method standardizes formatting, and Observer triggers notifications from domain events.
Key Abstractions
Orchestrator that resolves user preferences, selects channels, renders templates, and dispatches delivery
Strategy interface for delivery. Email, SMS, and Push each implement send() differently.
Message template with named placeholders. Renders final content by substituting context values.
Per-user channel preferences. Controls which channels a user receives notifications on.
Async delivery buffer with retry logic. Decouples notification creation from actual sending.
Class Diagram
The Key Insight
A notification system is not about sending messages. It is about routing the right message through the right channel to the right user at the right time. The core problem is multi-dimensional: you have N notification types, M delivery channels, and every user has their own preferences about which channels they actually want. If you hard-wire any of these dimensions together, you end up with combinatorial explosion in your codebase.
Strategy pattern solves the channel dimension. Each channel (email, SMS, push) implements the same interface but handles delivery through completely different transports. Template Method handles the content dimension. Every notification follows the same structure: resolve placeholders, format for the channel, send. The template defines what varies (the content), while the method defines the fixed flow (render, then deliver). User preferences act as a filter layer between the two, ensuring you only route to channels the user has opted into.
Requirements
Functional
- Support multiple delivery channels: email, SMS, and push notifications
- Template-based message formatting with named placeholders
- Per-user channel preferences that control which channels are active
- Async delivery queue that does not block the caller
- Retry failed deliveries up to a configurable maximum
Non-Functional
- Adding a new channel requires one new class, no modifications to existing code
- Adding a new notification type requires one new template, no code changes
- Channel failures are isolated. A failing SMS gateway must not prevent email delivery.
- Delivery order within a single notification is not guaranteed across channels
Design Decisions
Why Strategy for channels instead of if-else branching?
The naive approach is a switch statement: if email, call SMTP; if SMS, call Twilio; if push, call Firebase. This works for two channels. By the fourth channel, you have a method nobody wants to modify because every change risks breaking the others. Strategy pattern turns each channel into an isolated unit. The orchestrator iterates over channels without knowing what any of them do internally. When product asks for WhatsApp support, you write one class and register it. The orchestrator never changes.
Why templates instead of string formatting in each caller?
Without templates, every place that triggers a notification builds its own message string. You end up with slightly different wording for the same event scattered across the codebase. Templates centralize content. Change the order confirmation wording once and it updates everywhere. The placeholder syntax ({{user_name}}, {{order_id}}) keeps templates readable by non-engineers too, which matters when product wants to tweak copy.
Why a queue instead of direct sending?
Notification delivery involves network calls to external services. SMTP servers timeout. Push gateways rate-limit. If you send synchronously in the request path, a slow email server adds seconds to your API response time. The queue decouples creation from delivery. The caller fires and forgets. A background processor handles the actual sending, retries, and failure tracking.
Why per-user preferences as a first-class concept?
It is tempting to treat preferences as a filter you bolt on later. But preferences affect the core routing decision: which channels get a task. Making UserPreference a first-class object means the service always checks preferences before queuing. There is no code path that accidentally bypasses the check.
Interview Follow-ups
- "How would you add priority levels?" Introduce a priority field on the notification. The queue becomes a priority queue. High-priority notifications (security alerts, password resets) jump ahead of marketing emails.
- "How do you handle rate limiting per channel?" Add a rate limiter per channel type in the queue processor. If the SMS gateway allows 10 messages per second, the queue throttles SMS tasks accordingly while letting email and push proceed at full speed.
- "What about notification grouping or batching?" Add a batching layer before the queue. Collect notifications for the same user within a time window, then merge them into a digest. This prevents the "50 emails in one hour" problem.
- "How would you track delivery status?" Add a status field to NotificationTask (PENDING, DELIVERED, FAILED). Store completed tasks in a database. Expose an API for users to see their notification history and delivery status.
Code Implementation
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from collections import deque
import re
@dataclass
class RenderedMessage:
"""Final message after template rendering."""
subject: str
body: str
class Template:
"""Message template with {{placeholder}} syntax.
Keeps notification content separate from delivery logic."""
def __init__(self, name: str, subject_template: str, body_template: str):
self.name = name
self._subject_template = subject_template
self._body_template = body_template
def render(self, context: dict[str, str]) -> RenderedMessage:
subject = self._substitute(self._subject_template, context)
body = self._substitute(self._body_template, context)
return RenderedMessage(subject=subject, body=body)
@staticmethod
def _substitute(template: str, context: dict[str, str]) -> str:
def replacer(match):
key = match.group(1).strip()
return context.get(key, match.group(0))
return re.sub(r"\{\{(.+?)\}\}", replacer, template)
class NotificationChannel(ABC):
"""Strategy interface. Each channel knows how to deliver a message
through its specific transport (SMTP, SMS gateway, push service)."""
@abstractmethod
def get_type(self) -> str: ...
@abstractmethod
def send(self, recipient: str, subject: str, body: str) -> bool: ...
class EmailChannel(NotificationChannel):
"""Simulates email delivery via SMTP."""
def get_type(self) -> str:
return "email"
def send(self, recipient: str, subject: str, body: str) -> bool:
print(f" [EMAIL] To: {recipient} | Subject: {subject} | Body: {body}")
return True
class SMSChannel(NotificationChannel):
"""Simulates SMS delivery. Subject gets ignored since SMS has no subject line."""
def get_type(self) -> str:
return "sms"
def send(self, recipient: str, subject: str, body: str) -> bool:
print(f" [SMS] To: {recipient} | Body: {body}")
return True
class PushChannel(NotificationChannel):
"""Simulates push notification delivery."""
def get_type(self) -> str:
return "push"
def send(self, recipient: str, subject: str, body: str) -> bool:
print(f" [PUSH] To: {recipient} | Title: {subject} | Body: {body}")
return True
@dataclass
class UserPreference:
"""Stores which channels a user has opted into.
Defaults to email only if nothing is set."""
user_id: str
enabled_channels: set[str] = field(default_factory=lambda: {"email"})
def is_enabled(self, channel_type: str) -> bool:
return channel_type in self.enabled_channels
@dataclass
class NotificationTask:
"""A single delivery attempt. Tracks retries so we can
re-queue on transient failures instead of dropping the message."""
channel: NotificationChannel
recipient: str
subject: str
body: str
retry_count: int = 0
class NotificationQueue:
"""Async delivery buffer. In production this would sit on top of
a real message queue. Here we use a simple deque with retry logic."""
def __init__(self, max_retries: int = 3):
self._pending: deque[NotificationTask] = deque()
self._max_retries = max_retries
self._delivered: list[NotificationTask] = []
self._failed: list[NotificationTask] = []
def enqueue(self, task: NotificationTask) -> None:
self._pending.append(task)
def process_all(self) -> None:
retry_queue: list[NotificationTask] = []
while self._pending:
task = self._pending.popleft()
success = task.channel.send(task.recipient, task.subject, task.body)
if success:
self._delivered.append(task)
elif task.retry_count < self._max_retries:
task.retry_count += 1
retry_queue.append(task)
else:
self._failed.append(task)
print(f" [FAILED] Giving up on {task.channel.get_type()} "
f"to {task.recipient} after {self._max_retries} retries")
for task in retry_queue:
self._pending.append(task)
@property
def delivered_count(self) -> int:
return len(self._delivered)
@property
def failed_count(self) -> int:
return len(self._failed)
class NotificationService:
"""Orchestrator. Resolves templates, checks user preferences,
and queues delivery tasks. Never sends directly."""
def __init__(self):
self._channels: dict[str, NotificationChannel] = {}
self._templates: dict[str, Template] = {}
self._preferences: dict[str, UserPreference] = {}
self._queue = NotificationQueue(max_retries=3)
def register_channel(self, channel: NotificationChannel) -> None:
self._channels[channel.get_type()] = channel
def register_template(self, template: Template) -> None:
self._templates[template.name] = template
def set_preference(self, pref: UserPreference) -> None:
self._preferences[pref.user_id] = pref
def send(self, user_id: str, template_name: str, context: dict[str, str]) -> int:
"""Send notification to user across all their enabled channels.
Returns the number of tasks queued."""
template = self._templates.get(template_name)
if template is None:
raise ValueError(f"Template '{template_name}' not found")
pref = self._preferences.get(user_id, UserPreference(user_id=user_id))
rendered = template.render(context)
queued = 0
for channel_type, channel in self._channels.items():
if pref.is_enabled(channel_type):
task = NotificationTask(
channel=channel,
recipient=user_id,
subject=rendered.subject,
body=rendered.body,
)
self._queue.enqueue(task)
queued += 1
return queued
def flush(self) -> None:
"""Process all pending notifications."""
self._queue.process_all()
@property
def delivered_count(self) -> int:
return self._queue.delivered_count
if __name__ == "__main__":
service = NotificationService()
# Register channels
service.register_channel(EmailChannel())
service.register_channel(SMSChannel())
service.register_channel(PushChannel())
# Register templates
service.register_template(Template(
name="order_confirmation",
subject_template="Order {{order_id}} Confirmed",
body_template="Hi {{user_name}}, your order {{order_id}} for {{amount}} has been confirmed.",
))
service.register_template(Template(
name="password_reset",
subject_template="Password Reset Request",
body_template="Hi {{user_name}}, click here to reset your password: {{reset_link}}",
))
service.register_template(Template(
name="welcome",
subject_template="Welcome to the platform!",
body_template="Hey {{user_name}}, glad to have you. Get started at {{getting_started_link}}.",
))
# Set user preferences
service.set_preference(UserPreference("alice", {"email", "push"}))
service.set_preference(UserPreference("bob", {"email", "sms", "push"}))
service.set_preference(UserPreference("charlie", {"sms"}))
# Send order confirmation to Alice (email + push)
print("Sending order confirmation to Alice:")
queued = service.send("alice", "order_confirmation", {
"user_name": "Alice",
"order_id": "ORD-1001",
"amount": "$79.99",
})
print(f" Queued {queued} tasks")
service.flush()
# Send password reset to Bob (email + sms + push)
print("\nSending password reset to Bob:")
queued = service.send("bob", "password_reset", {
"user_name": "Bob",
"reset_link": "https://example.com/reset/abc123",
})
print(f" Queued {queued} tasks")
service.flush()
# Send welcome to Charlie (sms only)
print("\nSending welcome to Charlie:")
queued = service.send("charlie", "welcome", {
"user_name": "Charlie",
"getting_started_link": "https://example.com/start",
})
print(f" Queued {queued} tasks")
service.flush()
# Summary
print(f"\nTotal delivered: {service.delivered_count}")
assert service.delivered_count == 6 # Alice:2 + Bob:3 + Charlie:1
print("All assertions passed.")Common Mistakes
- ✗Hardcoding channel logic in the service. Every new channel turns into another if-else branch that nobody wants to touch.
- ✗Skipping user preferences. Sending push notifications to users who disabled them is a fast path to uninstalls.
- ✗Synchronous delivery in the request path. An SMTP timeout should not cause a 500 on your checkout endpoint.
- ✗No retry mechanism. Transient failures in SMS gateways or push services are normal. Dropping notifications silently is not.
Key Points
- ✓Strategy pattern makes channels pluggable. Adding a new channel means one new class, zero changes to the orchestrator.
- ✓Template Method standardizes message formatting. Every channel follows the same render-then-send flow, but each customizes the rendering step.
- ✓User preferences are first-class. Never blast every channel. Respect what the user opted into.
- ✓Async queue with retry handles transient failures. SMTP timeouts and push gateway errors are inevitable.