System Design: Job Scheduler (10M Jobs/day, DAG Dependencies, Effectively-Once Execution)
Goal
A distributed job scheduler.
Scale:
- 10M jobs/day
- 100K concurrent in flight
- Sub-second dispatch, 99.99% availability
Features:
- Cron scheduling
- DAG dependencies
- Priority queues with aging
- Effectively-once execution
TL;DR
At a high level, this system has three moving parts: storing jobs, deciding when they should run, and actually running them.
- Submit. The API writes the job to Postgres (this is where final state lives), computes the next run time, and adds it to a Valkey sorted set keyed by time.
- Schedule (runs every second, leader only). The scheduler looks at Valkey for jobs that are due, checks if their dependencies are satisfied, pushes them to Kafka, and marks them as `QUEUED`.
- Execute. Workers consume from Kafka, try to grab a lock in Valkey, get a fencing token, run the job, and write results back to Postgres, but only if their token is still current.
A few things worth holding in your head:
- Postgres owns the final state.
- Valkey is just helping figure out what's due and coordinating workers.
- Kafka is how work gets distributed.
- Workers are where correctness actually gets enforced.
On guarantees:
- Duplicate execution is allowed under failure.
- Duplicate commits are blocked by fencing tokens.
- External side effects (payments, emails, etc.) rely on idempotency keys implemented at the target.
That combination is what gives effectively-once behavior in practice.
Pick a path
| Time | Read | What you get |
|---|---|---|
| ~10 min | TL;DR + §1 | End-to-end flow, who does what, where correctness is enforced |
| ~30 min | TL;DR, §1, §7, §11, §13 | Stack tradeoffs, effectively-once execution, DAG resolution (interview-grade) |
| ~60 min | Full post | Every decision plus multi-region, ops, SLOs, appendix |
1. Final Architecture
There are three main flows in the system:
- Submit (write path)
- Schedule + dispatch (leader loop)
- Execute (worker side)
Each part does one thing, and correctness mostly lives at execution time, not at scheduling time.
[1.1] Submit (write path)
Client → API
When a request comes in, the API does a few straightforward things:
- Validate input (payload size, `job_type`, auth, tenant quotas, etc.)
- Insert a row into Postgres with `status = PENDING`. Submission idempotency is enforced here via `UNIQUE (tenant_id, idempotency_key)`.
- Compute the next fire time (immediate / delayed / cron).
- Update the job to `status = SCHEDULED` with that timestamp.
- Add it to Valkey: `ZADD schedules <next_fire_time> <job_id>`.
- Return `202 Accepted` with `{job_id, next_fire_time}`.
One thing to keep in mind: Valkey is not the source of truth. It's just an index of "what should run next".
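A minimal sketch of what the `UNIQUE (tenant_id, idempotency_key)` guard buys on the submit path, with a dict standing in for the Postgres constraint (class and field names are illustrative, not from a real client library):

```python
# Sketch: submission idempotency. In the real design Postgres enforces the
# UNIQUE (tenant_id, idempotency_key) constraint; here a dict simulates it.
import uuid

class JobStore:
    def __init__(self):
        self.jobs = {}    # job_id -> row
        self.dedup = {}   # (tenant_id, idempotency_key) -> job_id

    def submit(self, tenant_id, idempotency_key, payload):
        key = (tenant_id, idempotency_key)
        if idempotency_key is not None and key in self.dedup:
            # Duplicate submit: return the existing job, create nothing.
            return self.jobs[self.dedup[key]], False
        job_id = str(uuid.uuid4())
        row = {"id": job_id, "status": "PENDING", "payload": payload}
        self.jobs[job_id] = row
        if idempotency_key is not None:
            self.dedup[key] = job_id
        return row, True

store = JobStore()
first, created = store.submit("t1", "revenue-2026-03-27", {"date": "2026-03-27"})
again, created2 = store.submit("t1", "revenue-2026-03-27", {"date": "2026-03-27"})
assert created and not created2 and first["id"] == again["id"]
```

A retried client POST gets the same `job_id` back instead of a second job.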
[1.2] Schedule + dispatch (leader loop)
A single leader runs this loop every second.
Step 1. Pull due jobs from Valkey:
`ZRANGEBYSCORE schedules -inf NOW LIMIT 1000`
Step 2. For each job_id, load the row from Postgres. The Valkey entry can be stale, so the Postgres status is what matters:
- `SCHEDULED` → continue.
- `CANCELLED` → remove from Valkey, ignore.
- `QUEUED` / `RUNNING` → already dispatched by an earlier tick that crashed before `ZREM`. Clean up Valkey.
- Terminal (`SUCCEEDED`, `FAILED`, `DEAD_LETTER`) → clean up Valkey.
Step 3. Check DAG readiness. If pending_parents == 0, proceed; otherwise skip for now.
Step 4. Produce to Kafka, picking the topic by priority (critical / high / normal / low). Message is {job_id, attempt, metadata}.
Step 5. Mark the job as queued, guarded against stale transitions:
```sql
UPDATE jobs
SET status = 'QUEUED'
WHERE id = $job_id AND status = 'SCHEDULED';
```

Step 6. `ZREM schedules <job_id>`.
Two realities to sit with here:
- This flow is not atomic across Kafka, Valkey, and Postgres. A crash in between can leave them inconsistent for a short window. Crash-safety details in §11.7.
- The scheduler is allowed to over-dispatch under failure. That's intentional. Workers + fencing tokens clean it up later.
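The leader tick above condenses into a short sketch, with a heap standing in for the Valkey ZSET, a dict for Postgres rows, and a list for Kafka (all names are illustrative, not a real client API):

```python
# Sketch of one leader tick: pull due jobs, re-check the authoritative
# Postgres status, check DAG readiness, "produce" to Kafka, guard the
# QUEUED transition. In-memory stand-ins for Valkey / Postgres / Kafka.
import heapq

def tick(now, due_heap, jobs, kafka):
    dispatched = []
    while due_heap and due_heap[0][0] <= now:
        _, job_id = heapq.heappop(due_heap)              # ZRANGEBYSCORE + ZREM
        job = jobs.get(job_id)
        if job is None or job["status"] != "SCHEDULED":
            continue                                      # stale index entry: drop
        if job.get("pending_parents", 0) > 0:
            heapq.heappush(due_heap, (now + 1, job_id))   # not ready: recheck next tick
            continue
        kafka.append(job_id)                              # produce to priority topic
        job["status"] = "QUEUED"                          # guarded: only from SCHEDULED
        dispatched.append(job_id)
    return dispatched

jobs = {
    "a": {"status": "SCHEDULED", "pending_parents": 0},
    "b": {"status": "CANCELLED"},                         # stale Valkey entry
    "c": {"status": "SCHEDULED", "pending_parents": 2},   # DAG parents unfinished
}
heap = [(5, "a"), (5, "b"), (5, "c"), (99, "later")]
heapq.heapify(heap)
kafka = []
assert tick(now=10, due_heap=heap, jobs=jobs, kafka=kafka) == ["a"]
```

Only the ready, still-`SCHEDULED` job reaches Kafka; the cancelled entry is cleaned up and the unready DAG node waits.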
[1.3] Execute (worker fleet)
Workers only read from Kafka. They never query Valkey for jobs.
For each Kafka message:
- Read the message. Don't commit the offset yet.
- Try to claim the job via a Valkey Lua script:

  ```
  IF SETNX lock:job:{id} THEN
    token = INCR fence:job:{id}
  ELSE
    skip (another worker holds it)
  ```

  If the lock fails, someone else owns it. Skip and commit the offset.
- Record the execution attempt and move the job to `RUNNING` (guarded):

  ```sql
  INSERT INTO job_runs (job_id, status, fencing_token)
  VALUES ($job_id, 'RUNNING', $token);

  UPDATE jobs SET status = 'RUNNING'
  WHERE id = $job_id AND status = 'QUEUED';
  ```

  If the `UPDATE` affects zero rows, the job was already claimed or cancelled. Stop. The `job_runs` row stays as an abandoned attempt; the next successful claim allocates a higher fencing token and a fresh `job_runs` row, and the abandoned one is never committed as succeeded.
- Execute the job (HTTP, container, or built-in handler). Pass the idempotency key to the downstream target. In practice this step is where most real-world variability lives; different job types fail in wildly different ways.
- Heartbeat every 10 seconds: `SET hb:{run_id} alive EX 30`.
- Final write. This is where fencing actually matters: only the latest token wins.

  ```sql
  UPDATE job_runs
  SET status = 'SUCCEEDED' | 'FAILED', result = ...
  WHERE job_id = $job_id AND fencing_token = $token;
  ```

- Retry logic:
  - If `FAILED` and `attempts < max`: compute the next retry time, `UPDATE jobs SET status = 'SCHEDULED'`, `ZADD schedules`.
  - If `FAILED` and no retries remain: `status = 'DEAD_LETTER'`.
- Commit the Kafka offset. Only after the success or failure write is persisted.
One subtle but important thing: the `job_runs` row is inserted before the `jobs` row transitions to `RUNNING`. That way, even if the worker crashes mid-transition, there's still a record that the attempt started.
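The retry step's backoff modes (matching the schema's `retry_backoff` values) reduce to a small function. The base delay and cap here are illustrative knobs, not values from the design, and production code would add jitter:

```python
# Sketch: next-retry delay for the three retry_backoff modes.
# base_seconds and cap are illustrative; real deployments add jitter so
# retry storms don't synchronize.
def backoff_delay(mode, attempt, base_seconds=10, cap=3600):
    if mode == "fixed":
        delay = base_seconds
    elif mode == "linear":
        delay = base_seconds * attempt
    elif mode == "exponential":
        delay = base_seconds * (2 ** (attempt - 1))
    else:
        raise ValueError(f"unknown backoff mode: {mode}")
    return min(delay, cap)

assert [backoff_delay("exponential", a) for a in (1, 2, 3, 4)] == [10, 20, 40, 80]
assert backoff_delay("fixed", 7) == 10
assert backoff_delay("linear", 3) == 30
assert backoff_delay("exponential", 20) == 3600   # capped
```

The result feeds the `UPDATE jobs SET status = 'SCHEDULED'` + `ZADD schedules` pair in the retry step.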
[1.4] Correctness guarantees
Postgres is where truth lives. Valkey is just helping find what's due; if it disappears, the ZSET rebuilds from Postgres. Kafka is the delivery layer. It moves work, not correctness.
Protection layers:
- Submission idempotency prevents duplicate jobs.
- Status guards prevent stale transitions.
- Fencing tokens prevent stale writes.
- Target-side idempotency prevents duplicate side effects.
The result: effectively-once execution. Exactly-once is not guaranteed. What is guaranteed is that only one result commits, and that side effects don't duplicate when idempotency is implemented correctly at the target.
[1.5] Who reads Valkey, and where uniqueness comes from
Only the scheduler reads the Valkey sorted set. Workers never ask "what should I run next?". They just consume from Kafka.
A common misunderstanding: ZRANGEBYSCORE does not guarantee a single execution. It returns jobs whose fire time has come. Duplicates still happen because:
- Kafka is at-least-once.
- Retries happen.
- Consumer-group rebalances redeliver.
So where is uniqueness enforced? At execution time, by three things working together:
- The lock picks one active executor.
- The fencing token orders racing attempts.
- The conditional `UPDATE` rejects stale workers.
A rough way to think about the pieces:
- Valkey tells the leader what's due.
- Kafka hands jobs out to workers.
- The lock decides who runs.
- Postgres (via the fencing check) decides whose result counts.
[1.6] Cancellation (cross-cutting)
API path:
- `UPDATE jobs SET status = 'CANCELLED'`.
- Best-effort `ZREM schedules <job_id>`.
Worker behavior:
- If the job is still `QUEUED`, skip execution on claim.
- If the job is `RUNNING`, check `cancel:{run_id}` between steps, or let the final conditional `UPDATE` fail because the status moved to `CANCELLED`.
Either way, fencing stops a cancelled job from accidentally landing as succeeded.
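A sketch of that last point, modeling the guarded final write as a conditional in-memory update (the real condition lives in SQL; this loose model and its names are illustrative):

```python
# Sketch: why a cancelled job cannot land as SUCCEEDED. The final write is
# conditional on both the current status and the fencing token, so an
# API-side cancel that flips status mid-run turns the worker's commit
# into a zero-row no-op.
def commit_result(job, token, new_status):
    # Loosely mirrors: UPDATE ... WHERE status = 'RUNNING'
    #                  AND fencing_token = $token
    if job["status"] == "RUNNING" and job["fence"] == token:
        job["status"] = new_status
        return 1          # rows affected
    return 0

job = {"status": "RUNNING", "fence": 7}
job["status"] = "CANCELLED"                    # cancel lands mid-run
assert commit_result(job, token=7, new_status="SUCCEEDED") == 0
assert job["status"] == "CANCELLED"
```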
[1.7] What is a "job"?
A job is just a unit of work. The system doesn't care what it actually does; from its point of view every job is the same row shape: a payload (JSON, ≤256 KB inline; larger blobs go to S3 with a pointer) plus a job_type. That job_type maps, via a central job_type_registry table, to one of three execution modes the worker knows how to run.
| Mode | Worker action | Status signal | Use case |
|---|---|---|---|
| HTTP (default) | POST {target_url} with payload as body and HMAC signature header | 2xx = success, 4xx = no-retry, 5xx / timeout = retry | Cross-service jobs like billing and notifications |
| Container | docker run image:tag with payload as env var JOB_PAYLOAD and a writable /status volume | Exit code 0 = success; optional /status/result.json captures structured output | Custom runtime, ML/ETL with pinned deps |
| Built-in handler | Call a Go/Python function registered in the worker binary at startup | return nil = success; panic = failure | Hot paths >1K/sec where HTTP overhead dominates |
All three modes carry the same payload. The mode only decides delivery: HTTP body, container env var, or function argument. From Postgres's view, every job has the same row shape.
Container status flow. The worker is the parent process of the container. No long-poll, no callback URL. The flow:
- Worker shell-execs `docker run` (or invokes the Kubernetes API for pod-per-job mode) and waits on the process.
- Exit code is the primary signal. Zero is success, non-zero is failure.
- For richer results, the worker mounts an empty `/status` volume. The container writes `/status/result.json` before exiting; the worker reads it after the process ends.
- Stdout/stderr stream to S3 throughout the run.
Built-in handler delivery. Handlers ship in the worker binary and self-register at startup:
```go
func init() {
    worker.Register("send_welcome_email", sendWelcomeEmail)
}
```

The registry is an in-memory `map[string]HandlerFunc`. Adding a handler is a code change plus a worker image rebuild and rolling deploy.
The scheduler never knows which handlers exist. It routes by job_type. The worker either has the handler or returns "unknown job type", which fails the run and pages on-call.
Across all three modes the worker owns the fencing token, the heartbeat, and the conditional Postgres write. The scheduler only talks to workers via Kafka.
2. Problem Statement
A scheduler sounds easy (tasks, times, dependencies) and then breaks in the same three places at scale.
- Effectively-once is hard. Distributed locks expire mid-execution when a worker GCs or stalls on IO. A TTL alone cannot stop two workers from writing conflicting results. Even with perfect state dedup in the database, a downstream HTTP call may already have charged a card before the worker crashed.
- DAG failures cascade. One bad upstream stalls hundreds of downstream dependents. Retry budgets and condition edges must be explicit.
- Priority without starvation. A flood of low-priority analytics cannot block critical billing. The answer is separate queues with aging, not a FIFO with a priority field.
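One way to sketch aging (the numbers and the promotion rule are illustrative, not from the design): a job's effective priority improves the longer it waits, so fresh high-priority work cannot starve old low-priority work forever:

```python
# Sketch: priority aging. Effective priority improves (numerically drops)
# as a job waits; boost_every is an illustrative knob.
def effective_priority(base, waited_seconds, boost_every=60):
    # 0 = critical ... 3 = low; one level of boost per boost_every seconds.
    return max(0, base - waited_seconds // boost_every)

def pick_next(queue, now):
    # queue: list of (base_priority, enqueued_at, job_id);
    # ties broken by enqueue time (oldest first).
    return min(queue, key=lambda j: (effective_priority(j[0], now - j[1]), j[1]))

queue = [
    (3, 0,   "old-analytics"),   # low priority, waiting 300 s by now=300
    (1, 290, "fresh-billing"),   # high priority, waiting 10 s
]
# After 5 minutes the low-priority job has aged to priority 0 and wins.
assert pick_next(queue, now=300)[2] == "old-analytics"
```

In the full design the same idea lives in the `priority:aged:{tenant_id}` ZSETs rather than a single in-process queue.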
Scale targets.
- 10M jobs/day (~116/sec avg, 500/sec peak)
- 100K concurrent in flight
- 50K DAGs, avg depth 5, max depth 20
- Avg job 30 s, max 2 h
- Submit-to-worker-start latency <1 s p99 (warm pool; see NFR-03 for the full breakdown)
- 99.99% scheduler availability
What not to do.
- In-memory-only state loses every in-flight job on restart.
- `sleep(next_run - now)` drifts, and it scales to one job per thread.
- Unbounded retries flood logs and starve the worker pool.
- A single Kafka topic causes priority inversion inside partitions.
- Active-active schedulers without sharded ownership dispatch the same job twice.
- Recursive foreign keys on DAG edges lock up the DB for any 20-node resolution.
- Timeouts alone hide fast crashes. A 2-hour timeout will miss a 1-minute crash for 119 minutes.
- Parsing cron on every tick wastes CPU, and 10M rows amplify the waste.
- Wall-clock scheduling without a timezone breaks twice a year on DST boundaries.
- Infrastructure alone cannot guarantee idempotency. The scheduler has no way of knowing the job charged a card.
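The DST bullet is easy to demonstrate with the standard library: the same local wall-clock time has a different UTC offset on either side of the 2026-03-08 US spring-forward boundary, so a fixed-UTC schedule drifts an hour in local terms:

```python
# Sketch: why wall-clock cron needs a timezone. Around the US spring-forward
# boundary (2026-03-08 for America/New_York), the same local time maps to a
# different UTC offset, so "fire at a fixed UTC time" drifts an hour locally.
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

ny = ZoneInfo("America/New_York")
before = datetime(2026, 3, 7, 2, 30, tzinfo=ny)   # EST, UTC-5
after  = datetime(2026, 3, 9, 2, 30, tzinfo=ny)   # EDT, UTC-4

assert before.utcoffset() == timedelta(hours=-5)
assert after.utcoffset() == timedelta(hours=-4)
# A job firing "daily at 07:30 UTC" hits 02:30 local before the change but
# 03:30 local after it -- the twice-a-year breakage named above.
```

This is why the schema carries `cron_timezone` and why workers never compute next-fire times themselves.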
3. Functional Requirements
| ID | Requirement | Priority |
|---|---|---|
| FR-01 | Submit a job for immediate execution | P0 |
| FR-02 | Schedule a job for a specific future timestamp | P0 |
| FR-03 | Recurring jobs via cron (5-field + 6-field with seconds, timezone-aware) | P0 |
| FR-04 | DAG dependencies (on_success, on_failure, always) | P0 |
| FR-05 | Effectively-once execution (exactly-once state commit + idempotency-key handoff for side effects) | P0 |
| FR-06 | Four priorities (critical, high, normal, low) with aging | P0 |
| FR-07 | Retries with exponential / fixed / linear backoff | P0 |
| FR-08 | Cancel pending or running jobs | P1 |
| FR-09 | Timeout enforcement | P0 |
| FR-10 | Dead letter queue with inspection and manual replay | P1 |
| FR-11 | Status tracking (PENDING → SCHEDULED → QUEUED → RUNNING → terminal) | P0 |
| FR-12 | DAG-level progress view | P1 |
| FR-13 | Webhooks on completion / failure / timeout | P1 |
| FR-14 | Tags + metadata for filtering | P2 |
| FR-15 | Per-tenant rate limits | P1 |
4. Non-Functional Requirements
The three latency NFRs below (03a/b/c) measure different slices. NFR-03a is the scheduler's internal decide-to-produce path and is the page-on signal. NFR-03b measures end-to-end submit-to-start with a warm worker pool. NFR-03c is cold-start, bounded by Kubernetes pod startup.
| ID | Requirement | Target |
|---|---|---|
| NFR-01 | Throughput | 10M/day, 500/sec peak |
| NFR-02 | Concurrent in flight | 100K |
| NFR-03a | Scheduler decide → Kafka produce | <50 ms p99 |
| NFR-03b | Submit → worker starts (end-to-end, warm pool) | <1 s p99 |
| NFR-03c | Submit → worker starts (cold-start, pod spin-up) | <30 s p99 |
| NFR-04 | Execution guarantee | Effectively-once (state commit + target-side idempotency) |
| NFR-05 | Scheduler availability | 99.99% (52 min/year) |
| NFR-06 | Worker failure detection | <30 s via heartbeats |
| NFR-07 | DAG resolution for 20-node DAG | <100 ms |
| NFR-08 | Cron drift | <1 s from intended fire time |
| NFR-09 | Retention | Hot 7 d, warm 90 d, cold 1 y (S3) |
| NFR-10 | Scheduler RTO | <15 s |
| NFR-11 | Job RPO | 0 (durable after 202 Accepted) |
| NFR-12 | Max DAG depth | 20 |
| NFR-13 | Max DAG width per level | 100 |
5. Design Assumptions
- Workload shape: bursty on the hour (cron convergence), steady otherwise. Peak is 5× average.
- Tenancy: multi-tenant, shared fleet, fair-queued per tenant (§15).
- Region topology: single write region for job state; workers regional (§16). Active-active schedulers are out of scope. Split-brain risk dominates at this article's depth.
- Max payload inline: 256 KB; larger blobs stored in S3, referenced by pointer.
- Clock: schedulers run chrony/NTP; workers never compute next-fire times.
- Out of scope: cross-org workflow federation, visual DAG editor, BYO container sandboxing, SOC-2-grade audit pipeline.
- Numbers are design targets, not benchmark results.
6. Expanded Architecture
This section adds operational and deployment details on top of §1: leader/standby split, observability (ClickHouse CDC, S3 logs), explicit store responsibilities, and the reasoning for Kafka sitting between scheduler and workers.
Before diving in. The full design solves the brief in §3–§4: DAGs, four priorities, hot-tenant isolation, 100K concurrent. For a smaller brief, Postgres alone is enough at 10M/day. Jump to §7.2 for the minimum-viable baseline. Every component beyond that is earned against a specific limit, not assumed.
This is a stateful coordination system driven by two triggers: time (next_fire_time ≤ now) and dependency (all parents done). Every downstream choice serves those two triggers.
[6.1] Layers
§6.1 surfaces three details §1 left out: the leader/standby split, the S3 log store, and the ClickHouse CDC path. The three paths (submit, schedule, execute) are the same.
[6.2] Store roles
Valkey is a Redis-compatible in-memory datastore. It was forked from Redis 7.2 after the license change. Same protocol, same data structures, BSD-licensed. It fills the role Redis would in this design.
| Store | Tech | Role |
|---|---|---|
| Source of truth | Postgres 17 | Jobs, runs, DAGs, schedules. ACID, partitioned. |
| Schedule index | Valkey 8 | Sorted set of next-fire-times, worker job locks (fencing), heartbeats. |
| Dispatch bus | Kafka 4.0 (KRaft) | Durable hand-off, priority isolation, backpressure. |
| Coordination | Postgres advisory lock (default) or etcd 3.6 (upgrade) | Single-leader election. |
| Logs / analytics | S3 + ClickHouse 24 | Raw logs, queryable duration/failure analytics. |
Execution modes (HTTP / container / handler) are covered in §1.
7. Technology Selection
[7.1] What shape is this system?
This is a stateful coordination system driven by two things:
- time (`next_fire_time ≤ now`)
- dependencies (parents completed)
That immediately rules out a few options.
KV stores (DynamoDB, ScyllaDB) don't fit well here. The design needs:
- multi-row transactions (job + run + DAG node)
- joins for DAG resolution
- something like `SKIP LOCKED` for safe concurrent claims
All of these can be worked around on a KV store. But by the time transactions, DAG joins, and safe claiming have been reimplemented, half a database has been built.
Kafka as the source of truth doesn't work either. Rebuilding state from a log at 10M jobs/day is painfully slow. Kafka is great for delivery, not for storing state.
[7.2] The simpler version (don't skip this)
Without DAGs, multi-tenant fairness, or sub-second SLOs, the whole thing collapses into a Postgres-only design:
- One `jobs` table with `SELECT … FOR UPDATE SKIP LOCKED LIMIT N` from a worker pool.
- Cron via `pg_cron` or an in-process scheduler that polls once a second.
- Retries via a `retry_at` column.
- Leader election via `pg_try_advisory_lock`.
That gives safe parallel workers, no extra infrastructure, and decent scale for many real systems. No Kafka, no Valkey, no etcd. River (Go), Oban (Elixir), pg_boss (Node), and Sidekiq Pro (Ruby) all ship variants of this design. The rest of this article only exists because of the things this simple version can't do: DAGs, four priorities, hot-tenant isolation, 100K concurrent.
How workers safely claim jobs (SKIP LOCKED). In the Postgres-only design, workers claim jobs directly from the database:
```sql
SELECT id
FROM jobs
WHERE status = 'SCHEDULED'
  AND next_fire_time <= NOW()
FOR UPDATE SKIP LOCKED
LIMIT 100;
```

This gives three guarantees:
- Locks the selected rows so other workers cannot modify them.
- Skips already-locked rows so many workers can run this query in parallel without blocking each other.
- Each job is claimed once across the whole fleet. No two workers process the same job.
With SKIP LOCKED, Postgres effectively becomes both the queue and the coordinator. No additional services required.
Why SKIP LOCKED is not used in the full design. Once the load crosses what Postgres alone can handle, claiming moves out of the database:
- Kafka distributes work across consumers.
- Valkey locks + fencing tokens handle correctness.
Workers consume from Kafka rather than poll Postgres. SKIP LOCKED stays out of the hot path. It remains a valid and simpler alternative for smaller systems, which is why it sits in §7.2 and not in the full-design dispatch path.
Each component in the rest of this article is earned against a specific limit:
| Component | The limit it solves |
|---|---|
| Valkey schedule index | At tens of millions of scheduled rows with sub-second polling, Postgres query latency and index maintenance become a bottleneck (tuning-dependent) |
| Kafka priority topics | One worker pool can't serve mixed priorities without head-of-line blocking |
| etcd lease | Failover decoupled from Postgres health is needed |
| KEDA autoscaling | Workload is bursty enough that fixed-size pools waste money |
| Multi-region | Single-region RTO doesn't meet the SLA |
The full design exists because the brief in §1–§4 requires DAGs, four priorities, hot-tenant isolation, and 100K concurrent. A smaller brief deserves a smaller design. Add each piece only when the load curve forces it.
[7.3] Store selection
| Store | Pick | Why |
|---|---|---|
| Job state | Postgres 17 | Already solves transactions, partitioning, JSONB, and logical replication cleanly. Same stack Oban, Sidekiq, pg_boss, and Temporal use. (SKIP LOCKED is primarily used in the simpler Postgres-only design in §7.2; the full design uses Kafka plus Valkey locks for claiming.) |
| Schedule index | Valkey 8 | Cheap, fast "what's due" index. O(log N) sorted sets, sub-ms latency, Lua for atomic lock+fence. |
| Dispatch bus | Kafka 4.0 (KRaft) | Solves delivery and buffering without coupling scheduler to workers. KRaft removes the ZooKeeper tax. |
| Leader election | Postgres advisory lock (default) · etcd 3.6 (upgrade) | Advisory lock is free if Postgres is already there. etcd is the path to failover decoupled from DB health. |
| Worker autoscale | KEDA 2.15 | Kafka-lag scaler ships with the CRD. Node-level: Karpenter. |
| Analytics | ClickHouse 24 | Columnar, fast aggregations on duration and failure rate. |
Why Kafka sits in the middle. Without Kafka between the scheduler and workers:
- The scheduler has to talk directly to every worker, which tightly couples them.
- Bursts have nowhere to sit. The scheduler either drops or blocks.
- A worker crash forces the scheduler to retry manually.
Kafka solves this in a very boring way:
- it buffers spikes
- it decouples scheduler and workers
- it lets workers replay if they crash
- separate topics give clean priority isolation
Kafka just moves messages. It doesn't decide what's correct.
Why not Postgres for scheduling? The obvious query:
```sql
SELECT id
FROM jobs
WHERE status = 'SCHEDULED'
  AND next_fire_time <= NOW()
LIMIT 1000;
```

It works fine early on and starts hurting later. Three things show up at scale:
- index churn from lots of "due now" rows
- repeated polling competing with worker writes
- scheduler loop getting slower than the tick budget
A Valkey sorted set is simpler for this: O(log N) inserts, fast range queries, and no pressure on Postgres hot tables. If Valkey gets wiped, the ZSET rebuilds on startup from `SELECT … WHERE status='SCHEDULED' AND next_fire_time < NOW + 24h`.
Why Valkey is not the queue. Valkey is great for fast lookups, locks, and short-lived coordination. It is not great as a queue: there are no consumer groups, the replay story after crashes is weak (AOF helps but at a throughput cost), and backpressure is manual. Kafka already solves all of that. So the split is:
- Valkey answers "what's due"
- Kafka answers "who gets it"
[7.4] Build vs buy
There are solid alternatives worth weighing before writing any of this:
- Temporal is great for workflow-as-code and cuts a lot of ops work, at the cost of owning less of the stack.
- Airflow is strong for batch pipelines but not for sub-second dispatch.
- Prefect is nice for Python and data teams, with a weaker story around dispatch-side deduplication.
- River, pg_boss, Oban are Postgres-only queue libraries that work very well at small-to-mid scale.
This design gets built rather than bought because Postgres and Kubernetes are already in place, because DAG + cron + effectively-once need to live in one system (no single OSS project covers all three cleanly), and because the priority-topic topology needs direct control.
For most teams, buying is cheaper.
8. Back-of-the-Envelope
[8.1] Throughput
Avg: 10M / 86,400 ≈ 116 jobs/sec
Peak: 5× avg ≈ 500 jobs/sec
Writes per dispatch: 1 Postgres UPDATE + 1 Kafka produce + 1 Valkey ZREM
Peak write rate: ~1,500 ops/sec
A single Postgres primary with read replicas for status-API traffic handles this well within budget.
[8.2] Worker sizing
Concurrent target: 100K
Avg duration: 30 s
Steady state: 116/sec × 30 s = 3,480 in flight (design floor)
Peak: 500/sec × 30 s = 15,000 in flight
100K target: future + burst headroom
Slot-based: 10 jobs/pod (IO-bound default)
Steady: ~400 pods
Peak: ~2,000 pods (KEDA scale-up)
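The sizing above is Little's law (in-flight = arrival rate × average duration) plus slot math. A quick check of the arithmetic (the post rounds the pod counts up to ~400 / ~2,000 for headroom):

```python
# Sketch: worker sizing via Little's law plus slot-based pod math.
# jobs_per_pod matches the IO-bound default of 10 slots per pod.
def pods_needed(rate_per_sec, avg_duration_s, jobs_per_pod):
    in_flight = rate_per_sec * avg_duration_s   # Little's law
    return in_flight, -(-in_flight // jobs_per_pod)   # ceiling division

steady, steady_pods = pods_needed(116, 30, 10)
peak, peak_pods = pods_needed(500, 30, 10)
assert steady == 3480 and steady_pods == 348    # post rounds to ~400
assert peak == 15000 and peak_pods == 1500      # post rounds to ~2,000
```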
Per-pod concurrency vs Kafka assignment. Each pod runs up to 10 jobs in parallel, but Kafka assigns partitions to consumers, not individual jobs. A pod is one consumer. Implications:
- Number of active consumers is capped by the topic's partition count.
- Per-pod concurrency multiplies throughput within each consumer, but cannot bypass the partition ceiling.
- If partitions < pods, the extra pods sit idle on that topic.
This is a constraint KEDA alone won't solve. Partition sizing is covered in §8.5.
[8.3] Postgres storage
Raw per job row: 500 B
Raw per run row: 300 B (1.2 runs/job avg)
Hot (7 days):
jobs: 70M × 500 B = 35 GB
runs: 84M × 300 B = 25 GB
raw total = 60 GB
+ indexes (~60%) = 96 GB
+ bloat + TOAST (~20%) ≈ 115 GB
Warm (90 days) ≈ 1.4 TB with indexes
Partition monthly via pg_partman. Detach and archive to S3 after 90 days.
Indexes land near 60% of raw because the design carries four targeted indexes:
- `(next_fire_time) WHERE status='SCHEDULED'`
- `(tenant_id, status)`
- `(dag_id)`
- `(idempotency_key)`
Bloat at 20% assumes a routine VACUUM cadence.
[8.4] Valkey memory
Schedule set (next 24h fire times): ~1M × 108 B ≈ 108 MB
Fencing locks (one per running job): 15K × 100 B ≈ 1.5 MB
Heartbeats: 2K × 80 B ≈ 160 KB
Headroom for 10× schedule growth: ≈ 1.1 GB ceiling
A single 4 GB node plus a replica fits with margin. If the scheduled set grows past 2 GB, split by priority into four sorted sets (one per Kafka topic). Same O(log N) lookup, 4× ceiling.
[8.5] Kafka
Peak dispatch rate: 500/sec
Avg message size: 2 KB
Per-topic throughput: ~1 MB/sec
Cluster: 3 brokers, RF=3, min.insync.replicas=2,
4 partitions × 4 priority topics
Capacity isn't the constraint at this scale; the 3-broker minimum exists for replication and rolling restarts.
Why 4 partitions, and when to rethink. Partition count sets the maximum parallelism per consumer group. At 500 jobs/sec peak across four topics (roughly 125/sec per topic) with 10 jobs/pod concurrency, four partitions per topic can drive ~40 active slots, enough headroom for current peaks.
When partition count must be revisited:
- Scaling past 15K concurrent per topic. Worker concurrency of `partitions × jobs_per_pod` must exceed peak in-flight on that priority.
- Consumer lag spikes despite spare pods. Classic symptom of partitions < pods.
- Introducing new priority tiers or tenants. Each new partition-group axis pushes the numerator up.
Rule of thumb: partitions ≥ max concurrent consumers per topic.
For a 100K-concurrent target, that can mean 100–500 partitions per topic, not four. Four is sized for today's peak, not the design ceiling.
One planning caveat: adding partitions later requires a rolling restart of consumer groups. Plan the jump in advance rather than one-at-a-time.
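The rule of thumb as arithmetic: effective concurrency per topic is capped at `min(partitions, pods) × jobs_per_pod`, so pods beyond the partition count buy nothing on that topic:

```python
# Sketch: the partition ceiling. Kafka assigns partitions to consumers
# (one pod = one consumer), so active consumers per topic are capped by
# partition count regardless of how many pods KEDA adds.
def max_active_slots(partitions, pods, jobs_per_pod):
    return min(partitions, pods) * jobs_per_pod

# Today's sizing: 4 partitions cap a topic at 40 slots, even with 50 pods.
assert max_active_slots(partitions=4, pods=50, jobs_per_pod=10) == 40
# Scaling toward the design ceiling means scaling partitions too.
assert max_active_slots(partitions=500, pods=2000, jobs_per_pod=10) == 5000
```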
9. Data Model
Query-first design. Indexes match the three hot paths: due-now lookups, DAG readiness, idempotency dedup.
[9.1] jobs (source of truth)
```sql
CREATE TABLE jobs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  tenant_id UUID NOT NULL,
  name VARCHAR(255) NOT NULL,
  job_type VARCHAR(100) NOT NULL,
  priority SMALLINT NOT NULL DEFAULT 2,            -- 0=crit, 3=low
  status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
  payload JSONB NOT NULL,                          -- ≤256 KB; larger → s3_pointer
  s3_pointer TEXT,
  result JSONB,
  schedule_type VARCHAR(20) NOT NULL,              -- immediate|one_time|recurring
  cron_expression VARCHAR(100),
  cron_timezone VARCHAR(50) DEFAULT 'UTC',
  scheduled_at TIMESTAMPTZ,
  next_fire_time TIMESTAMPTZ,
  max_retries SMALLINT NOT NULL DEFAULT 3,
  retry_backoff VARCHAR(20) DEFAULT 'exponential',
  timeout_seconds INT NOT NULL DEFAULT 300,
  idempotency_key VARCHAR(255),
  dag_id UUID,
  dag_node_id VARCHAR(100),
  tags JSONB DEFAULT '[]'::JSONB,
  webhook_url VARCHAR(2048),
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  CONSTRAINT valid_status CHECK (status IN (
    'PENDING','SCHEDULED','QUEUED','RUNNING',
    'SUCCEEDED','FAILED','CANCELLED','DEAD_LETTER'))
) PARTITION BY RANGE (created_at);

CREATE INDEX idx_jobs_next_fire ON jobs (next_fire_time)
  WHERE status = 'SCHEDULED';
CREATE INDEX idx_jobs_tenant_status ON jobs (tenant_id, status);
CREATE INDEX idx_jobs_dag ON jobs (dag_id) WHERE dag_id IS NOT NULL;
CREATE UNIQUE INDEX idx_jobs_idempotency
  ON jobs (tenant_id, idempotency_key)
  WHERE idempotency_key IS NOT NULL;
```

[9.2] job_runs (one row per attempt)
```sql
CREATE TABLE job_runs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  job_id UUID NOT NULL,
  attempt_number SMALLINT NOT NULL DEFAULT 1,
  status VARCHAR(20) NOT NULL DEFAULT 'RUNNING',
  worker_id VARCHAR(255),
  fencing_token BIGINT NOT NULL,                   -- monotonic
  started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  completed_at TIMESTAMPTZ,
  duration_ms INT,
  exit_code INT,
  error_message TEXT,
  result_payload JSONB,
  log_url VARCHAR(2048),
  UNIQUE (job_id, attempt_number)
) PARTITION BY RANGE (started_at);

CREATE INDEX idx_runs_worker ON job_runs (worker_id) WHERE status='RUNNING';
```

Heartbeats do not live in Postgres (too write-heavy at 100K concurrent). They live in Valkey as `hb:{run_id}` keys with a 30 s TTL.
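A sketch of heartbeat-based failure detection, with a timestamp map standing in for Valkey's TTL expiry (`SET hb:{run_id} alive EX 30`); the check loop and names are illustrative:

```python
# Sketch: heartbeat liveness. Valkey expires hb:{run_id} keys for real;
# here a last-seen map plus a TTL comparison simulates the same signal
# the reaper uses to find dead runs.
HEARTBEAT_TTL = 30   # seconds, matching the EX 30 key TTL

class Heartbeats:
    def __init__(self):
        self.last_seen = {}

    def beat(self, run_id, now):
        self.last_seen[run_id] = now          # SET hb:{run_id} alive EX 30

    def dead_runs(self, now):
        # Runs whose key would have expired: candidates for re-dispatch.
        return sorted(r for r, t in self.last_seen.items()
                      if now - t > HEARTBEAT_TTL)

hb = Heartbeats()
hb.beat("run-1", now=0)
hb.beat("run-2", now=0)
hb.beat("run-2", now=20)                      # run-2 keeps heartbeating
assert hb.dead_runs(now=35) == ["run-1"]
```

A dead run is re-scheduled, and fencing makes sure the zombie's late write, if it ever arrives, loses.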
DAG + schedules tables
```sql
CREATE TABLE dag_definitions (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  tenant_id UUID NOT NULL,
  name VARCHAR(255) NOT NULL,
  is_valid BOOLEAN NOT NULL DEFAULT true,
  UNIQUE (tenant_id, name)
);

CREATE TABLE dag_edges (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  dag_id UUID NOT NULL REFERENCES dag_definitions(id) ON DELETE CASCADE,
  from_node_id VARCHAR(100) NOT NULL,
  to_node_id VARCHAR(100) NOT NULL,
  condition VARCHAR(20) DEFAULT 'on_success',
  UNIQUE (dag_id, from_node_id, to_node_id),
  CHECK (from_node_id <> to_node_id)
);

CREATE INDEX idx_dag_edges_from ON dag_edges (dag_id, from_node_id);

CREATE TABLE dag_runs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  dag_id UUID NOT NULL REFERENCES dag_definitions(id),
  status VARCHAR(20) NOT NULL DEFAULT 'RUNNING',
  started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  completed_at TIMESTAMPTZ
);

CREATE TABLE dag_run_nodes (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  dag_run_id UUID NOT NULL REFERENCES dag_runs(id),
  node_id VARCHAR(100) NOT NULL,
  job_id UUID,
  status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
  pending_parents SMALLINT NOT NULL DEFAULT 0,     -- denormalized, decremented atomically
  UNIQUE (dag_run_id, node_id)
);

CREATE INDEX idx_dag_run_nodes_status ON dag_run_nodes (dag_run_id, status);

CREATE TABLE schedules (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  tenant_id UUID NOT NULL,
  job_template JSONB NOT NULL,
  dag_id UUID REFERENCES dag_definitions(id),
  cron_expression VARCHAR(100) NOT NULL,
  cron_timezone VARCHAR(50) NOT NULL DEFAULT 'UTC',
  next_fire_time TIMESTAMPTZ NOT NULL,
  is_active BOOLEAN NOT NULL DEFAULT true,
  is_paused BOOLEAN NOT NULL DEFAULT false
);

CREATE INDEX idx_schedules_fire ON schedules (next_fire_time)
  WHERE is_active AND NOT is_paused;
```

[9.3] Valkey key patterns
```
schedules:due             ZSET    score = unix_ts(next_fire_time)
lock:job:{job_id}         STRING  + TTL 300 s
fence:job:{job_id}        STRING  (monotonic counter, INCR)
hb:{run_id}               STRING  TTL 30 s (heartbeat)
cancel:{run_id}           STRING  TTL 5 min (cancel request)
priority:aged:{tenant_id} ZSET    score = age_ms (fair queue)
```
[9.4] Job lifecycle
10. API Design
[10.1] Submit a job
```
POST /api/v1/jobs
{
  "name": "daily-revenue-report",
  "job_type": "report_generation",
  "priority": 1,
  "payload": {"date": "2026-03-27"},
  "schedule_type": "one_time",
  "scheduled_at": "2026-03-28T09:00:00Z",
  "max_retries": 3,
  "retry_backoff": "exponential",
  "timeout_seconds": 600,
  "idempotency_key": "revenue-2026-03-27",
  "webhook_url": "https://api.example.com/hooks/job-done"
}
```

`202 Accepted` returns `{id, status: "SCHEDULED", next_fire_time}`.
[10.2] Recurring schedule
```
POST /api/v1/schedules
{
  "name": "hourly-sync",
  "job_template": { ... },
  "cron_expression": "0 * * * *",
  "cron_timezone": "UTC"
}
```
[10.3] DAG create + trigger
DAG creation is cycle-checked before the row is persisted. Invalid graphs are rejected at the API boundary.
```
POST /api/v1/dags              { nodes: [...], edges: [...] }
POST /api/v1/dags/{id}/trigger { parameters: {...}, priority: 1 }
GET  /api/v1/dags/{id}/runs/{run_id}
```
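One way to implement the creation-time cycle check is an iterative three-color DFS over the edge list (a sketch; a real validator would also enforce the depth and width caps from the NFRs):

```python
# Sketch: the cycle check run at DAG-creation time. The API rejects cyclic
# graphs before persisting the dag_definitions row. Iterative three-color
# DFS: a GRAY->GRAY edge is a back edge, i.e. a cycle.
def has_cycle(nodes, edges):
    adj = {n: [] for n in nodes}
    for frm, to in edges:
        adj[frm].append(to)
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {n: WHITE for n in nodes}
    for start in nodes:
        if color[start] != WHITE:
            continue
        color[start] = GRAY
        stack = [(start, iter(adj[start]))]
        while stack:
            node, it = stack[-1]
            nxt = next(it, None)
            if nxt is None:
                color[node] = BLACK          # fully explored
                stack.pop()
            elif color[nxt] == GRAY:
                return True                  # back edge: cycle
            elif color[nxt] == WHITE:
                color[nxt] = GRAY
                stack.append((nxt, iter(adj[nxt])))
    return False

# Diamond (a -> b,c -> d) is a valid DAG; a 3-cycle is rejected.
assert not has_cycle("abcd", [("a","b"), ("a","c"), ("b","d"), ("c","d")])
assert has_cycle("abc", [("a","b"), ("b","c"), ("c","a")])
```

Iterative rather than recursive so a pathological 20-deep, 100-wide graph can't blow the stack.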
[10.4] Ops endpoints
```
POST /api/v1/jobs/{id}/cancel
GET  /api/v1/jobs/{id}
GET  /api/v1/jobs?status=FAILED&tenant_id=...
POST /api/v1/dlq/{id}/replay
```
Full endpoint surface
| Method | Endpoint | Purpose |
|---|---|---|
| POST | /api/v1/jobs | Submit |
| GET | /api/v1/jobs/{id} | Status + runs |
| POST | /api/v1/jobs/{id}/cancel | Cancel |
| GET | /api/v1/jobs | List + filter |
| POST | /api/v1/schedules | Create cron |
| PUT | /api/v1/schedules/{id} | Update cron |
| POST | /api/v1/schedules/{id}/pause | Pause |
| POST | /api/v1/schedules/{id}/resume | Resume |
| DELETE | /api/v1/schedules/{id} | Delete |
| POST | /api/v1/dags | Create DAG (cycle-checked) |
| POST | /api/v1/dags/{id}/trigger | Trigger run |
| GET | /api/v1/dags/{id}/runs/{run_id} | DAG run state |
| GET | /api/v1/dlq | List dead-lettered jobs |
| POST | /api/v1/dlq/{id}/replay | Manual replay |
11. Effectively-Once Execution
Core idea
This design doesn't try to prevent duplicate execution. It makes sure duplicate execution doesn't matter.
- Multiple workers may run the same job under failure.
- Only one result is allowed to stick.
Lock vs fencing (quick intuition)
- Lock decides who is currently running.
- Fencing token decides whose result gets accepted.
Both are needed. The lock stops simultaneous execution most of the time; fencing handles the cases where the lock alone fails (GC pauses, TTL expiry, partition reassignment).
Real-world duplicate scenario
This one happens all the time:
- A worker runs a job successfully.
- It writes the result to Postgres.
- It crashes before committing the Kafka offset.
- Kafka redelivers the message.
- Another worker runs the same job.
So the job ran twice. It's still safe:
- Fencing blocks the duplicate DB write.
- Target-side idempotency blocks the duplicate side effect.
Full recovery timeline in §19.3.
[11.1] Why this isn't exactly-once
Fencing gives exactly-once state update, not exactly-once execution.
Concrete example: a worker calls a payment API, the charge succeeds, the worker crashes before writing the DB row. The retry calls the payment API again. If that API isn't idempotent, the card gets charged twice.
So the split is: DB correctness comes from fencing, and external correctness comes from target-side idempotency.
Kafka adds another layer of duplication on top of all this. Consumers can re-read messages on rebalance, on restart, and on commit failure. Every dedup mechanism below assumes a message may arrive at two workers.
[11.2] Worker flow (order matters)
The worker runs this sequence. Any other order leaves the system inconsistent.
1. Consume Kafka message (no commit yet)
2. Lua: acquire lock + increment fence token → token
If nil, another worker holds it. Skip, commit offset, done.
(The skip commit is safe: whoever holds the lock is authoritative,
so re-reading the message would just repeat the skip.)
3. INSERT job_runs (fencing_token=token, status='RUNNING')
4. UPDATE jobs SET status='RUNNING' WHERE id=$job AND status='QUEUED'
5. Execute the job (HTTP / container / handler)
6. UPDATE job_runs SET status=...
WHERE job_id=$job AND fencing_token=$token AND status='RUNNING'
7. Commit Kafka offset
Why this order?
- Committing the Kafka offset early would lose the message on crash.
- Writing results without a fencing check would let a stale worker overwrite a newer run.
- Skipping the `INSERT` would lose the execution record.
A crash between any two steps is safe: fencing invalidates stale writes, the uncommitted Kafka offset lets a replacement worker re-read, and the next claim bumps the token.
If step 4 affects zero rows, the job moved on (cancelled, or already claimed). Stop, and commit the Kafka offset so the message doesn't get redelivered. The job_runs row from step 3 stays as an abandoned attempt; the next successful claim allocates a higher token and writes a fresh job_runs row. The abandoned one never commits as succeeded.
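The seven steps above can be sketched end to end. This is a minimal in-memory stand-in (plain dicts for Valkey, Postgres, and the Kafka offset; the function names are illustrative, not a real client API), shown only to make the ordering and the conditional writes concrete:

```python
def handle(msg, locks, fence, runs, jobs, execute, commit_offset):
    """One worker handling one Kafka message, in the §11.2 order."""
    job_id = msg["job_id"]
    # Step 2: atomically acquire lock + bump the fence (Lua in the real system).
    if job_id in locks:
        commit_offset()                    # another worker is authoritative; skip
        return
    locks[job_id] = True
    fence[job_id] = fence.get(job_id, 0) + 1
    token = fence[job_id]
    # Step 3: record the attempt before running anything.
    runs[(job_id, token)] = "RUNNING"
    # Step 4: conditional claim; any other state means the job moved on.
    if jobs.get(job_id) != "QUEUED":
        commit_offset()
        return
    jobs[job_id] = "RUNNING"
    execute(msg)                           # step 5: do the work
    # Step 6: commit the result only if our token is still current.
    if fence[job_id] == token and runs[(job_id, token)] == "RUNNING":
        runs[(job_id, token)] = "SUCCEEDED"
        jobs[job_id] = "SUCCEEDED"
    commit_offset()                        # step 7: offset last
```

The real system replaces the dict checks with the Lua script and conditional `UPDATE`s from §11.3; the control flow is the same.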
[11.3] Why a lock alone isn't enough
A classic failure that keeps coming back:
- t=0 s. Worker A acquires `SETNX lock:job:42` with a 300 s TTL and starts running.
- t=180 s. Worker A hits a 200 s GC pause.
- t=300 s. The lock TTL expires while A is still paused.
- t=301 s. Worker B acquires the lock and starts running the same job.
- t=380 s. Worker A wakes up from GC and writes results.
- t=450 s. Worker B writes results over A's.
Both workers believed they held the lock. Both wrote results. Data is corrupted.
Fencing fixes this. One Lua script atomically acquires the lock and increments a monotonic counter. Every result write is conditional on fencing_token = $mine. A sleeping worker wakes up with token 7, but the DB only accepts token 8 now, so its write becomes a no-op. Postgres is the final arbiter.
token = redis.eval("""
if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'EX', 300) then
return redis.call('INCR', KEYS[2])
else
return nil
end
""", keys=["lock:job:42","fence:job:42"], args=[worker_id])
# ... run job ...
rows = db.execute("""
UPDATE job_runs SET status='SUCCEEDED', result_payload=$1, completed_at=NOW()
WHERE job_id=$2 AND fencing_token=$3 AND status='RUNNING'
""", result, job_id, token)
if rows == 0:
    log.warn("fencing token superseded; discarding result")

[11.4] External idempotency
Fencing does nothing for external systems. If a job calls a payment API, an email service, or a webhook, the worker has to send a stable idempotency key.
The rule is simple:
Idempotency-Key = client_key OR hash(job_id)
Never job_id + attempt. That changes every retry and defeats target dedup.
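The key rule is small enough to pin down in code. A minimal sketch (the SHA-256 choice is an assumption; any stable hash of `job_id` works):

```python
import hashlib

def idempotency_key(job_id, client_key=None):
    # Stable across retries: the client's key if supplied, else hash(job_id).
    # Never mix in the attempt number; that changes every retry and
    # defeats target-side dedup.
    return client_key or hashlib.sha256(job_id.encode()).hexdigest()
```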
Two separate layers live under the word "idempotency":
- Submission idempotency (client → API) prevents duplicate jobs. Enforced by `UNIQUE (tenant_id, idempotency_key)`.
- Execution idempotency (worker → target) prevents duplicate side effects. Sent as the `Idempotency-Key` header on the downstream call.
The target must persist the key long enough to cover the full retry window. Stripe keeps keys for 24 h; most mailers for a few hours to a few days. If the scheduler's timeout × max_retries + backoff exceeds the target's retention, a late retry will re-execute the side effect.
Key idea. Fencing protects database state. Idempotency protects external systems.
[11.5] Cancellation
POST /api/v1/jobs/{id}/cancel behaves differently depending on the current state.
| State when cancel arrives | Action |
|---|---|
| PENDING / SCHEDULED | UPDATE status='CANCELLED' and ZREM from Valkey. Done. |
| QUEUED (in Kafka, not yet claimed) | Mark CANCELLED in Postgres. The worker's claim step reads status; if CANCELLED, it commits the Kafka offset and moves on. |
| RUNNING | Write CANCEL_REQUESTED to Valkey key cancel:{run_id} (5 min TTL). Behavior depends on mode: built-in handler polls the key between steps and exits cleanly; container receives SIGTERM from the worker; HTTP cannot be interrupted mid-request and cancellation takes effect after the current call returns. |
The fencing token invalidates late writes: the worker's final conditional UPDATE finds status='CANCELLED' and the row doesn't update. No stale success ever lands.
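The built-in-handler mode from the RUNNING row can be sketched in a few lines. `cancel_requested` stands in for a Valkey GET on `cancel:{run_id}`; the shape is illustrative:

```python
def run_with_cancel(steps, cancel_requested):
    """Poll the cancel flag between steps and exit cleanly if it's set."""
    for step in steps:
        if cancel_requested():
            return "CANCELLED"
        step()
    return "SUCCEEDED"
```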
BIGINT fencing token: 9.2×10¹⁸ values. At 500 dispatches/sec, overflow takes 5×10⁸ years. Not a concern.
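The overflow arithmetic checks out (assuming one `INCR` per dispatch, which is how the fence is bumped):

```python
# Back-of-envelope check on BIGINT fencing-token overflow.
MAX_BIGINT = 2**63 - 1           # Postgres BIGINT upper bound, ~9.2e18
DISPATCH_RATE = 500              # dispatches/sec at peak; one INCR each
SECONDS_PER_YEAR = 3600 * 24 * 365
years_to_overflow = MAX_BIGINT / DISPATCH_RATE / SECONDS_PER_YEAR
# ~5.8e8 years, matching the "5 x 10^8 years" figure above.
```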
[11.6] Where duplicates are handled
Duplicates can arise at three layers. Each is handled differently.
| Layer | What can happen | Who stops it |
|---|---|---|
| Kafka delivery | Same message delivered more than once on rebalance or commit failure | Tolerated (at-least-once is the contract) |
| Worker race | Two workers claim the same job after lock expiry or partition reassignment | Fencing tokens; only the current-token UPDATE commits |
| External side effect | The HTTP target receives the same request twice on retry | Target-side idempotency key |
No single layer gives exactly-once execution. Correctness is the composition of all three.
[11.7] Dispatch is not atomic
As noted in §1.2, dispatch is not atomic across Valkey, Kafka, and Postgres. The risky case:
- Leader produces to Kafka, then crashes before marking the job `QUEUED` in Postgres.
On recovery the job may be consumed by a worker while Postgres still shows SCHEDULED. Two safety nets absorb this:
- Fencing tokens reject any second-worker write that races on the same job.
- The worker's step 4 (`UPDATE jobs SET status='RUNNING' WHERE id=$job AND status='QUEUED'`) is conditional. If the race landed the row in a different state, the `UPDATE` affects zero rows and the worker bails out cleanly.
A brief redundant execution is tolerated. A committed duplicate is not. Recovery timeline for this scenario lives in §19.1.
[11.8] What we guarantee, and what we don't
The design delivers effectively-once execution: correct final state in Postgres and no duplicate side effects at the target, given a correctly implemented idempotency key.
It does not claim:
- Exactly-once execution. A target without idempotency can still receive duplicate calls.
- Global ordering across jobs. Only per-partition order is preserved; DAG-level ordering is the responsibility of DAG edges.
- Immediate execution under heavy backpressure. Layer-3 admission control (§18.1) throttles submissions when Postgres is at risk.
12. Leader Election
One active scheduler, one hot standby. At this scale the simplest correct primitive is Postgres advisory locks: no extra service, same single-writer guarantee. etcd is an upgrade path for multi-region or failover decoupled from Postgres.
What's an advisory lock? A named lock keyed by a 64-bit integer. It doesn't lock a table or a row; it's just a coordination name managed by Postgres. Both callers agree on one integer (here, the constant SCHEDULER_LOCK_KEY = 91823745). Whoever calls pg_try_advisory_lock(SCHEDULER_LOCK_KEY) first wins; the next caller gets false. When the holder's session ends (disconnect, crash, network drop), Postgres auto-releases the lock with no extra cleanup.
Default: Postgres advisory lock. Both scheduler instances poll for pg_try_advisory_lock(SCHEDULER_LOCK_KEY). The winner runs the loop and releases on shutdown. On crash, the session ends and the lock auto-releases. Failover is bounded by the loser's poll interval (e.g., 5 s).
SELECT pg_try_advisory_lock(91823745);
-- Held for the lifetime of the session; releases on disconnect.

There's a tradeoff. The lock lives on the Postgres primary, so a Postgres failover briefly leaves no leader.
That's acceptable: during a Postgres outage the scheduler can't write state anyway. Workers keep draining Kafka and reconcile on recovery.
Upgrade: etcd lease. For multi-region or failover decoupled from Postgres, switch to an etcd lease (TTL = 15 s, tied to NFR-10):
session, _ := concurrency.NewSession(etcd, concurrency.WithTTL(15))
election := concurrency.NewElection(session, "/scheduler/leader")
for {
election.Campaign(ctx, hostname)
leaderCtx, cancel := context.WithCancel(ctx)
go func() { <-session.Done(); cancel() }()
runSchedulerLoop(leaderCtx)
cancel()
}

Redlock and ZooKeeper are both worse fits here. Redlock's safety relies on clock assumptions that break under GC pauses (Kleppmann, 2016). ZooKeeper gives etcd-equivalent guarantees at heavier operational weight, not worth a new deployment in 2026.
Is a single leader a bottleneck? At 500 jobs/sec, no. The leader's per-job work is a Valkey ZRANGEBYSCORE, a Kafka produce, and one Postgres UPDATE. There is no per-job CPU cost worth sharding across schedulers.
The real limit is correctness, not throughput. Two leaders running at once without sharded ownership would dispatch the same job twice. Fencing contains DB damage, but the external side effect still fires twice.
Split-brain cost, quantified. A split-brain window produces duplicate dispatches for the whole duration of the partition. Same-region partitions heal in seconds; cross-region partitions can last minutes, which means thousands of duplicate side effects at peak load. This is why etcd's 15 s lease TTL (or the advisory-lock poll interval) is chosen conservatively: a brief gap with no leader is preferable to a longer gap with two.
Sharded active-active earns its complexity past 5K dispatches/sec (§16). Below that, single-leader is simpler and sufficient.
13. DAG Resolution
[13.1] DAG concepts: node vs job vs run
A DAG is a workflow of dependent steps.
- `dag_id` → workflow definition (blueprint).
- `dag_run_id` → one execution of that workflow.
- `node_id` → logical step (for example `send_email`, `charge_card`).
- `job_id` → actual execution of that step.
Nodes are not jobs. A job is created only when a node becomes ready to run.
DAG execution relies on the same fencing + idempotency model described in §11. Each DAG node becomes a regular job when it's ready, and that job runs through the same execution path as any other.
[13.2] How DAG execution is tracked
Each trigger of a DAG creates a new execution instance identified by dag_run_id. Two tables carry the state.
dag_runs (one row per DAG execution):
| Column | Purpose |
|---|---|
| dag_run_id | Primary key |
| dag_id | Which workflow |
| status | RUNNING, SUCCEEDED, FAILED, PARTIALLY_FAILED |
| started_at / completed_at | Timestamps |
dag_run_nodes (one row per node per DAG run):
| Column | Purpose |
|---|---|
| dag_run_id | Which run |
| node_id | Which logical step |
| job_id | Actual execution instance (NULL until the node is scheduled) |
| status | PENDING, READY, RUNNING, SUCCEEDED, FAILED, SKIPPED |
| pending_parents | How many dependencies still need to complete |
The mapping (dag_run_id, node_id) → job_id is how the scheduler turns a logical step into a runnable job. Each job row also carries dag_id and dag_node_id, so a running job can always trace back to its DAG run.
[13.3] Example: resolving A → B → C
Take a three-node DAG: A → B → C.
At DAG trigger time the scheduler creates three dag_run_nodes rows:
| node | pending_parents | status | job_id |
|---|---|---|---|
| A | 0 | READY | NULL |
| B | 1 | PENDING | NULL |
| C | 1 | PENDING | NULL |
Execution flow:

1. A is `READY`. The scheduler creates `job_101` for A and dispatches it.
2. A completes. The scheduler decrements B's counter:

   ```sql
   UPDATE dag_run_nodes
   SET pending_parents = pending_parents - 1
   WHERE dag_run_id = $1 AND node_id = 'B'
   RETURNING pending_parents;
   ```

   The return value is `0`. B becomes `READY`. `job_102` is created and dispatched.
3. B completes. C's counter goes `1 → 0`. C becomes `READY`. `job_103` is created and dispatched.
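The same mechanics fit in a few lines of simulation. This is an in-memory stand-in for the A → B → C walk-through (the real decrement is an atomic SQL `UPDATE … RETURNING`, not a dict):

```python
edges = {"A": ["B"], "B": ["C"], "C": []}
pending = {"A": 0, "B": 1, "C": 1}      # pending_parents at run creation
dispatched = []

def dispatch(node):
    dispatched.append(node)             # stands in for create job + produce

def on_completed(node):
    for child in edges[node]:
        pending[child] -= 1             # atomic decrement in the real path
        if pending[child] == 0:         # only the 1 -> 0 transition dispatches
            dispatch(child)

dispatch("A")        # A starts READY
on_completed("A")    # B becomes READY
on_completed("B")    # C becomes READY
# dispatched is now ["A", "B", "C"]
```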
[13.4] Multiple runs of the same DAG
The same DAG definition can be triggered many times. Each trigger gets its own dag_run_id, independent job IDs, and independent node state.
| dag_id | dag_run_id | A.job_id | B.job_id | C.job_id |
|---|---|---|---|---|
| etl-pipeline | run_1 | job_101 | job_102 | job_103 |
| etl-pipeline | run_2 | job_201 | job_202 | job_203 |
Same blueprint, independent executions.
[13.5] Mental model
| Concept | Role |
|---|---|
| DAG (dag_id) | Blueprint |
| DAG run (dag_run_id) | One execution of the blueprint |
| Node (node_id) | Logical step within the DAG |
| Job (job_id) | Actual execution instance of a node |
When a node becomes READY, the scheduler creates a job for it and puts it through the normal job lifecycle (SCHEDULED → QUEUED → RUNNING → SUCCEEDED / FAILED). The DAG machinery ends at job creation; everything after is the regular execution path.
Node readiness is monotonic. A node is READY only after every parent reaches a terminal state (SUCCEEDED, FAILED, or SKIPPED). A late-arriving failure on an already-dispatched child does not roll the child back. Rollback of downstream nodes is not supported; pipelines that need it must model compensating jobs explicitly.
Retries are per-node, not per-DAG. A failed node retries in place up to its own max_retries using the normal job retry path. The rest of the DAG is not restarted. Downstream nodes are evaluated only when a node reaches a terminal state (SUCCEEDED, FAILED, SKIPPED).
With the concepts in place, the rest of §13 covers the two execution phases.
Two phases: creation-time (cycle detection) and runtime (readiness check per node completion).
Creation-time. DFS with three-color marking rejects cycles before the DAG is persisted (Appendix A).
Runtime: denormalized parent counter, not a JOIN. Every dag_run_nodes row carries pending_parents, initialized at run creation from the in-memory edge cache. On parent completion, the scheduler issues one atomic decrement per downstream edge:
UPDATE dag_run_nodes
SET pending_parents = pending_parents - 1,
status = CASE WHEN pending_parents - 1 = 0 THEN 'READY' ELSE status END
WHERE dag_run_id = $1 AND node_id = $2
RETURNING pending_parents, status;

When the returned pending_parents = 0, the node is ready. Create the job and dispatch.
Why the counter is race-free. The decrement is a single atomic UPDATE … RETURNING. Two parent completions landing at the same instant still serialize on the row lock. Only the completion that transitions the counter from 1 to 0 sees pending_parents = 0 in its return, so only that caller dispatches the downstream node. No explicit coordination needed.
func onCompleted(dagRunID, nodeID, status string) {
for _, edge := range dagCache.Downstream(dagID, nodeID) {
if !conditionMet(edge.Condition, status) {
markSkipped(dagRunID, edge.ToNodeID); continue
}
remaining := db.QueryRow(decrementSQL, dagRunID, edge.ToNodeID)
if remaining == 0 {
createAndDispatch(dagRunID, edge.ToNodeID)
}
}
checkDagCompletion(dagRunID)
}

A COUNT(*) JOIN across dag_edges and dag_run_nodes would be 500K joined queries/second at 100K concurrent × 5 edges avg. The denormalized counter is one indexed UPDATE … RETURNING per edge, orders of magnitude cheaper. The atomic decrement also removes the read-modify-write race.
Edge conditions:
| Condition | Upstream OK | Upstream Failed |
|---|---|---|
on_success | run | skip |
on_failure | skip | run |
always | run | run |
DAG structure cached in memory. 50K DAGs × avg 5 edges = 250K entries, ~20 MB. Invalidated on /dags write (rare). For very large DAGs (>1K nodes), the cache holds an LRU window; cold DAGs reload from dag_edges on demand.
Cancellation and failure propagation. If a node ends in CANCELLED or FAILED, downstream nodes are evaluated against their edge conditions (on_success skips, on_failure runs, always runs). A skipped node still decrements its own children's pending_parents so the DAG run can terminate cleanly instead of hanging on a node that will never complete.
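The edge-condition table can be written down directly. A minimal sketch; treating `CANCELLED` like a failure for `on_failure` edges is an assumption consistent with the propagation rule above:

```python
def condition_met(condition, upstream_status):
    """Edge-condition rules: on_success / on_failure / always."""
    if condition == "always":
        return True
    if condition == "on_success":
        return upstream_status == "SUCCEEDED"
    if condition == "on_failure":
        return upstream_status in ("FAILED", "CANCELLED")
    raise ValueError(f"unknown edge condition: {condition}")
```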
14. Cron Parsing & Clock Skew
Parse once at schedule creation, store next_fire_time. Don't parse on every tick.
schedule, _ := cron.NewParser(
cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow,
).Parse(expr)
next := schedule.Next(afterTime.In(tz)).UTC()

Recompute next_fire_time before dispatch, not after. The recurring-job loop:
1. Read due row from Valkey
2. Compute next_fire_time from cron expression
3. UPDATE schedules SET next_fire_time = $next
Valkey ZADD schedules:due <next> <id>
4. Produce current instance to Kafka
5. Commit
Computing and persisting the next fire time before producing to Kafka means a crash between step 4 and step 5 leaves the schedule already advanced, so the standby re-reads from Valkey and continues. The other order (produce first, recompute after) loses the schedule on crash. Workers never touch cron.
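The advance-then-produce ordering can be made concrete with in-memory stand-ins. `next_hourly` is a toy replacement for the cron library, and `saved`/`produced` stand in for Postgres+Valkey and Kafka; only the order of the two writes matters:

```python
from datetime import datetime, timedelta

def next_hourly(after):
    # Toy stand-in for the cron library evaluating "0 * * * *".
    return after.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)

def tick(schedule, saved, produced):
    """One pass of steps 2-4: advance the schedule before producing.
    A crash between the two appends loses only the produce, which the
    standby redoes from Valkey."""
    fire_at = schedule["next_fire_time"]
    schedule["next_fire_time"] = next_hourly(fire_at)   # step 2: compute next
    saved.append(dict(schedule))                         # step 3: persist first
    produced.append((schedule["id"], fire_at))           # step 4: then produce
```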
Overlapping runs of a recurring job. If a cron fires while the previous run of the same schedule is still RUNNING, the new run is a separate job with its own job_id. The scheduler does not wait for the previous run to finish. Clients that need mutual exclusion must set an idempotency_key scoped to the fire window (for example the ISO hour).
Valkey sorted set as the due-index. ZRANGEBYSCORE schedules:due -inf NOW LIMIT 1000 returns due schedules in O(log N + M). At 1M entries that's a few ms per tick. The LIMIT 1000 is also the batch dispatch size: up to 1000 due jobs per tick, not one at a time.
DST. The cron library handles spring-forward (2:30 AM doesn't exist → returns 3:00 AM) and fall-back (2:30 AM happens twice → first occurrence). Test cases in Appendix B.
Clock skew. Only the leader evaluates NOW(). The leader runs chrony with makestep 1.0 3 and alerts on drift >50 ms. Cross-region time uses AWS Time Sync (stratum-1 per region).
Exhaustion. The Valkey sorted set only holds the next 24 hours. Schedules further out live in Postgres and enter the ZSET when their fire time reaches the window. A daily warm-up job (itself a cron) runs at 00:30 UTC and ZADDs every schedule whose next_fire_time lands in the next 24 hours and isn't already in the sorted set.
15. Priority Queues & Multi-Tenant Fairness
Four priorities (0 critical → 3 low), four Kafka topics, four worker pools. A single topic causes priority inversion: Kafka has no priority within a partition, so low-priority backlog starves critical work.
Kafka ordering is per-partition, not global. Jobs consumed from different partitions may execute out of order. Execution correctness comes from DAG edges (§13) and fencing tokens (§11), never from Kafka delivery order.
Worker allocation:
| Priority | Pool size | Max wait SLO |
|---|---|---|
| Critical | 50 (dedicated) | 5 s |
| High | 100 | 30 s |
| Normal | 200 | 5 min |
| Low | 50 | 30 min |
Aging without re-producing to Kafka. Re-producing to a higher-priority topic risks duplicate dispatch (the original message may still be consumed). Aging lives upstream of Kafka. Every 60 s:
UPDATE jobs
SET priority = priority - 1
WHERE status = 'SCHEDULED'
AND updated_at < NOW() - INTERVAL '5 minutes'
AND priority > 0;

The next scheduler tick reads from Valkey and produces upgraded jobs to the new priority topic. No duplicate dispatch, no Kafka dedup-key machinery.
Jobs already in Kafka stay in their original topic. The priority change only matters on retry.
[15.1] Hot-tenant / noisy neighbor
One tenant submits 1M low-priority jobs at 09:00. The normal topic fills. Other tenants wait behind it.
The fix is per-tenant fair queueing before Kafka. The scheduler reads from Valkey, not Kafka, when choosing what to produce. A weighted round-robin, capped by each tenant's quota (max 10K concurrent/tenant, max 100 submits/sec/tenant), chooses the next job.
Kafka backstops this at the broker level with per-client-id quotas for each tenant. A misbehaving application layer can't overwhelm the cluster.
For each tick:
due_jobs = ZRANGEBYSCORE(schedules:due, -inf, NOW, LIMIT 1000)
group by tenant_id
for each tenant (round-robin, weighted by priority):
if tenant.inflight < tenant.quota:
produce(kafka, by priority topic)
update inflight
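The round-robin core of that tick can be sketched as follows. This unweighted version ignores priority weights and quota checks for brevity; it only shows how grouping by tenant and interleaving prevents one tenant's backlog from dominating the produce order:

```python
from collections import defaultdict, deque

def fair_order(due_jobs):
    """Interleave due jobs round-robin across tenants.
    due_jobs: list of (tenant_id, job_id) in due-time order."""
    queues = defaultdict(deque)
    tenant_order = []
    for tenant, job in due_jobs:
        if tenant not in queues:
            tenant_order.append(tenant)     # first-seen tenant order
        queues[tenant].append(job)
    out = []
    while any(queues.values()):
        for tenant in tenant_order:         # one job per tenant per pass
            if queues[tenant]:
                out.append(queues[tenant].popleft())
    return out
```

With this ordering, a tenant that submits 1M jobs still yields one slot per pass to every other tenant with due work.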
Per-tenant API rate limits (100 submits/sec, 10K concurrent, 50 DAG triggers/min) are the outermost layer; they stop bursts at the front door before they reach the scheduler. Details in §22.
16. Multi-Region Scheduling
Schedulers do not scale by going active-active across regions. Split brain on lease ownership means double dispatch, and the downstream side effect (card already charged) still happens twice. The right topology is single write region, regional workers.
- Writes (submit, schedule, leader election): US-EAST only.
- Reads (status, run history): regional replicas via logical replication. Lag is under 1 s on healthy links and can grow under sustained write pressure.
- Workers run in every region and consume from the local Kafka mirror. Regional execution keeps job latency local even though scheduling is centralized. Conditional `UPDATE`s still target the primary Postgres, which remains the final arbiter for fencing tokens.
- MirrorMaker is not exactly-once. It can reintroduce duplicates on broker failover. That's fine: fencing tokens in §11 dedupe at the DB layer, so cross-region mirrors count as another at-least-once source.
- Failover: US-EAST loss triggers a manual promotion of the EU-CENTRAL Postgres replica. RPO = logical-rep lag (<1 s typical). Realistic RTO is 5–10 min including human verification.
Why not active-active schedulers across regions? The single-leader-vs-sharded argument is covered in §12. The multi-region version has the same answer and an extra failure mode: network partitions between regions. A partitioned pair of active-active leaders would dispatch duplicates for the duration of the partition, which at regional-WAN latency can be minutes. Sharded-ownership active-active becomes worthwhile past ~5K dispatches/sec, as noted in §12.
17. Bottlenecks
[17.1] Heartbeat write amplification
100K concurrent × 1 heartbeat / 10 s = 10K writes/sec from heartbeats alone. Postgres would melt.
Heartbeats go to Valkey (hb:{run_id} with 30 s TTL), not Postgres. Valkey sustains several hundred thousand ops/sec per node under replication, well above the 10K steady state.
Grace: three missed beats, not one. A 30 s TTL with a 10 s refresh tolerates two missed beats (network blip, container CPU starvation, brief GC). Only after three missed beats does the scheduler mark the run FAILED. This prevents false failovers on flaky networks. Tunable per-job-type via heartbeat_grace_seconds.
[17.2] DAG readiness check fan-out
At 500 completions/sec × ~5 downstream edges avg = ~2,500 readiness checks per second. With the in-memory DAG cache (§13) and the pending_parents counter, each check is one UPDATE … RETURNING at ~0.3 ms. Budget: ~0.75 s of Postgres IO per second. Within capacity.
[17.3] Kafka consumer lag
A 5,000-job burst over 10 s, at 500/sec worker throughput, accumulates lag. KEDA scales pods on lag; scale-up takes ~30 s. Bridge the gap with 20% over-provisioned steady state.
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata: { name: worker-pool-normal }
spec:
  scaleTargetRef: { name: worker-pool-normal }
  minReplicaCount: 200
  maxReplicaCount: 500
  triggers:
  - type: kafka
    metadata:
      bootstrapServers: kafka:9092
      consumerGroup: worker-pool-normal
      topic: job.dispatch.normal
      lagThreshold: "50"

[17.4] Partition-bound parallelism (the hard ceiling KEDA cannot break)
Kafka caps parallelism at the partition level. Each partition is consumed by exactly one consumer inside a consumer group. This is a common mistake when scaling job-queue systems.
- Maximum active consumers per topic = partition count for that topic.
- Per-pod concurrency (10 jobs/pod here) multiplies throughput within each active consumer.
- Extra pods beyond partition count sit idle on that topic.
Example: 4 partitions + 20 pods = 4 active consumers, 16 idle. Adding more pods does nothing for throughput.
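The ceiling is simple arithmetic, worth encoding once. A small sketch (the 10 jobs/pod figure comes from the bullet above):

```python
def effective_parallelism(partitions, pods, per_pod_concurrency=10):
    """Partition-bound ceiling: one active consumer per partition per group.
    Returns (active_consumers, idle_pods, max_jobs_in_flight)."""
    active = min(partitions, pods)
    idle = max(0, pods - partitions)
    return active, idle, active * per_pod_concurrency
```

For the example above: 4 partitions and 20 pods gives 4 active consumers, 16 idle pods, and a throughput ceiling that no amount of extra pods will move.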
Failure mode when partitions are undersized:
- Kafka lag climbs.
- KEDA keeps scaling pods.
- Throughput plateaus.
- Infra cost grows linearly while effective work stays flat.
KEDA is elastic scaling bounded by the partition ceiling. Partition sizing (§8.5) is what sets that ceiling. The two must be tuned together; neither alone is sufficient.
18. Backpressure, Retries, Webhooks
[18.1] Three-layer admission control
KEDA is reactive. Real bursts (Black Friday, outage recovery) can exceed node-level scale-up. Three admission layers, watching three different signals:
- Kafka consumer lag. If normal-topic lag >10K for 60 s, the scheduler stops producing new low-priority jobs. They stay in Valkey marked `SCHEDULED`. High + critical keep flowing.
- Scheduler loop time. If loop time >500 ms for 2 min, the API returns 429 for low-priority submissions. High and critical still accepted. Clients retry.
- Postgres write latency. If `pg_stat_statements` reports `UPDATE jobs` p95 >50 ms for 60 s, the scheduler halves its batch size (LIMIT 500) and the API throttles all submissions to 50% of quota. The DB is the most expensive thing to recover.
Critical and high are never shed at layers 1 and 2 (reserved worker pool). Layer 3 hits everyone equally, because a melted primary impacts all priorities.
[18.2] Retry-storm protection
A downstream outage fails every in-flight job of that job_type simultaneously. Naïve retry hammers the recovering service 60 s later. Three layers:
- Exponential backoff with jitter.
delay = min(2^attempt × base, cap) × random(0.5, 1.5). Jitter prevents synchronized retries. Fixed (delay = base) and linear (delay = base × attempt) modes are available viaretry_backoffon the job row when deterministic spacing is needed. - Per-
job_typecircuit breaker. A rolling 1-minute failure rate >50% pauses new dispatches for that type for 30 s. In-flight jobs aren't affected. Closes when failure rate drops below 20%. - Concurrent-retry cap.
max_concurrent_retries_per_job_type = 100. The 101st waits inSCHEDULEDuntil a slot frees.
Poison pills. Jobs that fail deterministically on every attempt (bad payload, permanent upstream error) exhaust their retry budget and land in DEAD_LETTER. The DLQ exists precisely to stop a poisoned job from consuming retry slots forever. Inspect via GET /api/v1/dlq; manual replay via POST /api/v1/dlq/{id}/replay after fixing the root cause.
[18.3] Webhook durability
A webhook is a job. On main-job completion, the scheduler enqueues a webhook_delivery job with the same retry config (exponential backoff, 4 attempts, then DLQ). No bespoke webhook path with its own retry/DLQ/rate-limit semantics.
19. Failure Scenarios
[19.1] Leader scheduler crashes
Scenario. Active scheduler OOMs at T+0.
| Time | Event |
|---|---|
| T+0 s | Leader dies. In-flight ZRANGEBYSCORE → Kafka loop is lost. |
| T+0–5 s | Advisory-lock session ends on Postgres disconnect. (For etcd mode: KeepAlive stops; scheduling pauses until lease TTL.) |
| T+5 s | Standby's pg_try_advisory_lock poll succeeds. (etcd mode: T+15 s when lease expires.) |
| T+5–7 s | New leader loads DAG cache, reconciles Valkey against Postgres, resumes. |
Dispatch pause: ~7 s with advisory lock, ~17 s with etcd lease. No job loss: the Valkey ZSET retains due jobs, and the new leader re-dispatches. Jobs produced but not ZREM'd are re-produced; fencing tokens block duplicate execution.
RTO is dominated by the loser's poll interval plus warm-up, not by the lock primitive alone.
[19.2] Worker crashes mid-job
Scenario. Worker pod OOM-killed at T+0 with a 10-min job running.
| Time | Event |
|---|---|
| T+0 s | Pod dies. Valkey hb:{run_id} stops being refreshed. |
| T+30 s | hb:* TTL expires. Scheduler marks run FAILED. |
| T+30 s | Retry enqueued (attempt < max). New fencing token allocated. |
| T+30–60 s | New worker claims and starts executing. |
30 s detection + retry start. Partial side effects from the crashed run depend on the job's own target-side idempotency key. Fencing tokens ensure only the new run's results persist.
[19.3] Worker succeeds but fails to commit the Kafka offset
Scenario. Worker finishes the job, writes the `SUCCEEDED` row to Postgres, then crashes before step 7 (commit Kafka offset).
| Time | Event |
|---|---|
| T+0 s | Worker completes execution and commits job_runs.status = 'SUCCEEDED'. |
| T+0 s | Worker crashes before committing the Kafka offset. |
| T+10 s | Kafka consumer-group rebalance delivers the message to another worker. |
| T+10 s | New worker acquires lock, reads Postgres, sees job already terminal, skips execution, commits offset. |
No duplicate commit lands. Fencing tokens would reject it anyway, but the explicit terminal-status check in the worker claim step avoids a needless re-run. If the claim step misses (older client, race), the target-side idempotency key prevents duplicate external side effects. This is the scenario where all three safety layers (lock, fencing, target idempotency) cooperate.
[19.4] Postgres failover
Scenario. Primary unavailable.
| Time | Event |
|---|---|
| T+0 s | Writes fail. API returns 503. |
| T+5–30 s | Patroni / RDS promotes standby. DNS switches. |
| T+30 s | Scheduler reconnects. Buffered worker results are written. |
30 s of degraded submits (clients retry). The scheduler can keep dispatching from Valkey during the outage, just not update status. On recovery, a reconciliation pass resolves Valkey/Postgres divergence (Postgres wins).
[19.5] Valkey failure
Scenario. Primary Valkey node dies.
| Time | Event |
|---|---|
| T+0 s | Scheduler ZRANGEBYSCORE errors. |
| T+5 s | Sentinel/Cluster promotes replica. Writes resume. |
| T+5–60 s | Scheduler rebuilds ZSET from Postgres (status='SCHEDULED' AND next_fire_time < NOW + 24 h). |
5 s dispatch pause, 60 s warm-up. Valkey is derived state; the rebuild path is what lets it be treated as a cache.
[19.6] Kafka broker outage
Scenario. One of 3 brokers down.
| Time | Event |
|---|---|
| T+0 s | ISR drops. Produces keep succeeding at acks=all (2 remaining replicas). |
| T+X min | Broker replaced; replica bootstraps from peers. |
Zero data loss. Producer p99 bumps ~5 ms. No user-visible effect.
[19.7] Region outage (primary)
Scenario. US-EAST fully unavailable.
| Time | Event |
|---|---|
| T+0 s | Submits fail region-wide. Workers in other regions keep draining Kafka mirrors. |
| T+5–10 min | Operator promotes EU-CENTRAL Postgres replica. Scheduler starts as leader in EU. |
| T+10 min | Valkey + Kafka roles promoted in new primary. Full recovery. |
RTO 5–10 min, RPO <1 s. Manual failover by design; automatic DR under ambiguous failure is how data is lost.
20. Operational Playbook
[20.1] Deployment
- Scheduler: 2 replicas (leader + standby), `maxSurge=1 maxUnavailable=0`. Both campaign; only the leader runs the loop.
- Workers: four deployments, one per priority, each with its own KEDA `ScaledObject`.
- Canary: new scheduler image deploys to 1 pod, 15-min observation (dispatch latency, fencing conflicts, failover flaps). Argo Rollouts auto-rollback on any metric >2× baseline.
- Schema migrations: forward-only, online (additive), run as Kubernetes Jobs before app deploy. `pg_partman` handles monthly partitions automatically.
- Kafka topics: pre-created in CI/CD, never auto-created. Partition-count changes require a rolling restart of consumer groups.
[20.2] Metrics and alerts
Design-specific alerts only. Standard infra (pod CPU, network) lives on the platform dashboard.
| Metric | Threshold |
|---|---|
| `scheduler_decide_latency_p99` | >50 ms for 5 min |
| `scheduler_leadership_changes` | >1 in 10 min (flapping) |
| `fencing_token_conflicts` | >0 in 5 min (double dispatch) |
| `kafka_consumer_lag` | >5K for 2 min (by topic) |
| `dead_letter_queue_depth` | >100 jobs |
| `valkey_memory_usage_pct` | >80% |
| `pg_replication_lag_bytes` | >10 MB |
[20.3] Backup and recovery
| Failure | RPO | RTO |
|---|---|---|
| Valkey node | N/A (derived) | <60 s |
| Postgres node | 0 | 2–5 min |
| Single region (primary lost) | <1 s | 5–10 min (manual, human verification) |
| Kafka broker | 0 | <2 min |
| Full restore from snapshot | <1 day | 2 h |
Daily WAL backups to S3. Weekly restore drill on a random node.
[20.4] Capacity planning
Three leading indicators:
- Scheduler loop time. Target <200 ms avg, <500 ms p99. Sustained >500 ms means Valkey or Postgres is the bottleneck.
- Kafka consumer lag per topic. Target <100 avg. Sustained >1K means the worker pool is undersized.
- Postgres WAL rate. Warn at 70% of replica-link capacity.
[20.5] Top 5 alerts (3 AM on-call)
The page-now subset of §20.2, with first-check context.
- Fencing token conflict >0. Something dispatched twice. Check leader flapping and GC pauses on the leader pod.
- Leadership changes >1/10 min. Leader pod unhealthy, or the coordination store (Postgres advisory lock, or etcd on the upgrade path) is flapping. Check scheduler logs and coordination-service status.
- Kafka lag sustained high. KEDA isn't scaling (check HPA events) or the node pool is at capacity (check Karpenter).
- DLQ depth >100. A job type is failing repeatedly. Check the latest deploy; `GET /api/v1/dlq` groups by `job_type`.
- Postgres replication lag >10 MB. Primary is overloaded or replica disk is slow. Consider migrating read traffic.
21. SLOs and Error Budgets
| SLI | SLO | Monthly budget |
|---|---|---|
| Scheduler decide p99 ≤ 50 ms | 99.95% | 21.6 min |
| Submit → worker start (warm) p99 ≤ 1 s | 99.9% | 43.2 min |
| Job durability (once accepted) | 100% | Budget-less; any loss is a postmortem |
| Scheduler availability | 99.99% | 4.3 min |
Secondary SLOs
| SLI | SLO | Budget |
|---|---|---|
| DAG resolution p99 ≤ 100 ms | 99.5% | 3.6 h |
| Cron drift ≤ 1 s | 99.9% | 43.2 min |
| Webhook delivery p99 ≤ 30 s | 99% | 7.2 h |
Burn policy. Normal burn (≤1×/30 d): continue. Fast burn (>2×/7 d): freeze non-critical deploys. Very fast (>4×/1 d): page, freeze all deploys, IR. Exhausted: next sprint is reliability.
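The budget numbers and burn multipliers above are straightforward arithmetic; a quick check against the tables (30-day month assumed):

```go
package main

import "fmt"

// budgetMinutes converts an SLO percentage into a monthly error budget,
// assuming a 30-day month.
func budgetMinutes(slo float64) float64 {
	return 30 * 24 * 60 * (1 - slo)
}

func main() {
	fmt.Printf("%.1f\n", budgetMinutes(0.9995)) // 21.6 min (decide-latency SLO)
	fmt.Printf("%.1f\n", budgetMinutes(0.9999)) // 4.3 min (scheduler availability)
	// "Fast burn >2×/7 d" means consuming more than 2× the pro-rated daily
	// share of the monthly budget, sustained over the 7-day window:
	fmt.Printf("%.2f\n", budgetMinutes(0.9995)*2/30) // 1.44 min/day for the 99.95% SLO
}
```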
Alert tiers.
- Page-now: fencing conflict, leadership flapping, scheduler availability burn.
- Page-business-hours: DLQ depth, Kafka lag, replication lag.
- Ticket-only: capacity warnings, individual job-type failure spikes.
22. Security
- Payload encryption at rest. `pgcrypto` AES-256; key in AWS KMS (prod) or Kubernetes Secret (dev). Workers decrypt at execution.
- Tenant isolation. Every API call is scoped to `tenant_id`. Workers run in per-tenant namespaces with NetworkPolicy restricting egress to the data plane + explicit allowlists per job type.
- RBAC. Three roles: `job:admin` (create/update/delete), `job:operator` (cancel/retry), `job:viewer` (read-only).
- Audit. All lifecycle events flow to an append-only Kafka topic, tiered to S3 after 30 days for long-term retention. Events include actor, IP, action.
- Secret injection. Payloads reference secrets by name (`"db": "secret:prod-creds"`); resolved from Vault or AWS Secrets Manager at execution. Never logged.
- Rate limits per tenant: 100 submits/sec, 10K concurrent, 50 DAG triggers/min. Enforced via Valkey sliding window at the API layer; Kafka broker quotas as the backstop (§15.1).
23. Key Takeaways
Five things worth remembering:
- Postgres + `SKIP LOCKED` is the right core for stateful coordination. KV stores are wrong here.
- "Effectively-once," not "exactly-once." Fencing tokens give state commit; target-side idempotency gives side-effect safety. Both are required.
- Denormalize `pending_parents` on `dag_run_nodes`. A `COUNT(*)` JOIN per completion won't survive 100K concurrent.
- Start with `pg_advisory_lock`. Add etcd only when failover must be decoupled from Postgres.
- Earn each component. Simpler version in §7.2; add Kafka, Valkey, KEDA, multi-region only against a specific limit.
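The `pending_parents` takeaway, sketched: each node carries a counter, a parent's completion decrements its children, and a node dispatches when the counter hits zero. No `COUNT(*)` JOIN per completion. Names here are illustrative:

```go
package main

import "fmt"

// node mirrors a dag_run_nodes row with the denormalized counter.
type node struct {
	id             string
	pendingParents int
}

// onParentComplete is the per-completion work: one decrement per child.
// The SQL analogue is a single UPDATE ... SET pending_parents =
// pending_parents - 1 WHERE id = ANY(children) RETURNING id,
// pending_parents — O(children of this parent), not O(all edges).
func onParentComplete(children []*node) (ready []string) {
	for _, c := range children {
		c.pendingParents--
		if c.pendingParents == 0 {
			ready = append(ready, c.id) // eligible for dispatch
		}
	}
	return ready
}

func main() {
	// A join node waiting on two parents.
	join := &node{id: "join", pendingParents: 2}
	fmt.Println(onParentComplete([]*node{join})) // still waiting on one parent
	fmt.Println(onParentComplete([]*node{join})) // now dispatchable
}
```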
24. Appendix
A. Cycle detection
// Colors as ints: 0 = WHITE (unvisited), 1 = GRAY (on current path), 2 = BLACK (done)
func detectCycle(nodes []string, edges map[string][]string) bool {
color := map[string]int{}
var dfs func(n string) bool
dfs = func(n string) bool {
color[n] = 1
for _, nb := range edges[n] {
if color[nb] == 1 { return true }
if color[nb] == 0 && dfs(nb) { return true }
}
color[n] = 2
return false
}
for _, n := range nodes {
if color[n] == 0 && dfs(n) { return true }
}
return false
}
B. DST test cases
// Spring forward 2026: March 8, 2 AM → 3 AM in America/New_York
// Schedule "30 2 * * *" → next fire at March 9, 02:30 (March 8 2:30 doesn't exist)
// Fall back 2026: November 1, 2 AM EDT → 1 AM EST in America/New_York
// Schedule "30 1 * * *" → 1:30 occurs twice; the first (EDT) occurrence is used
C. Kahn's algorithm (level-ordered topo sort)
func topoLevels(nodes []string, edges map[string][]string) [][]string {
in := map[string]int{}
for _, n := range nodes { in[n] = 0 }
for _, tos := range edges { for _, t := range tos { in[t]++ } }
var levels [][]string
cur := []string{}
for _, n := range nodes { if in[n] == 0 { cur = append(cur, n) } }
for len(cur) > 0 {
levels = append(levels, cur)
var next []string
for _, n := range cur {
for _, d := range edges[n] {
in[d]--
if in[d] == 0 { next = append(next, d) }
}
}
cur = next
}
return levels
}
D. Atomic lock + fence Lua
-- KEYS[1] = lock:job:{id} KEYS[2] = fence:job:{id}
-- ARGV[1] = worker_id ARGV[2] = ttl
if redis.call('SET', KEYS[1], ARGV[1], 'NX', 'EX', ARGV[2]) then
return redis.call('INCR', KEYS[2])
else
return nil
end
Explore the Technologies
| Technology | Role | Learn more |
|---|---|---|
| Postgres 17 | Source of truth | PostgreSQL |
| Valkey 8 | Schedule index, locks, heartbeats | Redis/Valkey |
| Kafka 4.0 | Priority dispatch bus | Kafka |
| etcd 3.6 | Leader lease (upgrade path) | etcd |
| KEDA + Karpenter | Worker autoscaling | Kubernetes architecture |
| ClickHouse 24 | Job analytics | ClickHouse |
Patterns: Message Queues & Event Streaming, Circuit Breakers & Resilience, Auto-scaling, Replication & Consistency, Rate Limiting.
Further reading. Fencing Tokens by Kleppmann · Kahn's algorithm · etcd concurrency · Temporal workflows · Oban (Postgres-backed queue) · River (Go).
Practice this design: Job Scheduler interview question.