Flink
The streaming engine that treats batch as a special case, not the other way around
Why It Exists
If all the data shows up in a big pile and gets processed once a day, Spark is fine. But a lot of interesting problems don't work that way. Fraud detection, live bidding, anomaly alerting: these need answers in milliseconds, not minutes.
Before Flink, the options were bad in different ways. Storm gave true streaming but weak guarantees (at-most-once or at-least-once, pick one). Spark Streaming faked it with micro-batches, which meant seconds-level latency at best. Neither was great.
Flink took a different approach. It was built from the ground up as a true streaming engine that processes records one at a time, with exactly-once state consistency through distributed snapshots. And here's the philosophical difference that matters: Flink treats batch as a special case of streaming (bounded streams), not the other way around. That inversion is why the API and the runtime both feel more natural for streaming workloads.
How It Works Internally
A Flink application compiles down to a dataflow graph of operators connected by data streams. The JobManager runs the control plane: its Dispatcher accepts job submissions, the ResourceManager negotiates containers with YARN/Kubernetes/Mesos, and the Scheduler deploys operator subtasks to TaskManagers. Each TaskManager is a JVM process with a fixed number of task slots. Each slot runs a pipeline of chained operators as a single thread.
Checkpointing is where Flink earns its keep, and it's based on the Chandy-Lamport distributed snapshot algorithm. The Checkpoint Coordinator (on the JobManager) periodically injects checkpoint barriers into the source streams. These barriers flow through the dataflow graph like regular records. When an operator receives a barrier on all its input channels, it triggers an asynchronous snapshot of its local state. With the RocksDB backend and incremental checkpointing enabled, only SST files modified since the last checkpoint get uploaded to durable storage (S3, HDFS, or GCS). What matters: checkpointing does not block record processing once barrier alignment completes, and incremental snapshots keep checkpoint duration proportional to the change rate, not total state size.
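A minimal sketch of wiring this up with the DataStream API, assuming the RocksDB state backend dependency is on the classpath and an S3 bucket (placeholder path) is reachable for checkpoint storage:

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Inject checkpoint barriers every 60s; exactly-once means barriers are aligned.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // RocksDB backend with incremental checkpoints: only SST files created or
        // modified since the last checkpoint are uploaded to durable storage.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Durable, shared checkpoint storage (bucket path is a placeholder).
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");
    }
}
```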
Each operator manages its own state. Keyed operators maintain state partitioned by key, stored either on the JVM heap (fast but memory-limited) or in an embedded RocksDB instance (slower but able to handle terabytes by spilling to local SSD). State is logically divided into key groups, and each parallel subtask owns a contiguous range of them. During rescaling (changing parallelism), key groups are redistributed from checkpoint storage. Key groups are what make rescaling tractable: without them, changing parallelism would mean rehashing and migrating every key individually, a multi-hour operation for large state.
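As an illustration of keyed state, a sketch of a hypothetical KeyedProcessFunction that keeps a running count per key; the state it declares lives in whichever backend the job configures and is snapshotted at every checkpoint:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Running count per key. The state is scoped to the current key automatically,
// stored in the configured backend (heap or RocksDB), and checkpointed by Flink.
public class CountPerKey extends KeyedProcessFunction<String, String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String event, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();                  // null on first access for this key
        long updated = (current == null) ? 1L : current + 1;
        count.update(updated);
        out.collect(updated);
    }
}
```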
Watermarks drive event-time processing. Sources emit watermarks, which are monotonically increasing timestamps that assert "no events with timestamp less than W will arrive." They propagate through the operator graph, and window operators use them to decide when a window is complete and can fire. Completeness and latency pull in opposite directions here. Aggressive watermarks (close to real time) fire windows faster but risk dropping late events. Conservative watermarks (lagging behind) catch more late events but add latency. Allowed lateness and side outputs for late data are the knobs for managing this tradeoff.
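A sketch of those event-time knobs, using hypothetical (key, timestamp) tuples: a bounded-out-of-orderness watermark strategy, a one-minute tumbling window, allowed lateness, and a side output for anything later still:

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (key, eventTimeMillis) pairs; a real job would read these from Kafka.
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("user-1", 1_000L), Tuple2.of("user-2", 2_000L));

        // Watermark = max timestamp seen minus 10s: tolerant of moderate disorder,
        // but still lets windows fire within seconds of real time.
        WatermarkStrategy<Tuple2<String, Long>> strategy = WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, ts) -> event.f1);

        // Records arriving after the allowed lateness go here instead of being dropped.
        final OutputTag<Tuple2<String, Long>> lateTag = new OutputTag<Tuple2<String, Long>>("late") {};

        SingleOutputStreamOperator<Tuple2<String, Long>> aggregated = events
                .assignTimestampsAndWatermarks(strategy)
                .keyBy(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .allowedLateness(Time.seconds(30))   // re-fire windows for events up to 30s late
                .sideOutputLateData(lateTag)
                .sum(1);                             // placeholder aggregation over field f1

        aggregated.print();
        aggregated.getSideOutput(lateTag).print();

        env.execute("late-data-example");
    }
}
```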
Production Architecture
In production on Kubernetes, the Flink Kubernetes Operator should handle job lifecycle, savepoints, and upgrades. Typical topology: one active JobManager plus a standby (HA via ZooKeeper or Kubernetes leader election), and N TaskManagers sized to the job's parallelism. Each TaskManager gets 4-8 GB of JVM heap, 2-8 GB of managed memory (for RocksDB, sorting, caching), and 2-4 CPU cores.
Checkpoint storage needs to be durable and shared. S3 or GCS is the standard choice. Incremental RocksDB checkpoints write only delta SST files, keeping checkpoint size manageable even with multi-terabyte state. The checkpoint interval should be 1-5 minutes for large-state jobs, 10-30 seconds for small-state low-latency jobs. Checkpoint timeout should be 2-3x the expected checkpoint duration. Set it tighter, and on-call shifts turn into watching checkpoints fail for no good reason.
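Assuming the large-state numbers above, the checkpoint cadence might be configured roughly like this (values are illustrative, not prescriptive):

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointCadence {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(120_000);           // 2 min interval for a large-state job

        CheckpointConfig cc = env.getCheckpointConfig();
        cc.setCheckpointTimeout(600_000);           // roughly 2-3x the expected checkpoint duration
        cc.setMinPauseBetweenCheckpoints(60_000);   // give the job room to make progress between checkpoints
        cc.setMaxConcurrentCheckpoints(1);          // never let checkpoints overlap
        cc.setTolerableCheckpointFailureNumber(3);  // one slow checkpoint should not fail the job
    }
}
```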
For high availability, the JobManager runs in HA mode with metadata (job graph, checkpoint pointers) stored in ZooKeeper or Kubernetes ConfigMaps. On JobManager failure, a standby takes over and restores from the latest completed checkpoint. TaskManager failures trigger partial recovery: only the affected subtasks restart from the checkpoint, not the entire job. This is a big deal in practice because one bad node does not kill the whole pipeline.
Savepoints are user-triggered full snapshots used for planned operations: code upgrades, parallelism changes, cluster migrations. Trigger a savepoint, stop the job, deploy the new version, resume from the savepoint. That's the whole cycle. It is the closest thing to zero-downtime upgrades for stateful streaming jobs, and it happens often enough to be worth memorizing.
Decision Criteria
| Criteria | Flink | Spark Structured Streaming | Kafka Streams |
|---|---|---|---|
| Processing model | True streaming (record-at-a-time) | Micro-batch (100ms+ intervals) | True streaming (record-at-a-time) |
| Latency | Single-digit ms (stateless), sub-second (windowed) | Seconds (batch interval bound) | Single-digit ms |
| State management | Managed (heap or RocksDB), checkpointed | Managed (HDFS state store) | RocksDB or in-memory, Kafka changelog |
| Max state size | Multi-TB (RocksDB + incremental checkpoints) | Limited by executor memory | Limited by local disk + changelog compaction |
| Exactly-once | Via Chandy-Lamport checkpoints | Via micro-batch idempotency | Via Kafka transactions |
| Deployment | Standalone, YARN, Kubernetes | YARN, Kubernetes, Databricks | Embedded in application (no cluster) |
| Windowing | Tumbling, sliding, session, custom | Tumbling, sliding | Tumbling, sliding, session, custom |
| Community/ecosystem | Growing, strong in Europe/Asia | Massive (Databricks ecosystem) | Tied to Kafka ecosystem |
| Operational complexity | High (checkpoint tuning, RocksDB config) | Medium (benefits from Spark expertise) | Low (library, not infrastructure) |
| Best for | Complex stateful streaming, event-time processing | Batch-primary with streaming add-on | Kafka-centric stream processing |
Capacity Planning
Parallelism: Match source operator parallelism to Kafka partition count. For CPU-bound operators, parallelism should match available task slots. A rough formula: total_task_slots = max_source_parallelism * 1.2 to leave headroom for non-source operators.
Memory per TaskManager: JVM heap (4-8 GB) + managed memory (2-8 GB for RocksDB) + network buffers (1-2 GB) + framework overhead (256 MB). For RocksDB-heavy jobs, budget 4 GB managed memory per parallel subtask with large state. Total per TaskManager: 8-16 GB.
Checkpoint sizing: Incremental checkpoint size roughly equals state_change_rate * checkpoint_interval. A job with 100 GB total state and 1% change rate per minute produces ~1 GB incremental checkpoints per minute. A full checkpoint (savepoint) equals total state. Checkpoint storage budget: total_state * 3 (keep the last 3 checkpoints for rollback).
Throughput: A well-tuned Flink job can push 1-5 million events/sec per TaskManager for stateless operations. That drops to 100K-500K events/sec for stateful operations with RocksDB lookups. The bottleneck shifts from CPU (stateless) to disk IO (stateful RocksDB reads/writes) and network (shuffle between operators).
Network: Inter-TaskManager shuffle traffic depends on key distribution and operator chaining. Budget event_rate * avg_event_size * shuffle_factor where shuffle_factor is 1.0 for keyed streams (no shuffle if already partitioned) and up to 2.0 for repartitioning operations.
RocksDB tuning: Set state.backend.rocksdb.block.cache-size to 256 MB per state, state.backend.rocksdb.writebuffer.size to 64 MB, and state.backend.rocksdb.compaction.level.max-size-level-base to 256 MB. Enable bloom filters for point lookups. Watch the RocksDB compaction metrics closely: excessive compaction causes latency spikes during checkpointing, and that failure mode takes days to track down without proper monitoring.
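These keys normally live in flink-conf.yaml; a sketch of setting the same values programmatically for a locally launched job, with the caveat that their exact effect depends on the Flink version and on whether managed memory governs RocksDB:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbTuning {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Same keys as flink-conf.yaml, mirroring the values suggested above.
        conf.setString("state.backend.rocksdb.block.cache-size", "256mb");
        conf.setString("state.backend.rocksdb.writebuffer.size", "64mb");
        conf.setString("state.backend.rocksdb.compaction.level.max-size-level-base", "256mb");
        conf.setString("state.backend.rocksdb.use-bloom-filter", "true");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
    }
}
```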
Failure Scenarios
Scenario 1: Checkpoint Backpressure Cascade
Trigger: A downstream operator gets slow (maybe a sink writing to an overloaded database), and backpressure propagates upstream. Checkpoint barriers travel with the data flow, so they get delayed too. The checkpoint timeout expires before all operators finish their snapshots.
Impact: The checkpoint fails. Flink retries on the next interval, but if backpressure persists, checkpoints keep failing. Without a completed checkpoint, the job's recovery point drifts further into the past. If the job crashes during this window, it restores from an old checkpoint and must reprocess potentially hours of data. This is the "checkpoint gap." During reprocessing, the job falls further behind, creating a vicious cycle. It gets worse: end-to-end exactly-once guarantees weaken because the Kafka producer's transaction timeout may expire, aborting transactions that were never committed and forcing a choice between data loss and a failed recovery.
Detection: Monitor lastCheckpointDuration and numberOfFailedCheckpoints in the Flink Web UI or the metrics stack. Alert when checkpointAlignmentTime exceeds 50% of the checkpoint interval. Track backPressureTimeMsPerSecond per operator to find the bottleneck.
Recovery: Fix the root cause. Scale the slow operator, increase sink parallelism, or add buffering (e.g., async I/O for database writes). Increase checkpoint timeout to 2x the observed checkpoint duration under load. Enable unaligned checkpoints (execution.checkpointing.unaligned=true), which let barriers overtake buffered records. This decouples checkpoint completion from backpressure at the cost of larger checkpoint sizes. For immediate relief, take a savepoint, stop the job, fix the bottleneck, and resume.
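A sketch of the unaligned-checkpoint mitigation, using the programmatic equivalent of the execution.checkpointing.unaligned flag mentioned above (interval and timeout values are placeholders):

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpoints {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        CheckpointConfig cc = env.getCheckpointConfig();
        cc.enableUnalignedCheckpoints();    // barriers may overtake buffered in-flight records,
                                            // which are then persisted as part of the checkpoint
        cc.setCheckpointTimeout(300_000);   // ~2x the observed checkpoint duration under load
    }
}
```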
Scenario 2: RocksDB State Corruption from Local Disk Failure
Trigger: A TaskManager's local SSD develops bad sectors or filesystem corruption. RocksDB SST files become unreadable. The operator throws an IOException on state access.
Impact: The affected subtask fails. Flink's restart strategy (fixed-delay or failure-rate) kicks in, but the subtask cannot reinitialize because the local RocksDB directory is corrupted. After exhausting restart attempts, the entire job fails. Recovery means restarting from the last completed checkpoint, downloading the full RocksDB state for the affected subtask from S3/HDFS. For a subtask with 50 GB of state, that can take 10-15 minutes on a 1 Gbps link. During recovery, the pipeline is down, and data piles up in Kafka.
Detection: Monitor local disk health (SMART attributes, filesystem errors in dmesg). Track Flink's numberOfFailedCheckpoints and taskFailureCount. Alert on restartingTime exceeding 5 minutes.
Recovery: Flink automatically recovers from the last checkpoint on a healthy TaskManager. The Kubernetes scheduler (or YARN) should not reschedule the subtask to the node with the bad disk. Use pod anti-affinity or node tainting. Incremental checkpoints keep the checkpoint itself fast, but recovery still downloads every SST file the checkpoint references, so fast network-attached storage for checkpoint restoration is what actually shortens the outage. For multi-terabyte state, consider task-local recovery (state.backend.local-recovery=true), which keeps a secondary local copy. When a failed TaskManager process restarts on the same healthy node, it recovers from that local copy instead of pulling everything from remote storage. Huge win for large state.
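A sketch of those two settings as cluster configuration; they normally go in flink-conf.yaml on every TaskManager and are shown here via the programmatic Configuration only for brevity:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RecoverySettings {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        conf.setString("state.backend.local-recovery", "true");  // keep a secondary local copy of state
        conf.setString("state.backend.incremental", "true");     // upload only changed SST files per checkpoint

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
    }
}
```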
Scenario 3: Watermark Stall from Idle Kafka Partition
Trigger: Say a Kafka topic has 64 partitions, but one partition stops receiving events for an extended period. Maybe the key distribution is skewed, or the producer for that partition is down. The watermark for that partition stays at its last value, preventing the global watermark from moving forward.
Impact: Every time-based window stalls because the global watermark is the minimum across all input partitions. No windows fire. State accumulates indefinitely as events pile up in unfired windows, and memory usage grows. Downstream systems see no output. Output-rate alerts fire, but the root cause (one idle partition) is not obvious. Eventually, the TaskManager runs out of managed memory and crashes.
This one is particularly nasty because the symptoms look like a performance problem, not a correctness problem.
Detection: Monitor per-partition watermark advancement via Flink metrics (currentInputWatermark per subtask). Alert when any subtask's watermark lags more than 5 minutes behind wall clock time. Track managedMemoryUsed for unexplained growth.
Recovery: Configure idle source detection: table.exec.source.idle-timeout=60s (Flink SQL) or WatermarkStrategy.withIdleness(Duration.ofSeconds(60)) (DataStream API). This marks partitions as idle after the timeout, excluding them from watermark computation. For Kafka sources, consider a heartbeat producer that sends periodic keepalive records to every partition. Long term, redesign the partitioning scheme to avoid structurally idle partitions.
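A sketch of the DataStream-side fix, assuming the Kafka connector is available; the broker address and topic are placeholders, and the watermark strategy marks partitions idle after 60 seconds:

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IdleSafeKafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")                 // placeholder broker address
                .setTopics("events")                               // placeholder topic
                .setGroupId("flink-consumer")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Partitions that produce nothing for 60s are marked idle and excluded from
        // the global watermark, so one quiet partition cannot stall every window.
        WatermarkStrategy<String> strategy = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withIdleness(Duration.ofSeconds(60));

        env.fromSource(source, strategy, "kafka-events").print();
        env.execute("idle-safe-source");
    }
}
```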
Pros
- • True event-time processing with watermarks
- • Exactly-once state consistency
- • Low-latency stream processing
- • Sophisticated windowing (tumbling, sliding, session)
- • Unified batch and stream processing
Cons
- • Steep learning curve
- • Complex cluster management
- • Checkpointing can impact latency under load
- • Smaller community than Spark
- • Resource-intensive for stateful operations
When to use
- • Need true real-time processing (low latency)
- • Complex event patterns with event-time semantics
- • Exactly-once processing guarantees required
- • Stateful stream processing (aggregations, joins)
When NOT to use
- • Simple batch processing jobs
- • Small data volumes that don't justify the complexity
- • Team lacks streaming expertise
- • Ad-hoc analytical queries (use Spark or ClickHouse)
Key Points
- • Flink processes records one at a time (not micro-batch), hitting single-digit millisecond latency for stateless operators and sub-second for stateful windowed aggregations
- • Chandy-Lamport checkpoint barriers flow through the dataflow graph, triggering async state snapshots without stopping processing. That's how Flink gets exactly-once without killing throughput.
- • RocksDB state backend lets state exceed JVM heap by spilling to local SSD. The heap backend is faster but limited to available memory.
- • Watermarks are heuristic. They declare "no events older than timestamp T will arrive." Late events are either dropped or routed to side outputs depending on the allowed lateness configuration.
- • Exactly-once end-to-end requires idempotent sinks or two-phase commit sinks (e.g., KafkaProducer with transactions) coordinated with checkpoint completion; see the sketch after this list
- • Rescaling stateful operators redistributes RocksDB key groups across the new parallelism. This is an expensive operation involving full state redistribution from checkpoint storage.
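For the two-phase-commit point above, a sketch of an exactly-once KafkaSink; the broker, topic, and transactional-id prefix are placeholders:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceSink {
    public static void main(String[] args) {
        // Two-phase commit: the sink pre-commits a Kafka transaction with each checkpoint
        // and commits it only once the checkpoint has completed.
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("kafka:9092")                     // placeholder broker address
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("results")                           // placeholder topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("results-job")               // must be unique per job
                .build();
    }
}
```

With this delivery guarantee, Kafka's transaction.timeout.ms must exceed the checkpoint interval with headroom, otherwise the checkpoint-gap failure mode from Scenario 1 turns into aborted transactions.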
Common Mistakes
- ✗ Using heap state backend for large state (> 2 GB per operator). This leads to GC thrashing and eventual OOM. Use RocksDB instead.
- ✗ Setting the checkpoint interval too aggressively (< 30s) for jobs with large state, causing checkpoint backpressure that tanks end-to-end latency
- ✗ Not configuring idleness handling for sources. A single idle Kafka partition prevents watermark advancement for the entire job, stalling all windows.
- ✗ Ignoring operator chain breaks. Unnecessary chain breaks (e.g., a rebalance between operators) prevent fusion optimization and add serialization overhead.
- ✗ Assuming exactly-once means exactly-once delivery. Flink guarantees exactly-once state, but sink side-effects depend entirely on the sink implementation.