Printer Spooler
Multi-printer queue with priorities and paper/toner constraints. A priority queue per printer plus a job router handles the routing; cancellation and retries handle the rest.
Key Abstractions
PrintJob: What to print (pages, copies, duplex, priority, submitting user).
Printer: One physical device. Owns its queue, paper/toner state, and current job.
PrintQueue: Priority queue of jobs for a single printer.
JobRouter: Strategy that picks which printer a new job goes to (round-robin, least-loaded, nearest).
SpoolService: Facade. Submit, cancel, status. Manages the printer pool and the router.
The Key Insight
The classic trap is one global queue feeding all printers. It sounds elegant but it's a coordination disaster: every printer contends on the same queue, a stalled printer holds jobs it can't finish, and fairness gets fuzzy fast. The real-world design — one queue per printer plus a router that picks on submit — decouples the printers completely. If printer 1 jams, printer 2 keeps going.
Priority isn't a feature to bolt on; it's in the data model. An enum gives compile-time safety (no priority = 47 weirdness) and a natural comparator. FIFO within a priority level keeps things fair — two HIGH jobs submitted in order print in order.
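The ordering described above can be sketched with Python's heapq and a tuple key of (priority value, arrival sequence). This is a minimal illustration, not the full design: the helper names push and pop_all are hypothetical, and jobs are reduced to file names.

```python
import heapq
import itertools
from enum import Enum


class Priority(Enum):
    HIGH = 1
    NORMAL = 2
    LOW = 3


_arrival = itertools.count()  # monotonically increasing tie-breaker


def push(heap: list, priority: Priority, name: str) -> None:
    # Tuples compare element by element: priority first, then arrival order.
    heapq.heappush(heap, (priority.value, next(_arrival), name))


def pop_all(heap: list) -> list:
    names = []
    while heap:
        _, _, name = heapq.heappop(heap)
        names.append(name)
    return names


heap: list = []
push(heap, Priority.NORMAL, "report.pdf")
push(heap, Priority.HIGH, "contract.pdf")
push(heap, Priority.LOW, "photos.pdf")
push(heap, Priority.HIGH, "invoice.pdf")

order = pop_all(heap)
print(order)  # ['contract.pdf', 'invoice.pdf', 'report.pdf', 'photos.pdf']
```

Both HIGH jobs come out first, in the order they were submitted, because the sequence number only matters when the priority values tie.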
The consumable model (paper, toner) matters more than it looks. A printer with 3 sheets of paper and a 50-page job should reject the submission, not start printing and run dry after page 3. Surfacing is_available gives the router a clean signal to route elsewhere, and an ops dashboard a clean signal to dispatch a refill tech.
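One way to express that admission check is a per-job capacity test at submit time. The sketch below is an assumption layered on top of the article's model: Supplies, sheets_needed, and can_accept are hypothetical names, duplex is assumed to print two pages per sheet, and toner is assumed to cost one unit per page.

```python
import math
from dataclasses import dataclass


@dataclass
class Supplies:
    paper_sheets: int
    toner_units: int


def sheets_needed(pages: int, copies: int, duplex: bool) -> int:
    # Duplex prints two pages per sheet; an odd page count still needs a final sheet.
    per_copy = math.ceil(pages / 2) if duplex else pages
    return per_copy * copies


def can_accept(supplies: Supplies, pages: int, copies: int = 1, duplex: bool = False) -> bool:
    # Assumed toner model: one unit per printed page, regardless of duplex.
    enough_paper = supplies.paper_sheets >= sheets_needed(pages, copies, duplex)
    enough_toner = supplies.toner_units >= pages * copies
    return enough_paper and enough_toner


low = Supplies(paper_sheets=3, toner_units=1_000)
print(can_accept(low, pages=50))               # False
print(can_accept(low, pages=6, duplex=True))   # True
```

This check ignores paper already promised to jobs waiting in the queue; a stricter version would subtract the sheets reserved by queued jobs before comparing.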
Requirements
Functional
- Submit jobs with document, copies, priority
- Route to one of several printers based on policy
- Cancel queued jobs (but not the currently printing one)
- Track paper and toner; reject new jobs on low supplies
- Support priority ordering and fair FIFO within a priority
Non-Functional
- Printers process in parallel (one queue each)
- Cancellation is O(1) to mark (flag-and-skip); the cancelled entry is discarded when it reaches the head of the queue
- Router strategy swappable (least-loaded, round-robin, nearest)
- Status queryable by job ID
Design Decisions
Why per-printer queues?
One global queue creates a chokepoint. Per-printer queues let devices run independently — one printer's problem stays local. The router absorbs the "pick a printer" decision at submit time, which is the right moment to make it (we know current loads, user's office location, etc.).
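The isolation argument can be demonstrated with one worker thread per queue. This is a simplified sketch under stated assumptions: plain FIFO queue.Queue objects stand in for the priority queues, a threading.Event simulates a jam, and the names worker, ready, and done are illustrative.

```python
import queue
import threading


def worker(jobs: queue.Queue, done: list, ready: threading.Event) -> None:
    while True:
        job = jobs.get()
        if job is None:   # sentinel: shut this printer down
            return
        ready.wait()      # blocks for as long as this printer is "jammed"
        done.append(job)


queues = {"p1": queue.Queue(), "p2": queue.Queue()}
done = {"p1": [], "p2": []}
ready = {"p1": threading.Event(), "p2": threading.Event()}
ready["p2"].set()         # p2 is healthy; p1 starts out jammed

threads = {
    pid: threading.Thread(target=worker, args=(queues[pid], done[pid], ready[pid]))
    for pid in queues
}
for t in threads.values():
    t.start()

for i in range(3):
    queues["p1"].put(f"a{i}")
    queues["p2"].put(f"b{i}")

queues["p2"].put(None)
threads["p2"].join()      # p2 drains completely while p1 is still stuck
p2_done_during_jam = list(done["p2"])
p1_done_during_jam = list(done["p1"])

ready["p1"].set()         # jam cleared: p1 resumes with its own queue intact
queues["p1"].put(None)
threads["p1"].join()

print(p2_done_during_jam, p1_done_during_jam, done["p1"])
# ['b0', 'b1', 'b2'] [] ['a0', 'a1', 'a2']
```

With a single shared queue, the jammed worker would have pulled a job and held it; here its backlog simply waits in place.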
Why Priority as an enum?
An integer priority tempts everyone to invent levels — 7, 9, 42 — and debugging why "priority 42 came before priority 40" wastes hours. Three enum values (HIGH, NORMAL, LOW) match how humans actually think about print urgency and stay readable in logs.
Why flag-and-skip for cancellation?
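A binary heap doesn't support efficient removal of an arbitrary element: without extra bookkeeping, just finding the job is O(n). Marking a job as cancelled is O(1) through the job-ID index, and skipping it on poll costs only the O(log n) pop that the job would have incurred anyway. Cleaner than rebuilding the heap.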
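Flag-and-skip is often called lazy deletion. A minimal standalone sketch follows, assuming integer priorities and string job IDs; the class name LazyHeap and its fields are hypothetical and separate from the PrintQueue in the implementation section.

```python
from __future__ import annotations

import heapq


class LazyHeap:
    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        self._live: set[str] = set()        # queued and not cancelled
        self._cancelled: set[str] = set()   # flagged, still physically in the heap
        self._seq = 0

    def push(self, priority: int, job_id: str) -> None:
        heapq.heappush(self._heap, (priority, self._seq, job_id))
        self._seq += 1
        self._live.add(job_id)

    def cancel(self, job_id: str) -> bool:
        # O(1): record the cancellation and leave the heap untouched.
        if job_id not in self._live:
            return False
        self._live.discard(job_id)
        self._cancelled.add(job_id)
        return True

    def pop(self) -> str | None:
        # A cancelled entry is dropped when it surfaces, at the cost of one pop.
        while self._heap:
            _, _, job_id = heapq.heappop(self._heap)
            if job_id in self._cancelled:
                self._cancelled.discard(job_id)
                continue
            self._live.discard(job_id)
            return job_id
        return None


q = LazyHeap()
q.push(2, "a")
q.push(1, "b")
q.push(2, "c")
cancelled = q.cancel("b")
unknown = q.cancel("zzz")
drained = [q.pop(), q.pop(), q.pop()]
print(cancelled, unknown, drained)  # True False ['a', 'c', None]
```

The trade-off is memory: cancelled entries stay in the heap until they reach the top, which is acceptable for print queues that drain steadily.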
Why can't we cancel the currently-printing job?
The page is literally halfway through the rollers. "Cancelling" would mean ejecting a half-printed page — achievable but not worth the hardware complexity. Real spoolers model this: cancellation only affects jobs still in the queue.
Why separate JobRouter from SpoolService?
Router is a policy. Service is the machinery. A building with one printer per floor wants nearest-floor routing. A print shop wants least-loaded for throughput. Same SpoolService, different router — no forks or feature flags required.
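To make the policy swap concrete, here is a hedged sketch of a nearest-floor strategy next to a least-loaded one. It simplifies the interface: PrinterInfo, its floor field, and the user_floor parameter are assumptions for illustration, whereas the article's JobRouter.pick takes the job itself.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class PrinterInfo:
    id: str
    floor: int              # hypothetical location attribute
    queue_size: int = 0
    available: bool = True


class Router(ABC):
    @abstractmethod
    def pick(self, printers: list[PrinterInfo], user_floor: int) -> PrinterInfo | None: ...


class NearestFloorRouter(Router):
    def pick(self, printers: list[PrinterInfo], user_floor: int) -> PrinterInfo | None:
        eligible = [p for p in printers if p.available]
        # Closest floor wins; queue length breaks ties between equally near printers.
        return min(eligible, key=lambda p: (abs(p.floor - user_floor), p.queue_size), default=None)


class LeastLoadedRouter(Router):
    def pick(self, printers: list[PrinterInfo], user_floor: int) -> PrinterInfo | None:
        eligible = [p for p in printers if p.available]
        return min(eligible, key=lambda p: p.queue_size, default=None)


def route(router: Router, printers: list[PrinterInfo], user_floor: int) -> str | None:
    # The caller never changes; only the injected policy does.
    chosen = router.pick(printers, user_floor)
    return chosen.id if chosen else None


pool = [
    PrinterInfo("f1", floor=1, queue_size=4),
    PrinterInfo("f2", floor=2, queue_size=2, available=False),
    PrinterInfo("f3", floor=3, queue_size=0),
]
print(route(NearestFloorRouter(), pool, user_floor=1))  # f1
print(route(LeastLoadedRouter(), pool, user_floor=1))   # f3
print(route(NearestFloorRouter(), pool, user_floor=2))  # f3
```

The last call shows the tie-break: f2 is unavailable, f1 and f3 are both one floor away, and the shorter queue wins.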
Interview Follow-ups
- "How would you support printer groups (color printers only, A3-capable only)?" Add capabilities: Set<Capability> to Printer and requires: Set<Capability> to PrintJob. Router filters by capability match first.
- "What about user quotas (max 500 pages/month)?" Observer on job completion updates a per-user counter. Submit consults the counter before routing; quota exhaustion returns a clear error.
- "How do you handle a jammed printer?" Printer state machine: READY → BUSY → JAMMED. Router skips JAMMED printers. Jam-clear event moves back to READY. Queued jobs stay in place; admins can rebalance.
- "How would secure printing work?" Job marked requiresPin. Printer holds it in queue but doesn't auto-process. User enters PIN at the device; matched jobs release for that printer only.
- "What if we add a print server cluster?" Each server node owns a subset of printers (sharding by printer ID). The SpoolService API sits behind a load balancer. Status lookups route via consistent hashing on job ID.
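The capability and jam follow-ups both reduce to filtering the pool before the routing policy runs. A small sketch, assuming hypothetical Capability and PrinterState enums and a simplified PrinterInfo record rather than the full Printer class:

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum, auto


class Capability(Enum):
    COLOR = auto()
    A3 = auto()
    DUPLEX = auto()


class PrinterState(Enum):
    READY = auto()
    BUSY = auto()
    JAMMED = auto()


@dataclass
class PrinterInfo:
    id: str
    capabilities: frozenset
    state: PrinterState = PrinterState.READY
    queue_size: int = 0


def eligible(printers: list[PrinterInfo], requires: frozenset) -> list[PrinterInfo]:
    # Keep printers that are not jammed and offer every required capability.
    return [
        p for p in printers
        if p.state is not PrinterState.JAMMED and requires <= p.capabilities
    ]


def pick_least_loaded(printers: list[PrinterInfo], requires: frozenset) -> PrinterInfo | None:
    return min(eligible(printers, requires), key=lambda p: p.queue_size, default=None)


pool = [
    PrinterInfo("mono", frozenset({Capability.DUPLEX}), queue_size=0),
    PrinterInfo("color-1", frozenset({Capability.COLOR, Capability.A3}), queue_size=3),
    PrinterInfo("color-2", frozenset({Capability.COLOR}), PrinterState.JAMMED, queue_size=1),
]

color_pick = pick_least_loaded(pool, frozenset({Capability.COLOR}))
any_pick = pick_least_loaded(pool, frozenset())
impossible = pick_least_loaded(pool, frozenset({Capability.COLOR, Capability.DUPLEX}))
print(color_pick.id, any_pick.id, impossible)  # color-1 mono None
```

The subset test (requires <= capabilities) keeps the filter independent of the routing policy, so any strategy can be applied to the filtered list.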
Code Implementation
```python
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from threading import RLock
from typing import Callable
import heapq
import itertools
import uuid


class Priority(Enum):
    HIGH = 1
    NORMAL = 2
    LOW = 3


class JobStatus(Enum):
    QUEUED = "queued"
    PRINTING = "printing"
    DONE = "done"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass
class Document:
    name: str
    page_count: int


# Global submission counter; provides the FIFO tie-break within a priority level.
_seq = itertools.count()


@dataclass
class PrintJob:
    id: str
    user_id: str
    document: Document
    copies: int = 1
    duplex: bool = False
    priority: Priority = Priority.NORMAL
    status: JobStatus = JobStatus.QUEUED
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp.
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    seq: int = field(default_factory=lambda: next(_seq))

    def total_pages(self) -> int:
        return self.copies * self.document.page_count

    def __lt__(self, other: "PrintJob") -> bool:
        # Lower priority enum value wins (HIGH=1 before NORMAL=2). FIFO tie-break.
        return (self.priority.value, self.seq) < (other.priority.value, other.seq)


class PrintQueue:
    def __init__(self):
        self._heap: list[PrintJob] = []
        self._index: dict[str, PrintJob] = {}
        self._lock = RLock()

    def offer(self, job: PrintJob) -> None:
        with self._lock:
            heapq.heappush(self._heap, job)
            self._index[job.id] = job

    def poll(self) -> PrintJob | None:
        with self._lock:
            while self._heap:
                candidate = heapq.heappop(self._heap)
                # Popped jobs leave the index whether they run or were cancelled.
                self._index.pop(candidate.id, None)
                # Drop cancelled jobs silently.
                if candidate.status == JobStatus.CANCELLED:
                    continue
                return candidate
            return None

    def size(self) -> int:
        # Counts live jobs only; cancelled entries still sitting in the heap are ignored.
        with self._lock:
            return sum(1 for j in self._heap if j.status == JobStatus.QUEUED)

    def cancel(self, job_id: str) -> bool:
        with self._lock:
            job = self._index.get(job_id)
            if job is None or job.status != JobStatus.QUEUED:
                return False
            job.status = JobStatus.CANCELLED
            return True


class Printer:
    def __init__(self, printer_id: str, model: str, paper: int = 500, toner: int = 10_000):
        self.id = printer_id
        self.model = model
        self._queue = PrintQueue()
        self._paper = paper  # sheets available
        self._toner = toner  # arbitrary units
        self._current: PrintJob | None = None
        self._lock = RLock()

    @property
    def queue_size(self) -> int:
        return self._queue.size()

    @property
    def is_available(self) -> bool:
        # Available means "accepting jobs": low supplies block new submissions.
        with self._lock:
            return self._paper > 10 and self._toner > 100

    def submit(self, job: PrintJob) -> None:
        if not self.is_available:
            raise RuntimeError(f"Printer {self.id} not accepting jobs: low supplies")
        self._queue.offer(job)

    def process_next(self) -> PrintJob | None:
        job = self._queue.poll()
        if job is None:
            return None
        with self._lock:
            self._current = job
            job.status = JobStatus.PRINTING
            pages_needed = job.total_pages()
            # Fail the job up front if either consumable would run out mid-job.
            if self._paper < pages_needed or self._toner < pages_needed:
                job.status = JobStatus.FAILED
                self._current = None
                return job
            # Simulate consumption.
            self._paper -= pages_needed
            self._toner -= pages_needed  # 1 toner unit per page (rough)
            job.status = JobStatus.DONE
            self._current = None
            return job

    def refill_paper(self, sheets: int) -> None:
        with self._lock:
            self._paper += sheets

    def refill_toner(self, units: int) -> None:
        with self._lock:
            self._toner += units

    def cancel_queued(self, job_id: str) -> bool:
        return self._queue.cancel(job_id)


class JobRouter(ABC):
    @abstractmethod
    def pick(self, printers: list[Printer], job: PrintJob) -> Printer | None: ...


class LeastLoadedRouter(JobRouter):
    def pick(self, printers: list[Printer], job: PrintJob) -> Printer | None:
        eligible = [p for p in printers if p.is_available]
        return min(eligible, key=lambda p: p.queue_size) if eligible else None


class RoundRobinRouter(JobRouter):
    def __init__(self):
        self._cursor = 0
        self._lock = RLock()

    def pick(self, printers: list[Printer], job: PrintJob) -> Printer | None:
        eligible = [p for p in printers if p.is_available]
        if not eligible:
            return None
        with self._lock:
            idx = self._cursor % len(eligible)
            self._cursor += 1
            return eligible[idx]


class NotificationBus:
    """Observer bus. Events: submitted, started, completed, failed, cancelled."""

    def __init__(self):
        self._subs: list[Callable[[str, PrintJob], None]] = []

    def subscribe(self, handler: Callable[[str, PrintJob], None]) -> None:
        self._subs.append(handler)

    def publish(self, event: str, job: PrintJob) -> None:
        for h in self._subs:
            h(event, job)


class SpoolService:
    def __init__(self, printers: list[Printer], router: JobRouter | None = None):
        if not printers:
            raise ValueError("need at least one printer")
        self._printers = printers
        self._router = router or LeastLoadedRouter()
        self._jobs: dict[str, PrintJob] = {}
        self._job_to_printer: dict[str, str] = {}
        self._bus = NotificationBus()

    def subscribe(self, handler: Callable[[str, PrintJob], None]) -> None:
        self._bus.subscribe(handler)

    def submit(self, user_id: str, document: Document, copies: int = 1,
               priority: Priority = Priority.NORMAL, duplex: bool = False) -> str:
        if copies < 1:
            raise ValueError("copies must be >= 1")
        job = PrintJob(
            id=str(uuid.uuid4())[:8],
            user_id=user_id,
            document=document,
            copies=copies,
            duplex=duplex,
            priority=priority,
        )
        printer = self._router.pick(self._printers, job)
        if printer is None:
            raise RuntimeError("No printer available")
        printer.submit(job)
        self._jobs[job.id] = job
        self._job_to_printer[job.id] = printer.id
        self._bus.publish("submitted", job)
        return job.id

    def cancel(self, job_id: str) -> bool:
        printer_id = self._job_to_printer.get(job_id)
        if printer_id is None:
            return False
        printer = next(p for p in self._printers if p.id == printer_id)
        cancelled = printer.cancel_queued(job_id)
        if cancelled:
            self._bus.publish("cancelled", self._jobs[job_id])
        return cancelled

    def status(self, job_id: str) -> JobStatus | None:
        job = self._jobs.get(job_id)
        return job.status if job else None

    def run_once(self) -> int:
        """Each printer drains one job. Returns total processed."""
        processed = 0
        for printer in self._printers:
            job = printer.process_next()
            if job is None:
                continue
            processed += 1
            # printer.process_next already set status to DONE or FAILED.
            event = "completed" if job.status == JobStatus.DONE else "failed"
            self._bus.publish(event, job)
        return processed


if __name__ == "__main__":
    p1 = Printer("HP-01", "LaserJet", paper=100, toner=500)
    p2 = Printer("HP-02", "LaserJet", paper=100, toner=500)
    svc = SpoolService([p1, p2], router=LeastLoadedRouter())

    events: list[tuple[str, str]] = []
    svc.subscribe(lambda ev, job: events.append((ev, job.id)))

    ids = []
    for i in range(5):
        ids.append(svc.submit(f"user-{i}", Document(f"report-{i}.pdf", page_count=10)))
    high = svc.submit("urgent-user", Document("critical.pdf", page_count=2), priority=Priority.HIGH)
    ids.append(high)

    print(f"P1 queue: {p1.queue_size}, P2 queue: {p2.queue_size}")

    # Cancel one queued job.
    svc.cancel(ids[2])
    print(f"Cancelled {ids[2][:8]}, status = {svc.status(ids[2]).value}")

    # Drain queues. High-priority job goes first on whichever printer holds it.
    while svc.run_once() > 0:
        pass

    done = sum(1 for i in ids if svc.status(i) == JobStatus.DONE)
    cancelled = sum(1 for i in ids if svc.status(i) == JobStatus.CANCELLED)
    print(f"Completed: {done}, Cancelled: {cancelled}")
    print(f"Paper left: P1 = {p1._paper}, P2 = {p2._paper}")

    # Observer summary.
    counts: dict[str, int] = {}
    for ev, _ in events:
        counts[ev] = counts.get(ev, 0) + 1
    print(f"Events fired: {counts}")
    assert counts.get("submitted", 0) == len(ids)
    assert counts.get("completed", 0) == done
    assert counts.get("cancelled", 0) == cancelled
```
Common Mistakes
- ✗ Single global queue with all printers pulling. Heavy contention and unfair routing under load.
- ✗ Cancellation that kills the currently-printing job. The sheet of paper is already halfway through, so cancel only affects queued jobs.
- ✗ Forgetting to decrement paper count per page. Printer runs empty and users see 'printing' with nothing coming out.
- ✗ Priority as a float. Use an enum (HIGH, NORMAL, LOW). Comparable, predictable, debuggable.
Key Points
- ✓ Priority queue per printer, not a global queue. Printers work in parallel.
- ✓ Job state machine: QUEUED → PRINTING → DONE, or QUEUED → CANCELLED, or PRINTING → FAILED.
- ✓ Paper/toner modeled as consumables. Low-supply state stops accepting jobs and surfaces an alert.
- ✓ Router is a Strategy: round-robin for fairness, least-loaded for throughput, nearest for office floors.
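The job state machine from the key points can be enforced with a small transition table. This is a sketch under the assumption that DONE, FAILED, and CANCELLED are terminal; the ALLOWED table and the transition helper are hypothetical additions, not part of the implementation above.

```python
from enum import Enum


class JobStatus(Enum):
    QUEUED = "queued"
    PRINTING = "printing"
    DONE = "done"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Legal moves: QUEUED -> PRINTING | CANCELLED, PRINTING -> DONE | FAILED.
ALLOWED = {
    JobStatus.QUEUED: {JobStatus.PRINTING, JobStatus.CANCELLED},
    JobStatus.PRINTING: {JobStatus.DONE, JobStatus.FAILED},
    JobStatus.DONE: set(),
    JobStatus.FAILED: set(),
    JobStatus.CANCELLED: set(),
}


def transition(current: JobStatus, target: JobStatus) -> JobStatus:
    # Reject any move the table does not list, e.g. cancelling a printing job.
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target


status = transition(JobStatus.QUEUED, JobStatus.PRINTING)

try:
    transition(JobStatus.PRINTING, JobStatus.CANCELLED)
    rejected = ""
except ValueError as exc:
    rejected = str(exc)

print(status.name, "|", rejected)
# PRINTING | illegal transition PRINTING -> CANCELLED
```

Routing every status change through one function like this keeps the "no cancelling the currently printing job" rule in a single place instead of scattering it across queue and printer code.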