Building a Fail-Safe, Scalable Top-K Streaming System
Goal: Compute the most-watched videos across multiple time windows (10 min, daily, monthly, lifetime), serve results in real time, and survive component failures with zero data loss. Scale: 7 billion events per day.
Here's the full architecture, from ingestion to serving, with failure scenarios and the math behind every decision.
1. Problem Statement
A video platform (YouTube / TikTok scale) needs to answer: "What's trending right now?"
Detailed functional requirements are in Section 2. But first, what NOT to build:
- Insert one DB row per watch event (7B rows/day = instant death)
- Compute Top-K with SELECT ... ORDER BY count DESC over raw events
- Use Redis as source of truth (it's volatile memory)
- Count in Redis with ZINCRBY (no replay, no correctness after restart)
- Build a monolith that does ingestion + computation + serving
2. Functional Requirements
| ID | Requirement | Priority |
|---|---|---|
| FR-1 | Top-K videos in the last 10 minutes (sliding window, what's hot right now) | P0 |
| FR-2 | Top-K videos today (daily tumbling window, daily trending) | P0 |
| FR-3 | Top-K videos this month (monthly tumbling window, monthly leaderboard) | P0 |
| FR-4 | Lifetime top videos (hall of fame, running count with no window) | P1 |
| FR-5 | Near-real-time freshness -- results update within seconds, not minutes | P0 |
| FR-6 | No data loss, even during failures of any component | P0 |
| FR-7 | Multi-tenant support -- per-region breakdowns (e.g., Top-K in India, US, EU) | P1 |
| FR-8 | Multi-tenant support -- per-category breakdowns (e.g., Top-K in music, gaming) | P1 |
| FR-9 | Combined dimensional queries (per-region AND per-category Top-K) | P2 |
| FR-10 | Write API for event ingestion (accept video watch events from clients) | P0 |
| FR-11 | Read API for Top-K queries (serve Top-K results with metadata) | P0 |
| FR-12 | Batched event ingestion for SDKs (up to 500 events per batch) | P1 |
| FR-13 | Freshness tracking in API responses (how old the data is) | P1 |
3. Non-Functional Requirements
| Requirement | Target |
|---|---|
| Throughput (average) | 81,000 events/sec (7B daily watches / 86,400s) |
| Throughput (peak, 10x) | ~810,000 events/sec |
| Throughput (viral spike) | ~2,000,000 events/sec (must plan for this) |
| API read latency (p99) | < 10ms |
| Data freshness | < 30 seconds (Top-K results reflect recent activity) |
| Availability | 99.99% (degrade gracefully, never hard-fail) |
| Durability | Zero data loss -- no event dropped even during component failures |
| Horizontal scalability | Scale linearly with traffic; handle 25x spikes without rearchitecture |
| Processing semantics | Exactly-once end-to-end -- every event counted exactly once |
4. High-Level Approach & Technology Selection
4.1 Why Streaming, Not Batch?
7 billion events per day at 81K events/sec average. Users expect "what's trending right now" to update in seconds, not hours. A batch pipeline running every hour shows stale results -- useless for a "hot right now" feature. Running batch every minute at this volume collapses under scheduling overhead and startup latency.
This is a streaming problem. Events never stop arriving, and users expect the trending list to reflect what happened seconds ago, not hours ago.
4.2 What Components Do We Need?
The system does four things: ingest events, count them by time window, pick the top K, and serve the results. Each step has different scaling requirements:
- Durable event log -- Decouple ingestion from processing. If the processing layer falls behind or crashes, events must be buffered and replayable. This is the sole source of truth.
- Stateful stream processor -- Count views per video across multiple time windows (10-min, daily, monthly, lifetime). Maintain state for 50M+ unique videos. Support exactly-once processing to avoid double-counting.
- Fast serving layer -- The read API needs sub-10ms latency. Pre-computed Top-K results served from memory.
- Durable aggregate store -- Fallback when the serving cache is unavailable. Audit trail for historical queries.
4.3 Store Selection
| Store | Role | Why This Choice |
|---|---|---|
| Apache Kafka | Immutable event log, sole source of truth | Durable (Replication Factor RF=3, acks=all), replayable, decouples ingestion from processing. All raw events live here and nowhere else. Everything downstream is derived state. |
| Apache Flink + RocksDB | Stateful stream processing, windowed aggregation, Top-K compute | Exactly-once semantics, event-time windowing, incremental checkpoints to S3. RocksDB handles 50M+ keys without GC pressure. |
| Redis | Hot serving layer for current Top-K results | Sub-millisecond reads via ZSET. Pure cache -- disposable, repopulated by Flink every 30 seconds. NOT source of truth. |
| PostgreSQL | Durable aggregates, API fallback, audit trail | ACID guarantees, UPSERT for idempotency, partitioned by month. Fallback when Redis is unavailable. |
4.4 Why Kafka Is the ONLY Source of Truth
Everything downstream of Kafka is DERIVED STATE. Only Kafka stores EVENTS (facts about what happened).
4.5 Why Flink, Not Spark Streaming
| | Flink | Spark Structured Streaming |
|---|---|---|
| Processing model | True record-at-a-time streaming | Micro-batches (100ms–seconds) |
| Latency | Milliseconds | Seconds (bounded by batch interval) |
| State management | First-class: RocksDB backend, incremental checkpoints, queryable state | Limited: state store is coupled to micro-batch lifecycle |
| Event-time handling | Native watermarks, allowed lateness, side outputs for late data | Supported but less flexible (watermark delay only) |
| Exactly-once | Checkpoint barriers + 2PC sinks (true streaming exactly-once) | Relies on micro-batch atomicity (weaker for external sinks) |
| Backpressure | Built-in credit-based flow control | Relies on batch scheduling (can cause cascading delays) |
For this system: We need millisecond-level processing, 50M keys in managed state with incremental checkpoints, and exactly-once 2PC writes to Postgres. Flink does all of this out of the box. Spark would add latency (micro-batch overhead on every trigger) and make state management painful at 50M keys.
What about Kafka Streams? Kafka Streams is a lightweight library (no cluster needed), great for simple transformations. But it stores state in embedded RocksDB on the application instance itself. No centralized checkpointing to S3, no cross-node state redistribution on rescale. With 50M keys across 128 partitions, rebalancing after a crash means rebuilding state from the Kafka changelog topic (slow). Flink's managed state with incremental S3 checkpoints, 2PC sinks, and independent scaling of TaskManagers makes it the right choice at this scale.
4.6 Why RocksDB for Flink State
| | Without RocksDB (Heap State) | With RocksDB |
|---|---|---|
| Memory | 50M keys x 200 bytes = 10 GB in JVM heap | Small JVM heap (hot keys LRU cache) + 30-50 GB on SSD |
| GC | Multi-second GC pauses | Minimal GC (state lives off-heap) |
| Spikes | OOM on traffic spikes | Handles spikes gracefully |
| Checkpoints | Serialize ALL state (slow, blocks job). 10 GB upload | Incremental checkpoints: only changed SST files. 2-5 GB upload |
| Verdict | DOES NOT WORK at 50M keys | Works. Scales to billions of keys |
Rule: If state is unbounded (grows with distinct keys), use RocksDB. Always.
Tuning matters: Default RocksDB config works for small state. At 30-50 GB, tune block_cache_size (2-4 GB), write_buffer_size (128 MB), and compaction threads to match SSD throughput. Flink exposes these via RocksDBOptionsFactory.
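One way to apply that tuning is through Flink's configuration file rather than a custom RocksDBOptionsFactory. A sketch of a flink-conf.yaml fragment, using the ballpark figures above (key names are Flink's RocksDB state backend options; verify them against your Flink version):

```yaml
state.backend: rocksdb
state.backend.incremental: true                  # incremental checkpoints to S3
state.backend.rocksdb.block.cache-size: 2gb      # block_cache_size
state.backend.rocksdb.writebuffer.size: 128mb    # write_buffer_size
state.backend.rocksdb.thread.num: 4              # background flush/compaction threads
```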
5. High-Level Architecture
5.1 Architecture Diagram
5.2 Responsibility Matrix
| Layer | Component | Responsibility | State | Failure Impact |
|---|---|---|---|---|
| Ingestion | API Gateway | Accept events, validate, produce to Kafka | Stateless | None (LB reroutes) |
| Event Log | Kafka | Immutable source of truth, replay | Durable (RF=3) | Brief pause during leader election |
| Processing | Flink | Windowed aggregation, Top-K compute, exactly-once | RocksDB + S3 checkpoints | Recovers from last checkpoint |
| Cache | Redis | Fast serving of current Top-K | Volatile (cache) | API falls back to Postgres |
| Store | PostgreSQL | Durable aggregates, audit, fallback | Durable (WAL + replicas) | Flink buffers, API degrades |
6. Back-of-the-Envelope Estimation
6.1 Event Scale
Daily watches: 7 billion
Average rate: 7B / 86,400 = ~81,000 events/sec
Peak rate (10x): ~810,000 events/sec
Spike (viral moment): ~2,000,000 events/sec (plan for this)
6.2 Event Size
Single watch event (JSON):
{
"event_id": "uuid", // 36 bytes
"video_id": "vid_abc123", // 20 bytes
"user_id": "usr_xyz789", // 20 bytes
"timestamp": 1707000000000, // 8 bytes
"region": "IN", // 2 bytes
"category": "music", // 10 bytes
"device": "mobile", // 10 bytes
"session_id": "sess_123" // 20 bytes
}
Serialized size (Avro): ~150 bytes
Serialized size (JSON): ~300 bytes
6.3 Kafka Bandwidth
Ingestion rate: 81K events/sec x 150 bytes = ~12 MB/sec average
Peak: 810K x 150 bytes = ~122 MB/sec
Spike: 2M x 150 bytes = ~300 MB/sec
Per broker capacity: ~100 MB/sec writes (conservative)
Brokers needed: 300 / 100 = 3 brokers minimum (spike writes only)
With RF=3: Each write generates 3x network traffic (leader + 2 followers)
Brokers handle both leader writes and follower replication
Practical: 12 brokers (handles replication traffic, rolling restarts, rack diversity across 3 AZs)
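The bandwidth and broker math above is simple enough to check mechanically. A self-contained sketch (same inputs as the text; 100 MB/s per-broker write capacity is the conservative figure from above):

```java
// Back-of-envelope check for the Kafka sizing figures in this section.
public class KafkaSizing {
    // Ingest bandwidth in bytes/sec for a given event rate and serialized size.
    static long bytesPerSec(long eventsPerSec, int eventBytes) {
        return eventsPerSec * eventBytes;
    }

    public static void main(String[] args) {
        int eventBytes = 150;                            // Avro-serialized event
        long avg = bytesPerSec(81_000, eventBytes);      // 12,150,000 B/s ~= 12 MB/s
        long peak = bytesPerSec(810_000, eventBytes);    // ~122 MB/s
        long spike = bytesPerSec(2_000_000, eventBytes); // 300 MB/s
        long perBroker = 100L * 1_000_000;               // conservative 100 MB/s writes
        long minBrokers = (spike + perBroker - 1) / perBroker; // ceiling division -> 3
        System.out.printf("avg=%d B/s peak=%d B/s spike=%d B/s brokers>=%d%n",
                avg, peak, spike, minBrokers);
    }
}
```

The 12-broker practical figure then comes from layering RF=3 replication traffic, rolling restarts, and rack diversity on top of this minimum.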
6.4 Kafka Storage
Daily raw volume: 7B x 150 bytes = ~1 TB/day (uncompressed)
With LZ4: ~400 GB/day
With RF=3: ~1.2 TB/day on cluster
Retention 7 days: 1.2 TB x 7 = ~8.4 TB total Kafka storage
Retention 30 days: ~36 TB (if keeping for reprocessing)
6.5 Flink State Size
Unique videos (active in any window): ~50 million
Per-video state (counters for 4 windows): ~200 bytes per video
Total keyed state: 50M x 200 = ~10 GB
Top-K heaps (K=100, 4 windows): 4 x 100 x 50 bytes = ~20 KB
(negligible)
RocksDB on-disk (with compaction): ~30-50 GB
Checkpoint size (incremental): ~2-5 GB per checkpoint
6.6 Redis Memory
Top-K ZSET entries:
4 windows x 100 entries x ~80 bytes per entry = ~32 KB total
(Top-K result sets are tiny)
If we also cache per-video counts for the API:
50M videos x (key + score) = 50M x 50 bytes = ~2.5 GB
Redis requirement: 4-8 GB (with overhead)
6.7 Postgres Storage
Aggregated rows (not raw events):
Per 10-min window: 50M videos x 144 windows/day = way too many
Better: Store only Top-1000 per window emission
Emissions per day: 24 x 60 x 2 = 2,880 (every 30s)
Rows per emission: ~1,000 (top videos)
Daily rows: 2,880 x 1,000 = ~2.88M rows x ~100 bytes = ~288 MB/day
Monthly: ~8.6 GB
For daily/monthly/lifetime aggregates:
Much smaller: ~50M rows for lifetime, partitioned by month
6.8 Summary
| Resource | Estimate |
|---|---|
| Kafka brokers | 12 (RF=3, 7-day retention) |
| Kafka storage | ~8.4 TB |
| Flink TaskManagers | 20-40 (depends on parallelism) |
| Flink state (RocksDB) | 30-50 GB |
| Redis memory | 4-8 GB |
| Postgres storage | ~100 GB/year (aggregates only) |
| Network bandwidth (peak) | ~300 MB/sec ingest |
This is a medium-scale system. The hard part isn't throughput. It's correctness.
6.9 Cost Estimate (AWS, On-Demand Pricing)
People design systems but never price them. Here's what this actually costs:
| Component | Spec | Monthly Cost |
|---|---|---|
| Kafka brokers | 12x i3.xlarge (4 vCPU, 30.5 GB, 950 GB NVMe) | ~$3,600 |
| Flink TaskManagers | 32x m5.2xlarge (8 vCPU, 32 GB) | ~$7,400 |
| Flink JobManager (HA) | 2x m5.xlarge | ~$280 |
| Redis Cluster | 3x cache.r6g.xlarge (26 GB) | ~$1,100 |
| PostgreSQL | 1x db.r6g.2xlarge primary + 2 read replicas | ~$2,400 |
| S3 (checkpoints + tiered Kafka) | ~20 TB stored, moderate request volume | ~$500 |
| Networking (cross-AZ) | Kafka replication + Flink-to-sinks | ~$800 |
| Total | ~$16,000/mo |
With reserved instances (1-year commit), this drops to roughly $10,000-11,000/mo. With spot instances for Flink TaskManagers (checkpoints make this safe since state survives preemption), closer to $8,000-9,000/mo.
At ~$16K/mo for 7 billion events per day, that's reasonable. The Redis cluster alone costs less than a senior engineer's daily rate. The entire pipeline is cheaper than one additional headcount.
7. Data Model
7.1 Kafka Avro Schema
{
"type": "record",
"name": "VideoWatchEvent",
"namespace": "com.platform.events",
"fields": [
{"name": "event_id", "type": "string"},
{"name": "video_id", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "event_timestamp", "type": "long", "logicalType": "timestamp-millis"},
{"name": "server_timestamp", "type": "long", "logicalType": "timestamp-millis"},
{"name": "region", "type": "string"},
{"name": "category", "type": "string"},
{"name": "device", "type": "string"},
{"name": "watch_duration_sec", "type": "int"},
{"name": "session_id", "type": "string"}
]
}
Two timestamps:
- event_timestamp: When the user actually watched (client clock -- can be skewed)
- server_timestamp: When the API server received it (reliable, used for watermarks)
7.2 Kafka Topic Design
Topic: video.watches
Partitions: 128
Replication: RF=3, min.insync.replicas=2
Acks: all (no data loss)
Retention: 7 days (hot), 30 days (tiered to S3)
Compression: LZ4 (best speed/ratio for this workload)
Serialization: Avro with Schema Registry
Partition key: video_id
Why partition by video_id:
All watches for the same video land on the same partition. This guarantees ordering per video and enables Flink to keyBy(video_id) without repartitioning (co-partitioning).
Why Avro with Schema Registry: Avro is ~50% smaller than JSON (150 bytes vs 300 bytes). At 81K events/sec, that's 12 MB/sec vs 24 MB/sec into Kafka. Schema Registry enforces backward/forward compatibility so producers and Flink consumers can evolve the schema independently without breaking the pipeline. Why not Protobuf? Both work. Avro has tighter Kafka ecosystem integration (Confluent Schema Registry, native Flink support) and schema-on-read flexibility. Protobuf would work equally well if the team already uses it.
Schema evolution in practice:
What happens when we need to change the event schema? Say we want to add a content_type field or remove session_id.
| Change | Compatible? | What Happens |
|---|---|---|
| Add field with default value | Yes (backward compatible) | Old consumers ignore it. New consumers read the default for old events |
| Remove field that had a default | Yes (forward compatible) | Old consumers use the default. New consumers skip it |
| Rename a field | No | Avro matches by field name. Renamed = new field + removed field |
| Change field type (int to long) | No | Binary incompatible. Schema Registry rejects the registration |
For non-compatible changes, we have two options: create a new topic (video.watches.v2) and run a migration job that reads from v1, transforms, and writes to v2. Or use Schema Registry's FULL_TRANSITIVE compatibility mode from the start, which catches breaking changes before they reach production. The Flink job needs a savepoint-based redeployment to pick up any new deserializer, even for backward-compatible changes.
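For example, the backward-compatible "add a field with a default" change from the table would look like this in the schema's fields list (content_type is the hypothetical new field from the scenario above):

```json
{"name": "content_type", "type": "string", "default": "video"}
```

Old consumers never see the field; new consumers reading old events get "video".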
Why 128 partitions: Kafka partitions can't be reduced and adding them triggers expensive data rebalancing, so we pre-provision with headroom. We start with 32 Flink task slots and want room to scale 4x without repartitioning → 128. The per-partition math checks out at every traffic tier:
| Traffic | Events/sec | Per partition (÷128) | Comfortable? |
|---|---|---|---|
| Average | 81K | ~630/sec | Trivial |
| Peak (10x) | 810K | ~6,300/sec | Yes |
| Viral spike (25x) | 2M | ~15,600/sec | Yes (single thread handles ~50K/sec) |
7.3 Kafka Producer Configuration (API Gateway)
acks=all # wait for all replicas
retries=2147483647 # infinite retries (with backoff)
max.in.flight.requests.per.connection=5 # allow pipelining with idempotence
enable.idempotence=true # exactly-once producer
compression.type=lz4
batch.size=65536 # 64 KB batches
linger.ms=5 # wait up to 5ms to fill batch
buffer.memory=67108864 # 64 MB buffer

enable.idempotence=true prevents duplicate writes to Kafka even if the producer retries. Combined with acks=all, this gives us exactly-once at the Kafka layer.
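In gateway code these settings map onto standard kafka-clients producer properties. A sketch of just the configuration object (the key names are the standard Kafka producer keys; no broker is contacted here, and the surrounding gateway code is omitted):

```java
import java.util.Properties;

// Builds the API gateway's Kafka producer configuration from Section 7.3.
public class ProducerConfigSketch {
    static Properties gatewayProducerProps() {
        Properties p = new Properties();
        p.setProperty("acks", "all");                                  // wait for all ISR replicas
        p.setProperty("retries", String.valueOf(Integer.MAX_VALUE));   // effectively infinite
        p.setProperty("max.in.flight.requests.per.connection", "5");   // pipelining, safe with idempotence
        p.setProperty("enable.idempotence", "true");                   // dedup on producer retry
        p.setProperty("compression.type", "lz4");
        p.setProperty("batch.size", "65536");                          // 64 KB batches
        p.setProperty("linger.ms", "5");                               // wait up to 5ms to fill a batch
        p.setProperty("buffer.memory", "67108864");                    // 64 MB buffer
        return p;
    }

    public static void main(String[] args) {
        System.out.println(gatewayProducerProps());
    }
}
```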
7.4 Redis Data Structures
Sorted Sets (ZSET) -- one per window type:
Key pattern: video:topk:{window}:current
Example keys: video:topk:10min:current
video:topk:daily:current
video:topk:monthly:current
video:topk:lifetime:current
7.5 Redis Write Pattern (From Flink)
Flink emits a Top-K result and writes to Redis as a single pipeline:
# Atomic update using MULTI/EXEC (transaction)
MULTI
DEL video:topk:10min:current
ZADD video:topk:10min:current 1523 vid_abc
ZADD video:topk:10min:current 1401 vid_def
ZADD video:topk:10min:current 890 vid_ghi
... (100 entries)
EXPIRE video:topk:10min:current 900 # 15-min TTL (generous)
EXEC
7.6 Why ZADD (Overwrite), Never ZINCRBY (Increment)
Here's what matters: Flink computes the total view count internally (in RocksDB state) and writes the absolute value to Redis. The 1523 in ZADD video:topk:10min:current 1523 vid_abc means "vid_abc has exactly 1,523 views in this window." It's a complete snapshot of Flink's state, not a delta. Redis never counts anything. It just stores whatever Flink last told it.
This is why ZADD works and ZINCRBY doesn't:
ZINCRBY approach (BAD):
ZINCRBY video:topk:10min:current 1 vid_abc
This tells Redis: "add 1 to vid_abc's score"
Now Redis is the counter. Problems:
- Flink retry → incremented AGAIN (double-count)
- Redis restart → counts start from 0 (wrong)
- Flink checkpoint restore → old events replayed → counts too high
- Redis becomes source of truth → violates our architecture
ZADD approach (GOOD):
ZADD video:topk:10min:current 1523 vid_abc
This tells Redis: "vid_abc's score IS 1523"
Flink is the counter. Redis is a display board. Benefits:
- Idempotent: writing 1523 twice = still 1523
- Flink retry: same absolute count written → no corruption
- Redis restart: starts empty, Flink re-emits in ≤30s → repopulated
- Redis is a pure cache, disposable, not source of truth
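The difference is easy to demonstrate with a toy scoreboard, where a plain map stands in for the Redis ZSET: replaying an overwrite is harmless, replaying an increment corrupts the count.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the two Redis write patterns; the map stands in for the ZSET.
public class WritePatterns {
    // ZADD semantics: the score IS the value (idempotent overwrite).
    static void zadd(Map<String, Long> zset, String member, long score) {
        zset.put(member, score);
    }

    // ZINCRBY semantics: score += delta (NOT idempotent).
    static void zincrby(Map<String, Long> zset, String member, long delta) {
        zset.merge(member, delta, Long::sum);
    }

    public static void main(String[] args) {
        Map<String, Long> overwrite = new HashMap<>();
        Map<String, Long> increment = new HashMap<>();

        // Flink emits count=1523, then retries the same emission after a failure.
        zadd(overwrite, "vid_abc", 1523);
        zadd(overwrite, "vid_abc", 1523);    // replay: still 1523

        zincrby(increment, "vid_abc", 1523);
        zincrby(increment, "vid_abc", 1523); // replay: double-counted to 3046

        System.out.println(overwrite.get("vid_abc") + " vs " + increment.get("vid_abc")); // 1523 vs 3046
    }
}
```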
7.6.1 Why Redis Persistence Is Disabled
Since Flink re-emits the complete Top-K snapshot every 30 seconds, Redis persistence (both RDB and AOF) is disabled:
save "" # Disable RDB snapshots
appendonly no # Disable AOF log
Why disable RDB: RDB snapshots fork the Redis process, momentarily doubling memory usage. For a cache that's fully repopulated every 30 seconds, this I/O and memory spike provides no value.
Why disable AOF: AOF logs every write to disk for durability. But durability is pointless when the data is derived and rebuilt every 30 seconds from Flink's state (which itself is checkpointed to S3).
Recovery behavior on restart is covered in Section 11.1 (Redis Failure).
7.7 Redis Read Pattern (From API)
# Get top 100 videos for 10-minute window, highest first
ZREVRANGE video:topk:10min:current 0 99 WITHSCORES
# Returns:
# 1) "vid_abc" 2) "1523"
# 3) "vid_def" 4) "1401"
# 5) "vid_ghi" 6) "890"
# ...
Time complexity: ZREVRANGE is O(log N + M), where N is the size of the sorted set and M is the number of elements returned. With 100 entries, this is effectively instant.
7.8 Optional: Per-Video Count Cache
For APIs that need "how many views does video X have right now?":
# Written by Flink alongside Top-K:
SET video:views:10min:vid_abc 1523 EX 900
SET video:views:daily:vid_abc 45210 EX 86400
7.9 Redis Topology
Redis Cluster: 3 masters + 3 replicas
(way more than we need for 4-8 GB, but HA requires it)
Read path: API -> Redis replica (read replicas for scaling reads)
Write path: Flink -> Redis master
If a master fails: automatic failover to replica in < 15 seconds
7.10 PostgreSQL Schema
-- Top-K results per window emission
CREATE TABLE topk_results (
window_type TEXT NOT NULL, -- '10min', 'daily', 'monthly', 'lifetime'
window_start TIMESTAMPTZ NOT NULL, -- left edge of the window
window_end TIMESTAMPTZ NOT NULL, -- right edge of the window
emitted_at TIMESTAMPTZ NOT NULL, -- when Flink emitted this snapshot
video_id TEXT NOT NULL,
rank SMALLINT NOT NULL,
view_count BIGINT NOT NULL,
PRIMARY KEY (window_type, window_end, emitted_at, video_id)
) PARTITION BY RANGE (emitted_at);
-- Monthly partitions for automatic management
CREATE TABLE topk_results_2026_02 PARTITION OF topk_results
FOR VALUES FROM ('2026-02-01') TO ('2026-03-01');
-- Index for API fallback queries
CREATE INDEX idx_topk_latest ON topk_results (window_type, emitted_at DESC);
-- Per-video aggregated views (for individual video lookups)
CREATE TABLE video_view_aggregates (
video_id TEXT NOT NULL,
window_type TEXT NOT NULL,
window_start TIMESTAMPTZ NOT NULL,
view_count BIGINT NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (video_id, window_type, window_start)
) PARTITION BY RANGE (window_start);
-- Lightweight materialized view for latest Top-K (API fallback)
CREATE MATERIALIZED VIEW topk_latest AS
SELECT DISTINCT ON (window_type, video_id)
window_type, window_start, window_end, video_id, rank, view_count, emitted_at
FROM topk_results
WHERE emitted_at > NOW() - INTERVAL '5 minutes'
ORDER BY window_type, video_id, emitted_at DESC;
How to read a row: Each row says "at emitted_at, Flink computed the Top-K for the window [window_start, window_end], and video_id was ranked #rank with view_count views."
Example data, what the table actually looks like:
window_type | window_start | window_end | emitted_at | video_id | rank | view_count
------------|--------------------------|--------------------------|--------------------------|----------|------|----------
10min | 2026-02-04T11:50:30Z | 2026-02-04T12:00:30Z | 2026-02-04T12:00:30Z | vid_abc | 1 | 1523
10min | 2026-02-04T11:50:30Z | 2026-02-04T12:00:30Z | 2026-02-04T12:00:30Z | vid_def | 2 | 1401
10min | 2026-02-04T11:51:00Z | 2026-02-04T12:01:00Z | 2026-02-04T12:01:00Z | vid_abc | 1 | 1540
daily | 2026-02-04T00:00:00Z | 2026-02-05T00:00:00Z | 2026-02-04T12:00:30Z | vid_abc | 1 | 45210
Notice:
- 10min rows: window_end - window_start = 10 minutes, always. The window slides every 30 seconds, so consecutive emissions have windows that overlap by 9 min 30 s.
- daily rows: window_start is midnight, window_end is the next midnight. Multiple emissions share the same window boundaries; emitted_at distinguishes them (latest = freshest).
- emitted_at ≈ window_end for sliding windows (Flink emits when the window fires). For daily/monthly tumbling windows, emitted_at advances every 30s while window_start/window_end stay fixed for the day.
7.11 PostgreSQL Write Pattern (From Flink)
-- UPSERT: idempotent, safe for retries
INSERT INTO topk_results (window_type, window_start, window_end, emitted_at, video_id, rank, view_count)
VALUES ('10min', '2026-02-04T11:50:30Z', '2026-02-04T12:00:30Z', '2026-02-04T12:00:30Z', 'vid_abc', 1, 1523)
ON CONFLICT (window_type, window_end, emitted_at, video_id)
DO UPDATE SET rank = EXCLUDED.rank, view_count = EXCLUDED.view_count;
Batch writes: Flink batches 100 rows per window emission into a single JDBC batch. At 4 windows x 100 videos x 1 emission/30s = ~800 UPSERTs/minute. Trivial load for Postgres.
7.12 PostgreSQL Read Pattern (API Fallback)
-- Get latest Top-K for a window (when Redis is down)
SELECT video_id, rank, view_count, window_start, window_end
FROM topk_results
WHERE window_type = '10min'
AND emitted_at = (
SELECT MAX(emitted_at) FROM topk_results WHERE window_type = '10min'
)
ORDER BY rank ASC
LIMIT 100;
Execution time: ~5-10ms with index. Acceptable for fallback.
7.13 PostgreSQL Retention & Cleanup
-- Cron: archive first, then trim (run both in one transaction so the cutoff is consistent)
INSERT INTO topk_results_archive
SELECT * FROM topk_results WHERE emitted_at < NOW() - INTERVAL '7 days';
-- Keep only 7 days of fine-grained Top-K snapshots in the hot table
DELETE FROM topk_results WHERE emitted_at < NOW() - INTERVAL '7 days';
8. API Design
8.1 Write API (Event Ingestion)
POST /v1/events/video-watch
Content-Type: application/json
X-Idempotency-Key: evt_550e8400-e29b-41d4
{
"video_id": "vid_abc123",
"user_id": "usr_xyz789",
"timestamp": 1707000000000,
"region": "IN",
"category": "music",
"device": "mobile",
"watch_duration_sec": 45,
"session_id": "sess_123"
}
Response (202 Accepted):
{
"status": "ACCEPTED",
"event_id": "evt_550e8400-e29b-41d4"
}
Why 202, not 201: The event is accepted into Kafka but not yet processed. The client doesn't need to wait for Flink to compute Top-K.
Batched ingestion for SDKs:
POST /v1/events/video-watch/batch
{
"events": [ {...}, {...}, ... ] // up to 500 per batch
}
8.2 Read API (Top-K Queries)
GET /v1/top-videos?window=10min&k=100&region=IN&category=music
Response (200 OK):
{
"window": "10min",
"window_start": "2026-02-04T12:00:00Z",
"window_end": "2026-02-04T12:10:00Z",
"freshness_ms": 1200, // how old this data is
"source": "redis", // or "postgres" if fallback
"videos": [
{"rank": 1, "video_id": "vid_abc", "views": 1523, "delta": "+312"},
{"rank": 2, "video_id": "vid_def", "views": 1401, "delta": "+108"},
...
]
}
GET /v1/top-videos?window=today&k=50
GET /v1/top-videos?window=month&k=100
GET /v1/top-videos?window=lifetime&k=100
8.3 Internal API (Flink to Redis + Postgres)
This is not user-facing. Flink writes directly:
Flink -> Redis: ZADD commands (bulk pipeline)
Flink -> Postgres: JDBC batch UPSERT
No REST API between Flink and storage. Direct client connections. No serialization overhead, no extra hop.
9. Deep Dives
9.1 Flink Job Topology & Window Design
Job Topology
Checkpoints: every 60s, incremental, to S3 | State backend: RocksDB | Parallelism: 128
Scaling parallelism: We start at 32 and can scale up to 128 without repartitioning Kafka (128 partitions provisioned as headroom). Scaling requires a job restart from a savepoint -- it isn't automatic -- but downtime is only 30-60 seconds: Flink restores state from the S3 checkpoint and resumes from the checkpointed Kafka offsets.
Base Stream
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
env.getCheckpointConfig().setCheckpointTimeout(120_000);
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // incremental
// Kafka source with exactly-once
KafkaSource<WatchEvent> source = KafkaSource.<WatchEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("video.watches")
.setGroupId("flink-topk-job")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setDeserializer(new WatchEventAvroDeserializer())
.build();
DataStream<WatchEvent> base = env
.fromSource(source, WatermarkStrategy
.<WatchEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
.withTimestampAssigner((event, ts) -> event.getServerTimestamp())
.withIdleness(Duration.ofMinutes(1)),
"kafka-source")
.uid("kafka-source")
.name("Kafka Watch Events");
Window Definitions
// Dedup + keyBy
KeyedStream<WatchEvent, String> keyed = base // KeyedStream so .window(...) compiles below
.keyBy(e -> e.getEventId())
.process(new DeduplicationFunction()) // TTL-based dedup
.uid("dedup")
.keyBy(e -> e.getVideoId());
// 10-minute sliding window (emits every 30 seconds)
DataStream<VideoCount> tenMinCounts = keyed
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(30)))
.aggregate(new CountAggregate(), new VideoCountWindowFunction("10min"))
.uid("window-10min");
// Daily tumbling window with continuous trigger
DataStream<VideoCount> dailyCounts = keyed
.window(TumblingEventTimeWindows.of(Time.days(1)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
.aggregate(new CountAggregate(), new VideoCountWindowFunction("daily"))
.uid("window-daily");
// Monthly tumbling window with continuous trigger
DataStream<VideoCount> monthlyCounts = keyed
.window(TumblingEventTimeWindows.of(Time.days(30))) // Simplified: production uses a custom assigner aligned to calendar months
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(60)))
.aggregate(new CountAggregate(), new VideoCountWindowFunction("monthly"))
.uid("window-monthly");
// Lifetime (no window -- just keyed state)
DataStream<VideoCount> lifetimeCounts = keyed
.process(new LifetimeCountFunction())
.uid("lifetime-count");
9.2 Top-K Algorithm Deep Dive
The Problem
We have 50M unique videos. We need the top 100 by view count within each window. We can't sort 50M entries every 30 seconds.
Approach: Min-Heap of Size K
For each window, maintain a min-heap of size K (e.g., 100). Root = smallest count in the heap.
Algorithm for every (video_id, count) update:
if heap.size < K:
heap.add(video_id, count)
else if count > heap.peekMin():
heap.removeMin()
heap.add(video_id, count)
else:
discard (not in top-K)
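The same loop in plain Java, with a PriorityQueue as the min-heap. This is a standalone sketch of the algorithm, independent of the Flink operator shown later in this section:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Select the top K entries from per-video counts in one O(N log K) pass.
public class TopK {
    record VideoCount(String videoId, long count) {}

    static List<VideoCount> topK(Map<String, Long> counts, int k) {
        PriorityQueue<VideoCount> heap =
                new PriorityQueue<>(Comparator.comparingLong(VideoCount::count)); // min-heap: root = smallest
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            if (heap.size() < k) {
                heap.offer(new VideoCount(e.getKey(), e.getValue()));
            } else if (e.getValue() > heap.peek().count()) {
                heap.poll();                                          // evict current minimum
                heap.offer(new VideoCount(e.getKey(), e.getValue()));
            }                                                         // else: not in top-K, discard
        }
        List<VideoCount> result = new ArrayList<>(heap);
        result.sort(Comparator.comparingLong(VideoCount::count).reversed()); // highest first
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = Map.of("vid_abc", 1523L, "vid_def", 1401L,
                "vid_ghi", 890L, "vid_xyz", 12L);
        System.out.println(topK(counts, 3)); // vid_abc, vid_def, vid_ghi, in that order
    }
}
```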
Heap vs. Redis: The min-heap runs inside Flink to select the top K from 50M videos. Once selected, Flink publishes those K results to Redis via ZADD. The Redis sorted set is just a display board: it never runs the heap algorithm, it stores whatever Flink last computed.
| Metric | Min-Heap | Sorting all videos |
|---|---|---|
| Time per update | O(log K); log2(100) ≈ 7 comparisons | O(N log N) ≈ 50M x 26 comparisons per trigger |
| Space | O(K) = 100 entries = ~5 KB | O(N) = 50M entries |
Exact Counts, Not Approximate
For our use case, we're already maintaining per-video counts in Flink's keyed state (each video has its own counter in the window). The Top-K heap is a secondary structure that just picks the top K from those exact counts.
Per-video state (in RocksDB):
vid_abc -> 1523 views (10min window)
vid_def -> 1401 views (10min window)
vid_ghi -> 890 views (10min window)
... (50M entries)
Top-K Heap (in memory, rebuilt on trigger):
[vid_abc: 1523, vid_def: 1401, ..., vid_xyz: 200]
(only 100 entries -- trivially fits in memory)
When to Use Approximate Algorithms
If the key space were truly massive (e.g., top IP addresses from billions of unique IPs) and we couldn't afford per-key state, we'd use:
- Count-Min Sketch: Approximate frequency estimation. O(1) per update, O(width x depth) space. Over-counts, never under-counts.
- Space-Saving Algorithm: Maintains approximate top-K with bounded error. Replaces least frequent element when new element arrives.
- HeavyKeeper: Combines min-heap with Count-Min Sketch for better accuracy on top-K specifically.
Note: Flink doesn't ship these as built-in operators. They're implemented as custom stateful functions using KeyedProcessFunction + RocksDB-backed MapState/ValueState, the same way the min-heap implementation below is written.
For our video use case: Exact counts with a min-heap are feasible and preferred (50M keys x 200 bytes = 10 GB in RocksDB -- no problem). Use approximate algorithms only if the distinct key space exceeds hundreds of millions and we can't afford the state.
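Of the three, Space-Saving is the simplest to sketch. A hypothetical standalone version (not a Flink operator; a production version would replace the linear min-scan with a linked "stream-summary" structure):

```java
import java.util.HashMap;
import java.util.Map;

// Space-Saving: track at most `capacity` counters. When a new key arrives and
// the table is full, evict the minimum counter and inherit its count + 1.
// Tracked counts are overestimates, by at most the evicted minimum.
public class SpaceSaving {
    private final int capacity;
    private final Map<String, Long> counters = new HashMap<>();

    SpaceSaving(int capacity) { this.capacity = capacity; }

    void offer(String key) {
        Long c = counters.get(key);
        if (c != null) {
            counters.put(key, c + 1);               // already tracked: increment
        } else if (counters.size() < capacity) {
            counters.put(key, 1L);                  // room left: start tracking
        } else {
            // Full: find and replace the least-frequent tracked key.
            String minKey = null;
            long minCount = Long.MAX_VALUE;
            for (Map.Entry<String, Long> e : counters.entrySet()) {
                if (e.getValue() < minCount) { minCount = e.getValue(); minKey = e.getKey(); }
            }
            counters.remove(minKey);
            counters.put(key, minCount + 1);        // inherit min + 1 (overestimate bound)
        }
    }

    Map<String, Long> counters() { return counters; }

    public static void main(String[] args) {
        SpaceSaving ss = new SpaceSaving(2);
        for (String k : new String[]{"a", "a", "a", "b", "c", "a"}) ss.offer(k);
        System.out.println(ss.counters()); // "a" clearly dominates; "b" was evicted by "c"
    }
}
```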
Flink Implementation
// One instance per window type (each window stream feeds its own TopKAggregator)
public class TopKAggregator extends KeyedProcessFunction<String, VideoCount, TopKResult> {
private static final int K = 100; // Top-K size, used in emitTopK below
private transient MapState<String, Long> videoCounts;
private transient ValueState<Long> lastEmitTime;
@Override
public void open(Configuration params) {
MapStateDescriptor<String, Long> desc = new MapStateDescriptor<>(
"video-counts", String.class, Long.class);
videoCounts = getRuntimeContext().getMapState(desc);
ValueStateDescriptor<Long> timeDesc = new ValueStateDescriptor<>(
"last-emit", Long.class);
lastEmitTime = getRuntimeContext().getState(timeDesc);
}
@Override
public void processElement(VideoCount value, Context ctx, Collector<TopKResult> out) {
videoCounts.put(value.getVideoId(), value.getCount());
// Emit every 30 seconds
Long lastEmit = lastEmitTime.value();
if (lastEmit == null || ctx.timestamp() - lastEmit >= 30_000) {
emitTopK(out, value.getWindow(), ctx.timestamp());
lastEmitTime.update(ctx.timestamp());
}
}
private void emitTopK(Collector<TopKResult> out, String window, long ts) {
PriorityQueue<VideoCount> heap = new PriorityQueue<>(
Comparator.comparingLong(VideoCount::getCount));
for (Map.Entry<String, Long> entry : videoCounts.entries()) {
heap.offer(new VideoCount(entry.getKey(), entry.getValue(), window));
if (heap.size() > K) {
heap.poll(); // remove smallest
}
}
// Collect results sorted descending
List<VideoCount> topK = new ArrayList<>(heap);
topK.sort(Comparator.comparingLong(VideoCount::getCount).reversed());
out.collect(new TopKResult(window, ts, topK));
}
}

9.3 API Serving Strategy with 3-Tier Fallback
Flow Diagram
Freshness Tracking
Every API response includes a freshness_ms field:
freshness_ms = now() - last_emitted_at
If freshness_ms > 120000 (2 minutes):
-> something is wrong (Flink lag, Kafka lag)
-> return data anyway but set header: X-Data-Stale: true
-> alert on-call
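The freshness rule above can be sketched as a small helper. The 2-minute threshold and the X-Data-Stale header come from the text; the class and method names are illustrative, not part of the actual API:

```java
import java.time.Instant;

// Sketch of the freshness check described above. STALE_THRESHOLD_MS matches
// the 2-minute rule in the text; everything else is an illustrative name.
public class FreshnessCheck {
    static final long STALE_THRESHOLD_MS = 120_000; // 2 minutes

    /** True when the response should carry the X-Data-Stale: true header. */
    public static boolean isStale(long lastEmittedAtMs, long nowMs) {
        long freshnessMs = nowMs - lastEmittedAtMs; // the freshness_ms field
        return freshnessMs > STALE_THRESHOLD_MS;
    }

    public static void main(String[] args) {
        long now = Instant.now().toEpochMilli();
        System.out.println(isStale(now - 30_000, now));  // 30s-old emit -> false
        System.out.println(isStale(now - 180_000, now)); // 3-min-old emit -> true
    }
}
```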
CDN Caching
Top-K results change every 30 seconds. For public-facing endpoints:
Cache-Control: public, max-age=10, stale-while-revalidate=30
CDN caches for 10 seconds, serves stale for 30 more while refetching.
At 10M requests/min, CDN absorbs 99%+ of traffic.
Only ~1 request per 10 seconds actually hits our API.
Multi-Dimensional Top-K
For per-region, per-category breakdowns:
# Regional Top-K
video:topk:10min:region:IN:current
video:topk:10min:region:US:current
# Category Top-K
video:topk:10min:category:music:current
video:topk:daily:category:gaming:current
# Combined
video:topk:10min:region:IN:category:music:current

These are separate Flink windows with compound keys:
// Separate pipeline branch from the base stream (not the video-keyed stream)
base.keyBy(e -> e.getRegion() + ":" + e.getVideoId())
.window(...)

Trade-off: Each added dimension multiplies the number of windows and the state to maintain. Limit to the dimensions the product actually needs.
9.4 Exactly-Once Semantics End-to-End
The Exactly-Once Chain
EXACTLY-ONCE CHAIN
| Hop | Guarantee | Mechanism |
|---|---|---|
| Producer → Kafka | Exactly-once | Idempotent producer |
| Kafka → Flink | Exactly-once | Checkpointed offsets |
| Flink → Redis | At-least-once | Idempotent ZADD is safe |
| Flink → Postgres | Exactly-once | 2PC sink |
End-to-end: Effectively exactly-once because Kafka producer deduplicates retries, Flink checkpoints bundle Kafka offsets + state, Redis writes are idempotent (ZADD = overwrite), and Postgres writes are idempotent (UPSERT).
Trade-off: Exactly-once checkpointing briefly pauses record processing during barrier alignment. At 60-second intervals with incremental state, this adds ~100-500ms of latency per checkpoint. For a 30-second emit cycle, that's negligible.
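The "idempotent sinks make replays safe" argument can be shown with a toy model: ZADD-style overwrites converge to the same state no matter how many times an emit is replayed, while ZINCRBY-style increments double-count. A minimal sketch in plain Java (HashMaps stand in for Redis; no real Redis API is used):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the replay argument above: overwrite (ZADD-style) is
// idempotent, increment (ZINCRBY-style) is not.
public class ReplaySafety {
    /** Apply the same emit `replays` times and return the stored count. */
    public static long afterReplays(boolean idempotent, long value, int replays) {
        Map<String, Long> store = new HashMap<>();
        for (int i = 0; i < replays; i++) {
            if (idempotent) store.put("vid_abc", value);          // overwrite
            else store.merge("vid_abc", value, Long::sum);        // increment
        }
        return store.get("vid_abc");
    }

    public static void main(String[] args) {
        System.out.println(afterReplays(true, 1523L, 3));  // 1523: correct despite replays
        System.out.println(afterReplays(false, 1523L, 3)); // 4569: triple-counted
    }
}
```

This is exactly why the design says "ZADD, not ZINCRBY": Flink recomputes the authoritative count and the sink only has to overwrite it.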
Two-Phase Commit Sink (Postgres)
For true exactly-once to Postgres, Flink uses a 2PC sink:
Phase 1 (pre-commit): Flink writes to Postgres but doesn't COMMIT
Phase 2 (commit): On checkpoint success, Flink sends COMMIT
If crash before phase 2: Postgres rolls back uncommitted txn
If crash after phase 2: Data is committed and consistent
Postgres state stays consistent with Flink's checkpointed state.
9.5 Late Events & Watermark Strategy
The Problem
Events arrive late. A user watches a video at T=12:00:00, but due to network delays, the event reaches Kafka at T=12:00:45. If the 10-minute window [11:50:00, 12:00:00] has already closed, this event is "late."
Watermark Configuration
Watermark configuration is in the Base Stream code (Section 9.1). Here's what the settings mean:
BoundedOutOfOrderness(30s): Flink waits 30 seconds past the latest event time before closing a window. Events arriving within 30 seconds of the window end are included.
withIdleness(1 min): If a Kafka partition sends no events for 1 minute, Flink advances the watermark anyway (prevents stalling on idle partitions).
Late Event Handling
// Allow late events up to 5 minutes, send them to a side output
OutputTag<WatchEvent> lateTag = new OutputTag<>("late-events") {};
DataStream<VideoCount> tenMinCounts = keyed
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(30)))
.allowedLateness(Time.minutes(5)) // recompute window for events up to 5min late
.sideOutputLateData(lateTag) // events > 5min late go here
.aggregate(new CountAggregate(), new VideoCountWindowFunction("10min"));
// Late events beyond 5 minutes: log and count for monitoring
DataStream<WatchEvent> lateEvents = tenMinCounts.getSideOutput(lateTag);
lateEvents.addSink(new LateEventMetricsSink()); // count late events for dashboards

Why server_timestamp, Not event_timestamp
event_timestamp (client clock):
- Client clocks can be wildly wrong (hours off)
- Android devices with bad NTP
- Emulators with epoch 0
- Time zones misconfigured
server_timestamp (API server clock):
- NTP-synced, reliable
- At most a few seconds off
- Consistent across all events
Decision: Use server_timestamp for watermarks and windowing.
Store event_timestamp for analytics but don't use it for correctness.
9.6 Deduplication
Why Dedup Is Needed
A single watch can generate duplicate events because:
- Client retries on timeout (user's device sends the same watch twice)
- Kafka producer retries (mitigated by enable.idempotence=true, but belt-and-suspenders)
- API gateway retries
Dedup Strategy in Flink
public class DeduplicationFunction extends KeyedProcessFunction<String, WatchEvent, WatchEvent> {
// Keyed by event_id
// State: just a flag "have I seen this event_id before?"
private transient ValueState<Boolean> seen;
@Override
public void open(Configuration params) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Boolean.class);
// Auto-expire after 10 minutes (events older than this won't arrive)
StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot()
.build();
desc.enableTimeToLive(ttl);
seen = getRuntimeContext().getState(desc);
}
@Override
public void processElement(WatchEvent event, Context ctx, Collector<WatchEvent> out) {
if (seen.value() == null) {
seen.update(true);
out.collect(event); // first time -> pass through
}
// else: duplicate -> silently drop
}
}

State cost: Each event_id is ~36 bytes + overhead = ~100 bytes. With 10-minute TTL at 81K events/sec: 81K x 600s x 100 bytes = ~4.8 GB in RocksDB. Manageable.
9.7 Checkpointing & State Recovery
Checkpoint Configuration
Base checkpoint config (interval, timeout, RocksDB backend) is in the Base Stream code (Section 9.1). Additional production settings:
// One checkpoint in flight at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Retain externalized checkpoints after cancellation (for manual recovery)
env.getCheckpointConfig().setExternalizedCheckpointRetention(
    ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
// Keep the last 3 checkpoints for recovery from older state
// (flink-conf.yaml: state.checkpoints.num-retained: 3)
// Store checkpoints in S3
env.getCheckpointConfig().setCheckpointStorage("s3://flink-checkpoints/topk-job/");

What Gets Checkpointed
Each checkpoint (every 60 seconds) atomically saves:
| # | Component | What's Saved |
|---|---|---|
| 1 | Kafka consumer offsets | partition-0: offset 1,234,567; partition-1: offset 2,345,678; ... (128 partitions) |
| 2 | RocksDB state (incremental) | Only SST files changed since last checkpoint. Typically 2-5 GB (vs. 30-50 GB full state) |
| 3 | Window state | All in-progress window contents (events not yet emitted) |
| 4 | Operator state | Pending Redis/Postgres writes (2PC pre-commit) |
Storage: S3 (durable, cross-AZ). Size: ~2-5 GB (incremental). Duration: 10-30 seconds.
Why S3? RocksDB state lives on local disk. If the node dies, it's gone. S3 gives cross-AZ durability so any new TaskManager can download the checkpoint and resume. At 60-second intervals, we lose at most one minute of processing on failure.
Recovery Process
Full recovery walkthrough with timeline, mermaid diagram, and "during crash" behavior in Section 11.2 (Flink Failure & Recovery).
Savepoints vs Checkpoints: Planned Upgrades
Checkpoints handle crashes. But what about deploying a new version of the Flink job? A bug fix in the counting logic, a new dimension we want to track, or a library upgrade. We can't just kill the job and restart with a fresh JAR. That throws away all windowed state: every in-progress 10-minute window, every daily counter, every monthly accumulation. Gone.
Savepoints solve this. A savepoint is a manually triggered, full snapshot of all operator state. Unlike checkpoints (which are incremental and managed automatically), savepoints capture everything and are designed to survive code changes.
Upgrade process:
1. flink savepoint <job-id> s3://flink-savepoints/topk-job/v2/
(takes 30-60s, job keeps running during this)
2. flink cancel <job-id>
(job stops, Kafka events buffer)
3. Deploy new JAR to Flink cluster
4. flink run -s s3://flink-savepoints/topk-job/v2/ new-topk-job.jar
(restores all state, repositions Kafka offsets, resumes processing)
5. Verify: check consumer lag dropping, Top-K freshness recovering
Total downtime: 30-60 seconds. Kafka buffers events during the gap. Flink catches up from the savepoint and replays buffered events. Redis serves slightly stale data during the window, then gets repopulated on the next emit cycle.
Why .uid(...) on every operator matters:
Every operator in the Flink code has a .uid("kafka-source"), .uid("window-10min"), etc. These UIDs are how Flink maps old state to new operators. If we rename or remove a UID between versions, Flink can't find the matching state in the savepoint and will either fail to restore or drop that operator's state silently. Keep UIDs stable across versions. Treat them like database column names.
State schema evolution:
We can add new fields to our state classes (Flink handles this with default values). We can remove fields (they get ignored on restore). But changing a field's type (say, int to long) requires a state migration or, if the change is too drastic, a full reprocessing job from Kafka. Plan state schema changes carefully. They're the closest thing to a database migration in streaming.
9.8 Backfill & Reprocessing
When We Need to Reprocess
- Bug fix in counting logic (discovered that a filter was wrong)
- New dimension added (need to compute per-category Top-K retroactively)
- Disaster recovery (all Flink state lost, need to rebuild from Kafka)
Strategy: Run a Parallel Flink Job
Kafka Tiered Storage
For reprocessing beyond 7-day retention:
Kafka 7-day hot retention -> S3 tiered storage (30-90 days)
Flink can read from tiered storage transparently.
Cost: S3 storage for 30 days = 400 GB/day x 30 = 12 TB x $0.023/GB = ~$276/month
(Cheap insurance for reprocessing capability)
10. Identify Bottlenecks
10.1 Multi-Dimensional Top-K State Explosion
Per-region and per-category breakdowns multiply state. 20 regions x 15 categories turns 4 window types into 4 x 20 x 15 = 1,200 compound windows. State and emit volume grow with it.
Fix: Only compute the dimensions the product actually needs. Use compound keys in Flink (keyBy(region + ":" + category)) and skip long-tail combinations. Those can come from Postgres offline.
10.2 Redis as Write Bottleneck During Checkpoint Recovery
After checkpoint recovery, Flink replays up to 60 seconds of Kafka events and re-emits Top-K snapshots. This creates a burst of Redis writes as Flink catches up through the backlog.
Fix: Flink's Redis sink uses async I/O with back-pressure. ZADD is idempotent, so duplicate writes during recovery are harmless. Redis Cluster with 3 masters distributes the load. The burst is short, typically 30 seconds while catching up.
10.3 Global Top-K Merge Across Regions
Each region computes its own local Top-K. A video at #50 in five separate regions could be the global #1 when counts are combined. But no single region sees that.
Fix: A simple merge service combines regional Top-K results. Each region emits its Top-1000 (not just Top-100) to give enough overlap. The merge service adds up counts for matching video IDs and re-ranks globally. This is a small-data problem: 5 regions x 1000 entries x 4 windows = 20,000 entries. Runs on a single node.
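The merge service's core logic is just "sum matching IDs, re-rank." A minimal sketch under the assumptions in the text (regional Top-N lists of video-ID-to-count pairs; all names are illustrative):

```java
import java.util.*;
import java.util.stream.Collectors;

// Sketch of the global merge described above: sum counts for matching video
// IDs across regional Top-N lists, re-rank, keep the global top K.
public class GlobalTopKMerge {
    public static List<Map.Entry<String, Long>> merge(
            List<Map<String, Long>> regionalTopLists, int k) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> regional : regionalTopLists) {
            regional.forEach((videoId, count) -> global.merge(videoId, count, Long::sum));
        }
        return global.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(k)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // A video at a modest rank in every region can be global #1 once summed.
        Map<String, Long> in = Map.of("vid_global", 40_000L, "vid_in", 90_000L);
        Map<String, Long> us = Map.of("vid_global", 45_000L, "vid_us", 80_000L);
        Map<String, Long> eu = Map.of("vid_global", 50_000L, "vid_eu", 70_000L);
        System.out.println(merge(List.of(in, us, eu), 2));
        // vid_global sums to 135,000 and outranks every regional #1
    }
}
```

Emitting Top-1000 per region (as the text suggests) keeps enough overlap that a video missing from one region's list barely moves the global ranking.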
10.4 Hot Partition / Hot Key Skew
We partition Kafka by video_id and Flink uses keyBy(video_id). Normally this distributes load evenly across 128 partitions. But when a single video goes viral (World Cup final, a celebrity scandal, a surprise album drop), that one video_id sends ALL its events to one Kafka partition and one Flink subtask. The other 127 sit idle while that one subtask drowns.
The numbers get ugly fast. During a viral spike, a single video can generate 50K events/sec. That's one partition doing 50K/sec while the average partition sees 500/sec. The broker hosting that partition's leader gets hammered. The Flink subtask assigned to that partition falls behind, backpressure propagates upstream, and the entire pipeline slows down because of one video.
Fix 1: Two-phase aggregation (split-aggregate-merge)
Instead of keying directly by video_id, first key by a salted key: video_id + "_" + hash(event) % 8. This spreads one hot video across 8 subtasks. Each subtask counts its shard independently. A second keyBy(video_id) stage merges the 8 partial counts into the final total.
// Phase 1: spread hot keys across 8 shards.
// Salt with a stable field (event_id) so replayed events land on the same shard.
DataStream<VideoCount> sharded = base
.keyBy(e -> e.getVideoId() + "_" + (e.getEventId().hashCode() & 7))
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.seconds(30)))
.aggregate(new CountAggregate())
.uid("sharded-count");
// Phase 2: merge shards back to real video_id
DataStream<VideoCount> merged = sharded
.keyBy(VideoCount::getVideoId)
.process(new MergeCountsFunction())
.uid("merge-counts");

Why this doesn't break exactly-once: the salt is a deterministic hash of the event, not a random number. If Flink restarts and replays events from the last checkpoint, each event lands on the same shard it did before the crash. Kafka's idempotent producer deduplicates the re-sent records, and Flink's checkpoint-based recovery restores the per-shard counters. The merge stage simply sums deterministic partial counts, so the final total is exactly the same: no duplicates, no losses.
Downside: adds one more processing hop and a network shuffle. For non-hot keys, this is wasted overhead. We can make it adaptive by only salting keys that exceed a threshold (say, 1,000 events/sec), but that adds complexity. In practice, the overhead is small enough that salting everything is simpler.
Fix 2: Local pre-aggregation
Before the keyBy, each subtask aggregates counts locally for a short interval (1 second). Instead of forwarding 50,000 raw events per second to the keyed stage, it forwards one record: (video_id, 50000). This reduces network traffic and downstream load by orders of magnitude during spikes.
// Pre-aggregate locally before keyBy.
// Note: windowAll() would funnel every event through a single subtask
// (parallelism 1), defeating the purpose. Local pre-aggregation needs a
// per-subtask operator instead: a RichFlatMapFunction (not a Flink built-in)
// that buffers counts in a HashMap and flushes them every second.
DataStream<VideoCount> preAggregated = base
    .flatMap(new LocalPreAggregator(1000)) // flush interval in ms; emits (video_id, count) per subtask
    .uid("local-pre-agg");
DataStream<VideoCount> keyed = preAggregated
    .keyBy(VideoCount::getVideoId)
    .window(...)

This doesn't solve the Kafka partition imbalance (the events still land on one partition), but it massively reduces the work Flink has to do per event. The Kafka side is usually fine since a single partition handles 50K events/sec without breaking a sweat. It's the stateful computation that gets expensive.
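The buffer such a pre-aggregator would maintain can be sketched in plain Java. In Flink the flush would fire on a processing-time timer; here it fires on event count so the logic is testable standalone. All names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Sketch of the per-subtask pre-aggregation buffer described above:
// accumulate counts locally, then forward compacted (video_id, count) pairs
// downstream instead of raw events.
public class LocalPreAggBuffer {
    private final Map<String, Long> counts = new HashMap<>();
    private final int flushEvery;            // stand-in for the 1-second timer
    private int buffered = 0;
    private final BiConsumer<String, Long> downstream;

    public LocalPreAggBuffer(int flushEvery, BiConsumer<String, Long> downstream) {
        this.flushEvery = flushEvery;
        this.downstream = downstream;
    }

    public void add(String videoId) {
        counts.merge(videoId, 1L, Long::sum);
        if (++buffered >= flushEvery) flush();
    }

    public void flush() {
        counts.forEach(downstream);          // one record per distinct key
        counts.clear();
        buffered = 0;
    }

    public static void main(String[] args) {
        Map<String, Long> emitted = new HashMap<>();
        LocalPreAggBuffer buf =
            new LocalPreAggBuffer(50_000, (id, c) -> emitted.merge(id, c, Long::sum));
        for (int i = 0; i < 50_000; i++) buf.add("vid_viral"); // hot-key spike
        System.out.println(emitted); // {vid_viral=50000}: one record, not 50K events
    }
}
```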
11. Failure Scenarios
11.1 Redis Failure
SCENARIO: Redis goes down
Impact:
- API reads fail from Redis
- Flink Redis sink gets connection errors
What happens:
Kafka → Flink → Redis (DOWN), sink retries with backoff
Kafka → Flink → Postgres (OK), still written
API path: Redis read (FAILED) → timeout 5ms → fallback to Postgres (OK, ~10ms) → user sees Top-K (slightly slower)
Recovery:
- Redis comes back → Flink's next emit (30s) repopulates it
- Redis comes back EMPTY → still fine, Flink re-emits
| Metric | Value |
|---|---|
| Data loss | ZERO |
| User impact | Slightly higher latency (10ms vs 1ms) |
| Duration | Until Redis recovers + one Flink emit cycle (30s) |
The Hidden Problem: Cache Stampede
The table above says "slightly higher latency." That undersells the danger.
When Redis goes down, every API pod simultaneously falls through to Postgres. Even with CDN absorbing 99% of reads, the remaining 1% of 10M requests/min is still ~100K requests/min slamming Postgres at once. A single Postgres instance handles maybe 5,000-10,000 concurrent connections before it starts choking. Without protection, a Redis outage cascades into a Postgres outage, and now we have nothing.
This pattern has a name: cache stampede (also called thundering herd). Many clients miss the cache at the same moment and flood the backend with identical queries.
Four mitigations, and we probably want all of them:
1. Request coalescing (singleflight)
If 50 API pods all miss Redis for the same window=10min query within the same 100ms, only ONE query should hit Postgres. The rest wait for that result.
// Pseudocode: singleflight pattern
class TopKService {
ConcurrentMap<String, CompletableFuture<TopKResult>> inflight = new ConcurrentHashMap<>();
TopKResult getTopK(String window) {
// Try Redis
TopKResult cached = redis.get("video:topk:" + window + ":current");
if (cached != null) return cached;
// Singleflight: deduplicate concurrent Postgres queries
CompletableFuture<TopKResult> future = inflight.computeIfAbsent(window, key -> {
CompletableFuture<TopKResult> f = CompletableFuture.supplyAsync(
() -> postgres.queryLatestTopK(key));
f.whenComplete((result, err) -> inflight.remove(key));
return f;
});
return future.get(100, TimeUnit.MILLISECONDS); // all waiters share one result
}
}
Without singleflight: 50 pods x 20 threads = 1,000 identical Postgres queries. With singleflight: 1 query, 999 waiters. Massive difference.
2. In-process stale cache
Each API pod keeps an in-memory copy of the last known Top-K result. Every successful Redis or Postgres read refreshes it. When both Redis and Postgres are slow or unreachable, the pod serves this stale copy immediately with an X-Data-Stale: true header.
This is the "Tier 3: stale cache" from the 3-tier fallback. It's not an external store. It's just a local variable, maybe 50 KB (4 windows x 100 entries), refreshed every 30 seconds during normal operation. TTL of 5 minutes so we never serve truly ancient data.
3. Circuit breaker on the Postgres fallback
If Postgres latency crosses 200ms or the connection pool fills past 80%, stop sending queries. Open the circuit. Serve the in-process stale cache directly and check back every 10 seconds with a single probe query. This prevents a struggling Postgres from getting buried under queued requests until it falls over entirely.
Circuit states:
CLOSED -> Postgres queries flow normally (latency < 200ms, pool < 80%)
OPEN -> Skip Postgres entirely, serve stale cache, probe every 10s
HALF-OPEN -> Send 1 probe query. If it's fast, close the circuit
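The three-state machine above could look like the following sketch. The thresholds (200ms latency, 10-second probe interval) come from the text; the class itself is illustrative, and a real breaker would also trip on pool saturation and use a sliding latency percentile rather than a single sample:

```java
// Sketch of the circuit breaker described above, driven by explicit
// timestamps so the state transitions are easy to test.
public class CircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private State state = State.CLOSED;
    private long openedAtMs;
    static final long LATENCY_LIMIT_MS = 200;     // from the text
    static final long PROBE_INTERVAL_MS = 10_000; // from the text

    /** Should this request go to Postgres, or be served from stale cache? */
    public synchronized boolean allowRequest(long nowMs) {
        if (state == State.OPEN && nowMs - openedAtMs >= PROBE_INTERVAL_MS) {
            state = State.HALF_OPEN; // let a single probe through
            return true;
        }
        return state != State.OPEN;
    }

    /** Record the outcome of a Postgres query. */
    public synchronized void record(long latencyMs, long nowMs) {
        if (latencyMs > LATENCY_LIMIT_MS) {
            state = State.OPEN;      // too slow: stop sending queries
            openedAtMs = nowMs;
        } else if (state == State.HALF_OPEN) {
            state = State.CLOSED;    // probe was fast: resume normal traffic
        }
    }

    public State state() { return state; }
}
```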
4. Connection pooling with PgBouncer
Without a connection pooler, 50 API pods with 10 connections each means 500 direct Postgres connections. Postgres forks a backend process per connection. 500 processes = a lot of memory and context switching. PgBouncer in transaction mode multiplexes those 500 application connections onto ~100 actual Postgres connections. It also queues excess requests instead of rejecting them outright.
API Pod -> PgBouncer (pool_size=100, reserve_pool_size=20)
-> PostgreSQL (max_connections=150)
Without PgBouncer: 500 direct connections, Postgres struggles
With PgBouncer: 100 actual connections, Postgres is comfortable
One more thing: keep the Postgres connection pool warm even when Redis is healthy. Run a lightweight health-check query (SELECT 1) every 30 seconds per pod. Cold connection pools add 50-100ms of setup latency at exactly the wrong moment.
11.2 Flink Failure & Recovery
SCENARIO: Flink crashes (TaskManager OOM, network partition)
Timeline:
| Time | Event |
|---|---|
| T=0 | Flink running, last checkpoint at T=-60s |
| T=0 | Crash! TaskManager dies |
| T=+5s | JobManager detects failure |
| T=+10s | New TaskManager allocated (K8s) |
| T=+20s | State restored from S3 checkpoint |
| T=+25s | Kafka offsets rewound to checkpoint |
| T=+25s | Processing resumes (replays last ~85 seconds) |
| T=+55s | Caught up to live. First new emit to Redis |
During crash (T=0 to T=+25s):
- Kafka: Still accepting events (buffering)
- Redis: Serving stale data (last emit before crash)
- Postgres: Serving stale data
- API: Returns data, but freshness_ms climbing → X-Data-Stale: true header after 2 minutes
Replayed events (T=-60s to T=0):
- Flink reprocesses → same counts recomputed
- Redis ZADD → idempotent (no double-count)
- Postgres UPSERT → idempotent (no duplicates)
| Metric | Value |
|---|---|
| Data loss | ZERO |
| Stale window | ~30-60 seconds |
11.3 Redis + Flink Both Down
SCENARIO: Both Redis and Flink down simultaneously
State during outage:
- Kafka → buffering, no consumers
- Redis (FAILED), Flink (FAILED)
- API: Redis read (FAILED) → Postgres fallback (OK) → serving last known good data, freshness_ms increasing
Recovery priority:
- Bring Flink up first (processing pipeline)
- Flink restores from checkpoint, catches up from Kafka
- Flink emits to Postgres (immediately useful)
- Bring Redis up (Flink auto-populates it)
Kafka buffering:
- At 81K events/sec x 150 bytes = 12 MB/sec
- 1 hour downtime = 43 GB buffered
- Within Kafka's disk capacity → no event loss
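The buffering arithmetic above, checked in code (81K events/sec and 150 bytes/event are the capacity numbers used throughout this design):

```java
// Worked check of the Kafka buffering math above: sustained event rate times
// event size times downtime gives the backlog Kafka must absorb on disk.
public class KafkaBufferMath {
    public static long bufferedBytes(long eventsPerSec, long bytesPerEvent, long downtimeSec) {
        return eventsPerSec * bytesPerEvent * downtimeSec;
    }

    public static void main(String[] args) {
        long perSec = bufferedBytes(81_000, 150, 1);
        System.out.println(perSec / 1_000_000 + " MB/sec");           // 12 MB/sec
        long oneHour = bufferedBytes(81_000, 150, 3600);
        System.out.println(oneHour / 1_000_000_000 + " GB buffered"); // 43 GB
    }
}
```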
| Metric | Value |
|---|---|
| Data loss | ZERO (Kafka buffers everything) |
| User impact | Stale data until Flink catches up |
11.4 Kafka Broker Failure
SCENARIO: Kafka broker fails
Kafka broker-3 (FAILED):
- Affected partitions: leader was on broker-3
- Leader election: 5-15 seconds
- ISR replica promoted to leader
- No data loss (RF=3, min.insync.replicas=2)
API producer:
- Brief errors during leader election
- Retries with backoff → succeeds after election
- Buffer up to 64 MB in producer memory
Flink consumer:
- Pauses on affected partitions during election
- Resumes automatically after new leader
- No reprocessing needed (offsets preserved)
| Metric | Value |
|---|---|
| Data loss | ZERO |
| Impact | 5-15 second blip |
11.5 Postgres Failure
SCENARIO: Postgres goes down
Flink Postgres sink:
- Connection errors → sink retries with backoff
- If extended outage: Flink checkpoints include pending writes, they'll be replayed after recovery
Other components:
- Redis: Still being written (OK)
- API: Reads from Redis (OK), primary path works
Risk: If Redis ALSO fails during Postgres outage, API has no fallback → serve stale local cache or 503
Recovery:
- Postgres comes back → Flink replays buffered writes
- If Flink also restarted: restores from checkpoint, re-emits to Postgres (UPSERT = idempotent)
| Metric | Value |
|---|---|
| Data loss | ZERO |
| User impact | NONE (Redis serves reads) |
11.6 Slow Consumer (Flink Can't Keep Up)
SCENARIO: Viral event, traffic 10x normal
Kafka consumer lag growing:
- Normal: < 1000 messages lag
- Now: 500,000 messages and climbing
Symptoms:
- Top-K freshness degrading (showing 5-minute-old data)
- Consumer group offset not advancing fast enough
Response:
- Scale Flink parallelism (add TaskManagers). Requires job restart from savepoint, downtime: 30-60 seconds
- Kafka partitions must be >= Flink parallelism. This is why we have 128 partitions (headroom), can scale Flink from 32 to 128 without repartitioning
- If truly extreme: shed load. Sample events (count 1 event as 10), approximate counts acceptable during viral spikes
| Metric | Value |
|---|---|
| Data loss | ZERO (Kafka buffers, Flink catches up) |
| User impact | Stale Top-K during spike |
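The load-shedding idea above ("count 1 event as 10") can be sketched as a deterministic sampler: keep 1 in N events by hashing a stable field, and weight each kept event by N so totals stay approximately right. A minimal sketch with illustrative names; hashing event_id keeps the keep/drop decision deterministic, so replays after a Flink recovery make the same choices:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of sample-based load shedding described above: process 1 in `rate`
// events, weight each kept event by `rate`. Counts become estimates, which
// the text says is acceptable during viral spikes.
public class SampledCounter {
    private final int rate; // keep 1 in `rate` events
    private final Map<String, Long> counts = new HashMap<>();

    public SampledCounter(int rate) { this.rate = rate; }

    public void observe(String videoId, String eventId) {
        if (Math.floorMod(eventId.hashCode(), rate) == 0) {
            counts.merge(videoId, (long) rate, Long::sum); // 1 kept event stands for `rate`
        }
    }

    public long estimate(String videoId) {
        return counts.getOrDefault(videoId, 0L);
    }

    public static void main(String[] args) {
        SampledCounter c = new SampledCounter(10);
        for (int i = 0; i < 100_000; i++) c.observe("vid_viral", "evt-" + i);
        System.out.println(c.estimate("vid_viral")); // close to 100,000, not exact
    }
}
```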
11.7 When Is Data ACTUALLY Lost?
When is data ACTUALLY lost?
Only if Kafka loses committed data. This requires:
- 3 brokers holding replicas of the same partition all fail simultaneously WITH disk loss
- OR: acks is misconfigured to something weaker than all (we always use acks=all)
- OR: min.insync.replicas is set to 1 (we set it to 2)
Probability: astronomically low with proper operations.
Mitigation:
- RF=3 across different racks/AZs
- min.insync.replicas=2
- Rack-aware replica placement
- Regular disk health monitoring
- Kafka tiered storage to S3 (independent durability)
Everything else (Flink, Redis, Postgres) is DERIVED from Kafka and can be rebuilt.
Failure Scenario Summary
Same story in every scenario: Kafka has the events, everything else is derived and rebuildable, every sink is idempotent. The API degrades through Redis, then Postgres, then stale cache. It never hard-fails.
12. Deployment Strategy
12.1 Kubernetes-Based Deployment
12.2 Multi-Region (If Required)
Merge strategy details in Section 10.3 (Global Top-K Merge).
13. Observability
13.1 Key Metrics
INGESTION:
- events.ingested.rate (events/sec by API pod)
- events.rejected.rate (validation failures)
- kafka.produce.latency.p99 (should be < 10ms)
- kafka.produce.error.rate (should be ~0)
KAFKA:
- kafka.consumer.lag (per partition, in messages)
- kafka.consumer.lag.seconds (estimated time behind)
- kafka.bytes.in / bytes.out (throughput per broker)
- kafka.under.replicated.partitions (should be 0)
FLINK:
- flink.checkpoint.duration (should be < 60s)
- flink.checkpoint.size (tracks state growth)
- flink.checkpoint.failures (should be 0)
- flink.records.in.rate (events/sec consumed)
- flink.records.out.rate (emits/sec to sinks)
- flink.task.backpressure (0-100%, should be < 50%)
- flink.watermark.lag (event time vs. wall clock)
- flink.late.events.dropped (from side output)
SERVING:
- topk.freshness.ms (age of current Top-K)
- redis.read.latency.p99 (should be < 1ms)
- postgres.read.latency.p99 (should be < 20ms)
- api.fallback.to.postgres.rate (should be ~0)
- api.response.latency.p99 (should be < 10ms)
13.2 Alerts
# Critical: pages on-call
- alert: KafkaConsumerLagHigh
expr: kafka_consumer_lag_seconds > 120
for: 2m
severity: critical
message: "Flink is > 2 minutes behind Kafka. Top-K data is stale."
- alert: FlinkCheckpointFailing
expr: flink_checkpoint_failures_total > 0
for: 5m
severity: critical
message: "Flink checkpoints failing. State recovery at risk."
- alert: TopKFreshnessStale
expr: topk_freshness_ms > 120000
for: 1m
severity: critical
message: "Top-K data is > 2 minutes old."
- alert: FlinkBackpressure
expr: flink_task_backpressure > 80
for: 5m
severity: warning
message: "Flink backpressure > 80%. May need scaling."
- alert: RedisDown
expr: redis_up == 0
for: 30s
severity: critical
message: "Redis unreachable. API falling back to Postgres."
- alert: KafkaUnderReplicatedPartitions
expr: kafka_under_replicated_partitions > 0
for: 1m
severity: critical
message: "Kafka has under-replicated partitions. Durability at risk."
# Warning
- alert: LateEventsHigh
expr: rate(flink_late_events_total[5m]) > 1000
for: 10m
severity: warning
message: "High late event rate. Check client clocks or watermark config."

13.3 Dashboards
Dashboard 1: Pipeline Health
- Kafka ingestion rate (line chart)
- Consumer lag per partition (heatmap)
- Flink throughput in/out (line chart)
- Checkpoint duration + size (line chart)
- Top-K freshness (gauge with red/yellow/green)
Dashboard 2: Serving Health
- API response latency (histogram)
- Redis hit rate vs. Postgres fallback rate (stacked bar)
- Top-K result visualization (live preview)
- Per-window freshness (4 gauges)
Dashboard 3: Kafka Health
- Broker disk usage (per broker)
- Under-replicated partitions (alert panel)
- ISR shrink events (event timeline)
- Topic throughput (per topic)
14. Security
API Authentication: JWT or API key for event ingestion. Rate limiting per client (to prevent a single client from flooding the pipeline). Input validation -- reject malformed events at the API layer, before they reach Kafka.
Kafka: SASL/SCRAM or mTLS for broker authentication. ACLs for topic-level access control (ingestion service can write, Flink can read, nobody else). Encryption in transit (TLS) and at rest (broker-level encryption).
Flink: Runs in isolated namespace. No external access. Checkpoints encrypted in S3 (SSE-S3 or SSE-KMS). RocksDB state is ephemeral (lost when pod dies, restored from checkpoint).
Redis: Require AUTH. No public exposure. TLS in transit. Redis ACLs for read-only access from API, write access only from Flink.
Postgres: Standard DB security: TLS, strong passwords, minimal privileges. Flink service account has INSERT/UPDATE only. API service account has SELECT only.
Data Privacy: Watch events contain user_id. For GDPR/CCPA compliance, maintain a user deletion pipeline: when a user requests deletion, publish a tombstone event to Kafka, and Flink filters out that user's events from all future computations. Historical aggregates (Top-K counts) don't need deletion since they're anonymous counts, not per-user data.
One subtlety worth calling out: once a watch event is counted into a window aggregate, we can't subtract that specific user's contribution. The aggregate is just a number (vid_abc: 1,523 views). It doesn't track which users contributed to that count. So if user X deletes their account, their 3 views on vid_abc are already baked into the 1,523. We can't make it 1,520.
This is actually fine. Aggregated counts are considered anonymous data under GDPR. We're not storing "user X watched video Y" in the aggregate. We're storing "video Y has N views." No personal data, no deletion required. But we DO need to delete the raw events in Kafka (use topic compaction with tombstone records or set retention to a reasonable window). And if we maintain any per-user lookup tables (like "videos user X watched"), those need a separate deletion pipeline with its own SLA.
Key Design Principles
The Hierarchy
| # | Principle | Role |
|---|---|---|
| 1 | KAFKA REMEMBERS EVERYTHING | The immutable event log. Only source of raw truth. |
| 2 | FLINK COMPUTES TRUTH | Stateful, windowed, exactly-once computation. All counting happens here, nowhere else. |
| 3 | REDIS SERVES NOW | Disposable cache. Can lose it any time. Repopulated in 30 seconds from Flink. |
| 4 | POSTGRES REMEMBERS HISTORY | Durable fallback. Audit trail. Recovery source. |
| 5 | APIs DEGRADE, NEVER FAIL | Redis → Postgres → stale cache → never a hard 500. |
Rules
- Never count in Redis. Redis stores pre-computed results. It never increments.
- Never trust Redis as truth. It's a cache. Treat it as disposable.
- Kafka is the black box recorder. Everything else is derived.
- Writes are always idempotent. ZADD, not ZINCRBY. UPSERT, not INSERT.
- Every sink must tolerate replays. Flink will replay after recovery.
- State belongs in RocksDB. Not in JVM heap. Not in external DBs during processing.
- Checkpoints are our lifeline. Incremental, to S3, every 60 seconds.
- Watermarks define time. Use server timestamps, handle late events explicitly.
- Monitor lag obsessively. Consumer lag is the #1 indicator of pipeline health.
- Design for the failure we haven't seen yet. Every component will fail.
Explore the Technologies
Core Technologies
| Technology | Role in This System | Learn More |
|---|---|---|
| Kafka | Immutable event log, the single source of raw truth | Kafka |
| Flink | Stateful stream processing with exactly-once guarantees | Flink |
| RocksDB | Flink's state backend for 50M+ keys, incremental checkpointing | RocksDB |
| Redis | Sorted sets for serving real-time Top-K results, sub-ms reads | Redis |
| PostgreSQL | Durable fallback, audit trail, historical query serving | PostgreSQL |
| Prometheus | Metrics collection for pipeline lag, throughput, and accuracy | Prometheus |
| Grafana | Dashboards and alerting for pipeline health and SLOs | Grafana |
| Kafka Streams | Lightweight alternative evaluated in Section 4.5 | Kafka Streams |
| Spark | Micro-batch alternative evaluated in Section 4.5 | Spark |
Infrastructure Patterns
| Pattern | Role in This System | Learn More |
|---|---|---|
| Message Queues & Event Streaming | Kafka topic design, partitioning, consumer groups | Event Streaming |
| Caching Strategies | Redis cache-aside pattern, TTL policies, cache warming | Caching Strategies |
| Kubernetes | Flink job deployment, TaskManager scaling, pod scheduling | Kubernetes Architecture |
| Auto-Scaling Patterns | Kafka consumer lag-based scaling, Flink parallelism adjustment | Auto-Scaling Patterns |
| Metrics & Monitoring | End-to-end pipeline observability, accuracy tracking | Metrics & Monitoring |
| Object Storage & Data Lake | S3 for Flink checkpoints and long-term event archival | Object Storage & Data Lake |
Practice this design: Design YouTube Top K -- interview question with progressive hints.
This is how YouTube, TikTok, and Spotify compute trending lists at scale. One immutable log, one stateful processor, idempotent sinks everywhere. Get that right, and the rest is operational discipline.