Job Scheduler
Cron-like scheduler with retries, per-job timeouts, and cancellation. A min-heap keyed by next-run time plus a worker pool handles millions of jobs per node.
Key Abstractions
- Job: value object with id, task, schedule, retry policy, and status
- Schedule: strategy returning the next fire time (cron, fixed-rate, fixed-delay, one-shot)
- JobQueue: min-heap keyed by next-run time; worker threads block on the head via a condition variable
- Scheduler: facade that registers, cancels, and queries jobs; owns the queue, worker threads, and a bounded task executor for per-job timeouts
The Key Insight
The naive design is "one thread per job, each with its own sleep(delay) loop." At a thousand jobs that's a thousand idle threads eating kernel memory. At a million, the OS refuses to allocate more.
The working design is a single min-heap keyed by next-fire-time and a small fixed pool of workers that pull due jobs from it. The heap's O(1) peek tells a worker exactly how long to sleep before the next job is due — condition variables let it wake early when a sooner job arrives. One heap, a handful of threads, millions of scheduled jobs.
Schedule as a Strategy is the second lever. Cron jobs, fixed-rate jobs, one-shot timers, and exponential-backoff retries all reduce to "when's the next fire time?" A single method on a Schedule interface absorbs all of them. The queue doesn't know or care which variant it's holding.
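As a sketch of the idea, here is a fixed-rate variant; it is not part of the implementation below (which ships fixed-delay and one-shot), and in the real code it would subclass Schedule:

# Sketch only: a fixed-rate variant of the Schedule strategy. Fixed-rate anchors
# on the previous scheduled time, so a slow run doesn't push later runs back;
# fixed-delay (below) anchors on the previous completion instead.
from datetime import datetime, timedelta

class FixedRateSchedule:
    def __init__(self, period: timedelta, start_at: datetime):
        self._period = period
        self._next = start_at

    def next_fire_time(self, after: datetime) -> datetime:
        # Advance by whole periods until the fire time lies beyond `after`.
        while self._next <= after:
            self._next += self._period
        return self._next

# The queue never needs to know which variant it holds:
s = FixedRateSchedule(timedelta(minutes=5), datetime(2025, 1, 1, 12, 0))
print(s.next_fire_time(datetime(2025, 1, 1, 12, 7)))   # 2025-01-01 12:10:00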
Requirements
Functional
- Schedule one-shot, fixed-delay, fixed-rate, and cron jobs
- Cancel a scheduled job by ID
- Retry failed jobs with configurable policy (max attempts, backoff)
- Recurring jobs continue after success or exhaustion of retries
- Query job status
Non-Functional
- Scale to 100k+ scheduled jobs on one node
- Worker threads bounded — not one-per-job
- O(log n) insert, O(log n) pop
- Thread-safe cancellation without racing in-flight executions
Design Decisions
Why a min-heap (priority queue) over a sorted list?
Insert into a sorted list is O(n). A priority queue gives O(log n) insert and O(1) peek. Removal of arbitrary elements (cancellation) isn't O(log n) in a standard heap — that's why cancellation here sets a flag and the queue prunes cancelled heads on its next poll. Lazy deletion keeps the common path fast.
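The lazy-deletion idea in a toy sketch; the implementation below does the same pruning inside JobQueue.take_due:

# Sketch only: O(1) cancel by marking, pruning cancelled entries at the head on pop.
import heapq

heap: list[tuple[float, str]] = []
cancelled: set[str] = set()

heapq.heappush(heap, (5.0, "job-a"))
heapq.heappush(heap, (1.0, "job-b"))
cancelled.add("job-b")                      # cancel without an O(n) heap search

while heap and heap[0][1] in cancelled:     # prune dead heads lazily
    heapq.heappop(heap)
print(heap[0])                              # (5.0, 'job-a')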
Why condition variables instead of polling?
A busy-poll loop burns CPU for no reason. A timed wait on the condition variable (Condition.wait(timeout=...) in the code below) lets the worker sleep precisely until the next job is due, waking early if a sooner job arrives. Zero CPU when the queue is quiet.
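The same waiting pattern in miniature (a sketch of what JobQueue.take_due does below):

# Sketch only: wait until the earliest deadline, wake early on notify, never busy-poll.
import threading, time

cv = threading.Condition()
deadlines = [time.monotonic() + 5.0]        # soonest job currently due in 5 s

def add_sooner_job():
    with cv:
        deadlines.append(time.monotonic() + 0.5)   # a sooner job arrives
        deadlines.sort()
        cv.notify()                                # wake the sleeping worker

threading.Timer(0.1, add_sooner_job).start()

with cv:
    while time.monotonic() < deadlines[0]:
        cv.wait(timeout=max(deadlines[0] - time.monotonic(), 0))
print("woke after ~0.6 s, not 5 s")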
Why retry on the Job, not the Scheduler?
Different jobs want different policies. A payment retry needs 10 attempts with long backoff; a metrics emit needs 1 attempt and give up. Storing the policy on the Job keeps the worker code generic.
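For concreteness, a sketch of the backoff arithmetic RetryPolicy uses below: wait = base * multiplier^(attempt - 1), capped at five minutes.

# Sketch only: exponential backoff per failed attempt, capped.
from datetime import timedelta

def backoff(attempt: int, base: timedelta = timedelta(seconds=1),
            multiplier: float = 2.0) -> timedelta:
    seconds = base.total_seconds() * multiplier ** (attempt - 1)
    return timedelta(seconds=min(seconds, 300))

print([backoff(a).total_seconds() for a in (1, 2, 3, 4)])   # [1.0, 2.0, 4.0, 8.0]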
Why separate Worker from Scheduler?
Scheduler is the public API. Worker is the internal runner. Separating them means a future "distributed scheduler" could swap in a worker that pulls jobs from Kafka while the scheduler-side stays identical.
Why not use ScheduledExecutorService directly?
It's close, but it doesn't give control over retry policy, cancellation with in-flight protection, or swappable schedules. In an interview, demonstrating that the underlying primitives are well understood beats "I'd just use the library."
Interview Follow-ups
- "How would you make this distributed?" Partition jobs by hash of ID across N scheduler nodes. Durable store (Postgres, Kafka) for persistence. Leader-election per partition to prevent double-fire.
- "What about exactly-once execution?" Hard. Practical answer: at-least-once with idempotent job code. Or write a "claim" to the database before executing and a "completed" after — if the node crashes mid-job, another node reclaims after a timeout.
- "How do cron expressions work?" Parse
* * * * *into five fields (minute, hour, dom, month, dow).nextFireTimeiterates fromafter, bumping fields to the next allowed value. Off-the-shelf libraries (Quartz, croniter) handle the edge cases like DST. - "How do you handle misfires?" If the scheduler was down when a job was supposed to fire, on recovery either run it immediately (fire-now policy) or skip and fire next normally (discard policy). Configurable per job.
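A sketch of the partition assignment from the first follow-up; a stable hash (not Python's process-seeded hash()) keeps ownership consistent across nodes, and the function name is illustrative:

# Sketch only: which of N scheduler nodes owns a job, by stable hash of its ID.
import hashlib

def owner_node(job_id: str, node_count: int) -> int:
    digest = hashlib.sha256(job_id.encode()).digest()
    return int.from_bytes(digest[:8], "big") % node_count

print(owner_node("a1b2c3d4", 4))   # always the same node for the same job ID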
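And a sketch of the claim-then-complete pattern from the exactly-once follow-up, assuming a hypothetical jobs table with id, status, and claimed_until columns; sqlite3 stands in for the real store:

# Sketch only: at-least-once with a DB claim. One UPDATE atomically wins the claim;
# a crashed node's claim expires and another node reclaims the job.
import sqlite3
from datetime import datetime, timedelta

def try_claim(conn: sqlite3.Connection, job_id: str, lease: timedelta) -> bool:
    now = datetime.utcnow().isoformat()
    until = (datetime.utcnow() + lease).isoformat()
    cur = conn.execute(
        "UPDATE jobs SET status = 'running', claimed_until = ? "
        "WHERE id = ? AND (status = 'scheduled' "
        "      OR (status = 'running' AND claimed_until < ?))",
        (until, job_id, now),
    )
    conn.commit()
    return cur.rowcount == 1        # exactly one node sees rowcount == 1

def mark_completed(conn: sqlite3.Connection, job_id: str) -> None:
    conn.execute("UPDATE jobs SET status = 'done' WHERE id = ?", (job_id,))
    conn.commit()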
Code Implementation
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
from datetime import datetime, timedelta
from enum import Enum
from threading import Condition, Thread, RLock
from typing import Callable
import heapq
import itertools
import logging
import uuid

log = logging.getLogger("scheduler")


class JobStatus(Enum):
    SCHEDULED = "scheduled"
    RUNNING = "running"
    DONE = "done"
    FAILED = "failed"
    TIMED_OUT = "timed_out"
    CANCELLED = "cancelled"


class Schedule(ABC):
    @abstractmethod
    def next_fire_time(self, after: datetime) -> datetime | None: ...

    @property
    def is_one_shot(self) -> bool:
        return False


class FixedDelaySchedule(Schedule):
    """Fire every `delay` after the previous run (not the previous scheduled time)."""

    def __init__(self, delay: timedelta):
        if delay <= timedelta(0):
            raise ValueError("delay must be positive")
        self._delay = delay

    def next_fire_time(self, after: datetime) -> datetime:
        return after + self._delay


class OneShotSchedule(Schedule):
    def __init__(self, run_at: datetime):
        self._run_at = run_at
        self._fired = False

    def next_fire_time(self, after: datetime) -> datetime | None:
        if self._fired:
            return None
        self._fired = True
        return self._run_at

    @property
    def is_one_shot(self) -> bool:
        return True


@dataclass
class RetryPolicy:
    max_attempts: int = 3
    base_backoff: timedelta = timedelta(seconds=1)
    multiplier: float = 2.0

    def backoff(self, attempt: int) -> timedelta:
        # attempt is 1-indexed; attempt 1 fails -> wait base * multiplier^0.
        seconds = self.base_backoff.total_seconds() * (self.multiplier ** (attempt - 1))
        return timedelta(seconds=min(seconds, 300))  # cap at 5 minutes


_id_counter = itertools.count()


@dataclass
class Job:
    id: str
    task: Callable[[], None]
    schedule: Schedule
    retry: RetryPolicy
    next_run_at: datetime
    # None = no timeout. A stuck job with no timeout holds its worker forever.
    max_runtime: timedelta | None = None
    status: JobStatus = JobStatus.SCHEDULED
    attempts: int = 0
    cancelled: bool = False
    # Heap tie-breaker — ensures insertion order for equal timestamps.
    seq: int = field(default_factory=lambda: next(_id_counter))

    def __lt__(self, other: "Job") -> bool:
        return (self.next_run_at, self.seq) < (other.next_run_at, other.seq)


class JobQueue:
    """Min-heap by next_run_at. Workers take from the head when due."""

    def __init__(self):
        self._heap: list[Job] = []
        self._index: dict[str, Job] = {}
        self._cv = Condition()

    def offer(self, job: Job) -> None:
        with self._cv:
            heapq.heappush(self._heap, job)
            self._index[job.id] = job
            self._cv.notify()

    def take_due(self) -> Job | None:
        """Block until a job is due. Returns None if interrupted."""
        with self._cv:
            while True:
                # Prune cancelled jobs from the head. They shouldn't fire.
                while self._heap and self._heap[0].cancelled:
                    victim = heapq.heappop(self._heap)
                    self._index.pop(victim.id, None)

                if not self._heap:
                    self._cv.wait(timeout=1.0)
                    if not self._heap:
                        return None
                    continue

                head = self._heap[0]
                now = datetime.utcnow()
                if head.next_run_at <= now:
                    self._index.pop(head.id, None)
                    return heapq.heappop(self._heap)
                wait_seconds = (head.next_run_at - now).total_seconds()
                self._cv.wait(timeout=max(wait_seconds, 0))

    def cancel(self, job_id: str) -> bool:
        with self._cv:
            job = self._index.get(job_id)
            if job is None or job.status != JobStatus.SCHEDULED:
                return False
            job.cancelled = True
            job.status = JobStatus.CANCELLED
            self._cv.notify_all()
            return True

    def wake(self) -> None:
        with self._cv:
            self._cv.notify_all()


class Scheduler:
    def __init__(self, worker_count: int = 2):
        if worker_count <= 0:
            raise ValueError("need at least one worker")
        self._queue = JobQueue()
        self._workers: list[Thread] = []
        self._worker_count = worker_count
        self._running = False
        self._lock = RLock()
        # Pool for bounded task execution — lets us enforce per-job timeouts.
        # Python can't forcibly stop a thread, so a timed-out task's thread
        # keeps running until it notices; budget for this in pool size.
        self._task_pool = ThreadPoolExecutor(max_workers=worker_count * 4)

    def schedule(self, task: Callable[[], None], schedule: Schedule,
                 retry: RetryPolicy | None = None,
                 max_runtime: timedelta | None = None) -> str:
        now = datetime.utcnow()
        first_fire = schedule.next_fire_time(now)
        if first_fire is None:
            raise ValueError("Schedule produced no fire time")
        job = Job(
            id=str(uuid.uuid4())[:8],
            task=task,
            schedule=schedule,
            retry=retry or RetryPolicy(),
            next_run_at=first_fire,
            max_runtime=max_runtime,
        )
        self._queue.offer(job)
        return job.id

    def cancel(self, job_id: str) -> bool:
        return self._queue.cancel(job_id)

    def start(self) -> None:
        with self._lock:
            if self._running:
                return
            self._running = True
            for i in range(self._worker_count):
                t = Thread(target=self._worker_loop, name=f"worker-{i}", daemon=True)
                t.start()
                self._workers.append(t)

    def stop(self) -> None:
        with self._lock:
            self._running = False
            self._queue.wake()
            self._task_pool.shutdown(wait=False)

    def _worker_loop(self) -> None:
        while self._running:
            job = self._queue.take_due()
            if job is None or job.cancelled:
                continue
            self._execute(job)

    def _execute(self, job: Job) -> None:
        job.status = JobStatus.RUNNING
        job.attempts += 1
        timeout_secs = job.max_runtime.total_seconds() if job.max_runtime else None
        future = self._task_pool.submit(job.task)
        try:
            future.result(timeout=timeout_secs)
            job.status = JobStatus.DONE
            self._maybe_reschedule(job)
        except FutureTimeoutError:
            log.warning("Job %s timed out after %s", job.id, job.max_runtime)
            # Python can't forcibly stop a thread. The task keeps running; we stop caring.
            self._handle_failure(job, "timeout", timed_out=True)
        except Exception as e:
            log.warning("Job %s failed (attempt %d): %s", job.id, job.attempts, e)
            self._handle_failure(job, "error", timed_out=False)

    def _handle_failure(self, job: Job, reason: str, *, timed_out: bool) -> None:
        if job.attempts < job.retry.max_attempts:
            job.next_run_at = datetime.utcnow() + job.retry.backoff(job.attempts)
            job.status = JobStatus.SCHEDULED
            self._queue.offer(job)
        else:
            job.status = JobStatus.TIMED_OUT if timed_out else JobStatus.FAILED
            log.warning("Job %s gave up after %d attempts (%s)", job.id, job.attempts, reason)
            # Per the requirements, a recurring job survives an exhausted invocation:
            # it fires again at its next regular time (no-op for one-shot schedules).
            self._maybe_reschedule(job)

    def _maybe_reschedule(self, job: Job) -> None:
        if job.schedule.is_one_shot:
            return
        nxt = job.schedule.next_fire_time(datetime.utcnow())
        if nxt is None:
            return
        # Reset attempts for the next fire — retry counter is per-invocation.
        job.attempts = 0
        job.next_run_at = nxt
        job.status = JobStatus.SCHEDULED
        self._queue.offer(job)

if __name__ == "__main__":
    import time, threading

    sched = Scheduler(worker_count=2)
    sched.start()

    # One-shot in 100 ms.
    counter = {"n": 0}
    def once(): counter["n"] += 1; print("one-shot fired")
    sched.schedule(once, OneShotSchedule(datetime.utcnow() + timedelta(milliseconds=100)))

    # Recurring every 200 ms.
    def tick(): print(f"tick at {datetime.utcnow().isoformat()}")
    tick_id = sched.schedule(tick, FixedDelaySchedule(timedelta(milliseconds=200)))

    # Failing job — retries with backoff.
    fails = {"n": 0}
    def flaky():
        fails["n"] += 1
        if fails["n"] < 3:
            raise RuntimeError("transient")
        print(f"flaky succeeded on attempt {fails['n']}")
    sched.schedule(flaky, OneShotSchedule(datetime.utcnow() + timedelta(milliseconds=50)),
                   retry=RetryPolicy(max_attempts=5, base_backoff=timedelta(milliseconds=50)))

    # Timed-out job: max_runtime=100ms but the task sleeps 500ms.
    stuck_attempts = {"n": 0}
    def stuck():
        stuck_attempts["n"] += 1
        time.sleep(0.5)
    stuck_id = sched.schedule(
        stuck,
        OneShotSchedule(datetime.utcnow() + timedelta(milliseconds=30)),
        retry=RetryPolicy(max_attempts=2, base_backoff=timedelta(milliseconds=20)),
        max_runtime=timedelta(milliseconds=100),
    )

    time.sleep(2.0)
    sched.cancel(tick_id)
    time.sleep(0.3)
    sched.stop()
    print(f"one-shot fired {counter['n']} time(s), flaky took {fails['n']} attempts")
    # Stuck job: 2 attempts each timed out; final status = TIMED_OUT.
    print(f"stuck attempts: {stuck_attempts['n']}")
    assert stuck_attempts["n"] >= 2
Common Mistakes
- ✗ Busy-waiting on 'is the next job ready yet'. Use condition variables and sleep-until.
- ✗ Timer-per-job. A million jobs = a million threads. Heap + single dispatcher scales.
- ✗ Retrying forever. Max-attempts plus exponential backoff is the bare minimum.
- ✗ Losing jobs on crash. Production needs a durable store; in-memory is test-only.
Key Points
- ✓ Min-heap keyed by next-run timestamp. O(log n) insert, O(1) peek at soonest job.
- ✓ Schedule is a Strategy so cron, rate-based, and delay-based jobs share one queue.
- ✓ Retry policy on the Job itself (max attempts, backoff). Worker handles the mechanics.
- ✓ Cancellation sets a flag; the worker checks before executing. Avoids racing in-flight jobs.