Distributed Snapshots & Chandy-Lamport
Architecture
Taking a Photo of a Moving System
Suppose you have six services processing events, passing messages between each other continuously. You want to capture the state of the entire system at some logical "moment" so you can recover from it if something fails. But there is no global clock. Service A might be slightly ahead of service B. Messages from A to B are sitting in a network buffer somewhere. How do you get a picture of all of this that actually makes sense?
You cannot just ask every service to dump its state at wall-clock time T. Clocks drift. Even with NTP, you might get milliseconds of skew, and in a system processing millions of events per second, milliseconds mean thousands of events in ambiguous territory. Some services will include events that others have not seen yet. The resulting "snapshot" would be internally inconsistent.
Chandy and Lamport solved this in 1985 with an elegant algorithm that requires no synchronized clocks at all. Only FIFO message ordering on channels, which TCP gives you for free.
Chandy-Lamport: The Algorithm
Any node can decide to start a snapshot. That node records its own local state, then sends a special marker message out on every outgoing channel.
When a node receives a marker on some channel for the first time (meaning it has not started participating in this snapshot yet), it does a few things: records its own local state, starts recording all messages arriving on every incoming channel except the one the marker came from (that channel's state is empty since the marker just arrived), and forwards the marker on all its outgoing channels.
When a node receives a marker on a channel it is already recording, it stops recording on that channel. The messages it accumulated on that channel between its own state recording and the arrival of this second marker constitute the channel's state. Those are the "in-flight" messages for that channel.
Once every channel at every node has received its marker, the snapshot is complete. It consists of every node's recorded local state plus every channel's recorded in-flight messages.
Why This Produces a Consistent Cut
The concept of a consistent cut is central. A cut divides the history of the system into "before" and "after." A cut is consistent if it never shows a message being received without also showing it being sent. In other words, effects do not appear without their causes.
Markers enforce this because they travel through the same channels as regular messages. Since channels are FIFO, when a marker arrives at a node, it means all messages sent before the marker on that channel have already arrived. Any message that arrives after the marker was sent after the initiating node recorded its state.
The resulting snapshot might not correspond to any single moment in real time. Node A might have recorded its state at time 10 and node B at time 12. But the snapshot is still consistent: it represents a state the system could have passed through. Lamport's paper proves that any consistent cut corresponds to a reachable state of the computation. That is sufficient for recovery.
Flink's Barrier-Based Adaptation
Apache Flink took Chandy-Lamport and adapted it for dataflow stream processing. Instead of marker messages between arbitrary nodes, Flink injects checkpoint barriers into the data streams at the sources.
A barrier is a special record that flows through the dataflow graph alongside regular data records. When a source operator receives a checkpoint trigger from the JobManager (Flink's coordinator), it snapshots its position (like a Kafka offset), injects a barrier into its output stream, and continues processing.
Downstream operators receive barriers on their input channels. Here is where it gets interesting, and where Flink has evolved its approach significantly.
Aligned checkpoints are the original approach. When an operator with multiple input channels receives a barrier on one channel, it stops processing data from that channel. It continues processing other channels until barriers arrive on all of them. Once all barriers have arrived, the operator snapshots its state, emits the barrier downstream, and resumes processing on all channels.
The problem with alignment is backpressure. While waiting for a slow channel's barrier, the operator buffers data from channels whose barriers already arrived. If one channel is significantly slower (skewed partitions, network issues), buffers grow, memory pressure builds, and backpressure propagates upstream. In a high-throughput pipeline doing hundreds of thousands of events per second, this can cause visible latency spikes that last for the entire checkpoint duration.
Unaligned checkpoints, introduced in Flink 1.11, take a different approach. When the first barrier arrives on any input channel, the operator immediately snapshots its state and emits the barrier. It does not wait for other channels. Instead, it records all the data currently buffered in the input and output channels as part of the checkpoint.
This eliminates the backpressure problem entirely. The operator never stalls. The cost is larger checkpoints, because the in-flight data gets persisted as well. For pipelines where checkpoint-related backpressure is a real operational problem, unaligned checkpoints are a significant improvement. For pipelines with balanced partitions and modest throughput, aligned checkpoints work fine and produce smaller checkpoints.
Incremental Snapshots: Avoiding the Full Dump
A Flink operator with 50GB of state in RocksDB cannot afford to write 50GB to S3 every 30 seconds. Full snapshots at that scale would saturate network bandwidth and slow down checkpoint completion, potentially causing checkpoint timeouts.
Incremental snapshots address this by capturing only what changed since the last successful checkpoint. RocksDB makes this natural because it stores data in immutable SST files. Between checkpoints, RocksDB might create a few new SST files through flushes and compactions. The incremental checkpoint only uploads those new files.
The checkpoint metadata keeps track of which SST files constitute the full state. Restoring from an incremental checkpoint means downloading the referenced SST files from potentially several previous checkpoints and reconstructing the RocksDB state directory.
The difference in practice is dramatic. A 50GB state backend that changes 200MB between checkpoints goes from writing 50GB per checkpoint to writing 200MB. Checkpoint duration drops from minutes to seconds. This makes sub-minute checkpoint intervals practical for large stateful applications, which directly improves recovery time (less data to replay from the source).
Snapshot Isolation in Databases
Distributed snapshots and database snapshot isolation are solving different problems with a shared underlying insight: capturing a consistent view of a changing system.
PostgreSQL's MVCC (Multi-Version Concurrency Control) assigns each transaction a snapshot of which other transactions are visible to it. When a transaction starts with REPEATABLE READ or SERIALIZABLE isolation, it gets a snapshot ID. All reads in that transaction see the database as of that snapshot, regardless of concurrent writes. The mechanism is version chains on each row, not Chandy-Lamport, but the goal is identical: a consistent cut through a changing system.
CockroachDB and Spanner extend this to distributed databases using hybrid logical clocks. A read-only transaction picks a timestamp and reads data as of that timestamp from any node in the cluster. The timestamp ordering provides the consistency guarantee. Under the hood, MVCC versions on each node make this efficient, and clock synchronization (GPS/atomic clocks in Spanner, hybrid logical clocks in CockroachDB) ensures that timestamps form a consistent ordering across nodes.
The connection to Chandy-Lamport is conceptual rather than mechanical. Both say: "give me a view of the system that could have existed as a single moment in time, even though the physical collection of that view is spread across real time."
CDC as a Snapshot Problem
Debezium, the dominant open-source CDC platform, faces a snapshot problem at startup. When you connect Debezium to an existing database, it needs to capture the current state of all tables before it can start streaming changes from the WAL.
The initial snapshot reads the current data from the database while writes continue happening. Debezium uses the database's snapshot isolation (or consistent read capabilities) to get a consistent view. Then it records the WAL position at the time of the snapshot and starts streaming changes from that position forward.
This is Chandy-Lamport in disguise. The snapshot is the node state. The WAL stream from the recorded position forward captures the "in-flight messages" that happened after the snapshot but need to be included for consistency. Combined, they provide a complete and consistent capture of the database's history from that point forward.
Limitations and Gotchas
Chandy-Lamport requires FIFO channels. TCP connections satisfy this naturally, but application-layer reordering can break the assumption. If your messaging system delivers messages out of order (some configurations of Kafka with multiple in-flight requests per connection, or UDP-based protocols), markers can arrive before messages that were sent earlier, corrupting the channel state capture.
The algorithm also requires all nodes to be reachable. If a network partition isolates one node, markers never arrive there, and the snapshot never completes. Flink handles this with checkpoint timeouts: if a checkpoint does not complete within a configured duration (default 10 minutes), it is aborted and a new one is triggered later. For systems with frequent transient partitions, this means checkpoint completion rates drop and recovery points become stale.
Snapshot size is another operational concern. With aligned checkpoints, the snapshot is purely operator state. With unaligned checkpoints, it also includes buffered data. Either way, snapshots get written to a durable store (usually HDFS or S3). Retained checkpoints accumulate. Flink's checkpoint retention policies control how many completed checkpoints to keep, but with incremental snapshots, old checkpoints might reference SST files that are also referenced by newer checkpoints, making cleanup non-trivial. The state.checkpoints.num-retained setting and externalized checkpoint cleanup mode need attention.
Choosing Your Checkpoint Strategy
The checkpoint interval balances two concerns. Shorter intervals mean less data to replay on failure (faster recovery) but more overhead from barrier propagation, state serialization, and storage writes. Longer intervals mean slower recovery but lower steady-state overhead.
A reasonable starting point for most Flink applications: 1-minute interval, incremental checkpoints enabled, unaligned checkpoints if backpressure is a concern, checkpoint timeout at 10 minutes, retain 2-3 completed checkpoints. Then tune from there based on observed checkpoint duration, pipeline backpressure during checkpoints, and storage costs.
For Kafka Streams, the picture is simpler. Kafka Streams commits offsets and flushes state stores at the commit interval (default 30 seconds). State stores backed by RocksDB use Kafka changelog topics for durability, so "checkpointing" is really just flushing the in-memory write buffer and committing the consumer offset.
Spark Structured Streaming takes yet another approach with its write-ahead log for the driver and micro-batch offsets. Checkpoint location is a directory (HDFS or S3) where Spark writes offset ranges and committed batch metadata. Recovery replays from the last committed batch.
Every stream processing framework has reinvented distributed snapshots in its own way. The details differ, but Chandy-Lamport's insight lives underneath all of them: markers (or barriers, or offsets) flowing through the same channels as data, creating a consistent cut without stopping the world.
Key Points
- •A consistent distributed snapshot captures the state of every node and every in-flight message without stopping the system. The snapshot represents a state the system could have been in, even if no single moment in real time matched it exactly
- •Chandy-Lamport (1985) uses marker messages as logical scissors that cut message flows into before-snapshot and after-snapshot. Any node can initiate. The algorithm requires only FIFO channels, no global clock
- •Apache Flink adapted Chandy-Lamport for stream processing using checkpoint barriers injected into data streams. Aligned barriers give exactly-once guarantees but can cause backpressure. Unaligned barriers (Flink 1.11+) reduce latency at the cost of larger checkpoint sizes
- •Incremental snapshots capture only the state delta since the last checkpoint. For Flink operators backed by RocksDB, this can shrink checkpoint size from gigabytes to megabytes, making frequent checkpoints practical
- •The consistent cut property guarantees that for every message received in the snapshot, the corresponding send is also captured. This is what makes the snapshot usable for recovery without duplicating or losing events
Used By
Common Mistakes
- ✗Using aligned checkpoints on high-throughput Flink pipelines without benchmarking backpressure. Aligned barriers force operators to buffer data from fast channels while waiting for slow ones, which can cascade into severe backpressure spikes
- ✗Not enabling incremental checkpoints for large operator state. Full state snapshots of a 50GB RocksDB backend can take minutes and saturate network bandwidth to the checkpoint store
- ✗Setting the checkpoint interval too aggressively. Each checkpoint has coordination overhead (barrier propagation, state serialization, storage writes). An interval of 100ms on a pipeline doing 10ms checkpoints means 10% of capacity goes to checkpointing
- ✗Ignoring checkpoint storage costs. Retained checkpoints on S3 or HDFS accumulate fast, especially with incremental checkpoints that keep SST file references alive. Without a retention policy, storage bills grow quietly