ZooKeeper
The coordination service that half the big data world still depends on
Why It Exists
Here is the core problem with distributed systems: multiple processes on separate machines need to agree on shared state. Who is the leader? Which partitions live where? What configuration is active right now? Without a coordination service, every system has to implement its own consensus logic. And that is where the bugs hide. Subtle ones. Split-brain conditions. Stale reads nobody notices until production is on fire.
ZooKeeper provides a centralized, strongly consistent coordination kernel. Higher-level primitives get built on top of it: leader election, distributed locks, barriers, group membership. Yahoo! Research created it because they kept watching teams build their own consensus code, and it kept going wrong. It was error-prone and redundant. Apache Kafka, HBase, Solr, Hadoop YARN, and dozens of other systems handed off their coordination to ZooKeeper rather than reinventing the wheel.
That said, ZooKeeper is showing its age. Kafka is actively removing its ZooKeeper dependency with KRaft. New projects overwhelmingly pick etcd. But for anyone working in the Hadoop ecosystem or maintaining an older Kafka deployment, ZooKeeper will come up, and understanding how it works matters.
How It Works Internally
ZooKeeper organizes data into a tree of znodes, similar to a filesystem. Each znode has a path (e.g., /kafka/brokers/ids/0), a data payload (up to 1 MB), version metadata, and an ACL. There are four znode types: persistent (survives client disconnection), ephemeral (deleted when the creating session ends), persistent-sequential (auto-appended monotonic counter), and ephemeral-sequential. This is a small set of primitives, but it is surprisingly powerful. Leader election, for example, is just ephemeral-sequential nodes under a path where the lowest-sequence holder becomes leader. Elegant.
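The election recipe is short enough to sketch with the plain Java client; in practice most teams use Apache Curator's LeaderLatch, which wraps the same idea. The /election path and candidate_ prefix below are illustrative, and the persistent parent znode is assumed to already exist.

```java
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;

// Minimal leader-election sketch using ephemeral-sequential znodes.
public class LeaderElection implements Watcher {
    private final ZooKeeper zk;
    private String myNode;   // e.g. /election/candidate_0000000007

    public LeaderElection(String connectString) throws Exception {
        // 30s session timeout; `this` receives connection and watch events.
        zk = new ZooKeeper(connectString, 30_000, this);
    }

    public void volunteer() throws Exception {
        // Ephemeral-sequential: vanishes if our session dies, gets a monotonic suffix.
        myNode = zk.create("/election/candidate_", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        checkLeadership();
    }

    private void checkLeadership() throws Exception {
        List<String> children = zk.getChildren("/election", false);
        Collections.sort(children);
        String lowest = "/election/" + children.get(0);
        if (lowest.equals(myNode)) {
            System.out.println("I am the leader: " + myNode);
        } else {
            // Watch only the node immediately below ours, not the whole parent.
            int myIndex = children.indexOf(myNode.substring("/election/".length()));
            String predecessor = "/election/" + children.get(myIndex - 1);
            Stat stat = zk.exists(predecessor, this);
            if (stat == null) checkLeadership(); // predecessor vanished meanwhile; re-evaluate
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDeleted) {
            try { checkLeadership(); } catch (Exception ignored) {}
        }
    }
}
```

Because each candidate watches only its immediate predecessor, a leader's death wakes exactly one successor instead of the whole group.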
The ZAB (ZooKeeper Atomic Broadcast) protocol is the consensus backbone. It operates in two modes: recovery, covering the discovery and synchronization phases that follow a leader election, and atomic broadcast for normal operation. During normal operation, all write requests get forwarded to the leader, which assigns a monotonically increasing transaction ID (zxid), proposes it to followers, waits for quorum acknowledgment, then commits. The zxid has two components: an epoch (incremented on each leader election) and a counter. This guarantees a global total ordering of all state mutations across the entire ensemble.
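Concretely, the zxid packs both components into one 64-bit value, epoch in the high 32 bits and counter in the low 32 bits, so comparing zxids is a plain long comparison:

```java
// The 64-bit zxid encodes ordering: high 32 bits = leader epoch, low 32 bits = counter.
public final class Zxid {
    public static long epoch(long zxid)   { return zxid >>> 32; }
    public static long counter(long zxid) { return zxid & 0xFFFFFFFFL; }

    public static void main(String[] args) {
        long zxid = 0x00000005000002A1L;  // example value
        System.out.printf("epoch=%d counter=%d%n", epoch(zxid), counter(zxid)); // epoch=5 counter=673
    }
}
```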
The write path looks like this: client sends a write to any server, that server forwards to the leader, the leader creates a proposal with the next zxid, broadcasts it to all followers, waits for majority ACKs, sends a COMMIT message, applies it to its in-memory data tree, and responds to the client. The transaction also gets written to a write-ahead log on disk before acknowledgment. Periodic snapshots of the in-memory tree keep recovery time bounded.
Now here is the part people get tripped up on. Reads, by default, come from any ensemble member's local in-memory copy. This means reads can return stale data because a follower might not have applied the latest committed transaction yet. For linearizable reads, the client calls sync() before the read to force the server to catch up with the leader. This is a critical architectural decision that most people do not think about carefully enough. Read throughput scales with ensemble size, but consistency is sacrificed by default.
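A minimal sketch of that pattern with the Java client, assuming an already-connected ZooKeeper handle. sync() is asynchronous, so the read has to wait for its callback:

```java
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;

// Force the connected server to catch up with the leader before reading,
// trading extra latency for a linearizable read.
public class LinearizableRead {
    public static byte[] readLatest(ZooKeeper zk, String path) throws Exception {
        CountDownLatch synced = new CountDownLatch(1);
        zk.sync(path, (rc, p, ctx) -> synced.countDown(), null);
        synced.await();
        Stat stat = new Stat();
        return zk.getData(path, false, stat);  // reflects all commits up to the sync point
    }
}
```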
Sessions are the client-server abstraction. Each client maintains a TCP connection to one ensemble member with periodic heartbeats. If the server does not hear from the client within sessionTimeout, it expires the session and deletes all ephemeral znodes. The client library handles transparent reconnection to another server if the current connection drops, preserving the session as long as it reconnects within the timeout window.
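In client code the distinction that matters is Disconnected versus Expired: the first is recoverable within the timeout, the second means the ephemeral nodes are already gone and a new handle is required. A sketch of a default Watcher handling both:

```java
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

// Session-state handling a client typically needs.
public class SessionWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        switch (event.getState()) {
            case SyncConnected:
                // Connected or reconnected within the timeout; session and ephemerals intact.
                break;
            case Disconnected:
                // Transient: pause writes and let the client library retry other servers.
                break;
            case Expired:
                // Session gone: recreate the ZooKeeper handle, re-register ephemerals and watches.
                break;
            default:
                break;
        }
    }
}
```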
Production Architecture
A production ZooKeeper ensemble runs 3 or 5 voting members across failure domains. For cross-datacenter deployments, add observers in remote datacenters. Observers receive the full replication stream but do not vote in quorum, providing local read capacity without increasing write latency. A typical topology for a global Kafka deployment: 3 voters in the primary datacenter, 1-2 observers per remote datacenter.
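Observers are declared in the ensemble configuration with an :observer suffix, and each observer additionally marks itself with peerType=observer. An illustrative fragment, with placeholder hostnames:

```
# zoo.cfg fragment -- hostnames are placeholders
server.1=zk1.dc1.example.com:2888:3888
server.2=zk2.dc1.example.com:2888:3888
server.3=zk3.dc1.example.com:2888:3888
server.4=zk1.dc2.example.com:2888:3888:observer
server.5=zk1.dc3.example.com:2888:3888:observer

# additionally, on the observer hosts only:
peerType=observer
```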
JVM tuning is not optional. ZooKeeper's in-memory data tree and session tracking are extremely sensitive to GC pauses. Use G1GC with -XX:MaxGCPauseMillis=20, heap size of 4-8 GB (larger heaps cause longer GC pauses, so do not just throw memory at it), and dedicate the transaction log directory to a separate physical disk from snapshots. The transaction log disk must be low-latency. Use an SSD. Every committed transaction requires a forced fsync, so spinning disks will become a bottleneck fast.
Production configuration worth knowing: tickTime=2000 (base time unit in ms), initLimit=10 (ticks for initial sync), syncLimit=5 (ticks for follower sync lag), maxClientCnxns=60 (per-IP connection limit), autopurge.snapRetainCount=5, and autopurge.purgeInterval=1 (hourly purge of old snapshots and logs).
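Collected into a zoo.cfg, with placeholder paths and the transaction log on its own disk:

```
# zoo.cfg -- values from above; paths are placeholders
tickTime=2000
initLimit=10
syncLimit=5
maxClientCnxns=60
autopurge.snapRetainCount=5
autopurge.purgeInterval=1
# keep snapshots and the transaction log on separate disks
dataDir=/var/lib/zookeeper/data
dataLogDir=/var/lib/zookeeper/txnlog
clientPort=2181
```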
For monitoring, expose ZooKeeper's four-letter commands (mntr, ruok, stat) or the newer AdminServer HTTP endpoint. The metrics that actually matter: zk_outstanding_requests (pending write queue), zk_avg_latency / zk_max_latency, zk_znode_count, zk_watch_count, zk_ephemerals_count, and zk_approximate_data_size.
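The four-letter commands are plain text over the client port; on recent releases they must be enabled via 4lw.commands.whitelist first. For example:

```
# assumes 4lw.commands.whitelist=mntr,ruok,stat in zoo.cfg
$ echo ruok | nc zk1.example.com 2181
imok
$ echo mntr | nc zk1.example.com 2181 | grep -E 'zk_avg_latency|zk_outstanding_requests'
```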
Decision Criteria
| Criteria | ZooKeeper | etcd | Consul |
|---|---|---|---|
| Consensus protocol | ZAB (total order broadcast) | Raft | Raft |
| Data model | Hierarchical znode tree | Flat key-value with MVCC | KV + service catalog |
| Max data per node | 1 MB per znode | 1.5 MB per value | No hard per-key limit |
| Session model | TCP session with heartbeats, ephemeral nodes | Lease-based TTL on keys | Agent-based health checks |
| Watch semantics | One-shot triggers (must re-register) | Persistent gRPC streams, revision-based | Blocking long-poll queries |
| Read consistency | Serializable by default, sync() for linearizable | Linearizable or serializable (configurable) | Default eventual, consistent mode available |
| Write throughput | ~10,000-15,000 writes/sec | ~10,000-30,000 writes/sec | ~5,000-10,000 writes/sec |
| Runtime | JVM (Java) | Native (Go binary) | Native (Go binary) |
| Operational overhead | High (JVM tuning, GC, quorum mgmt) | Low (single binary, simple config) | Medium (agent deployment) |
| Ecosystem | Kafka (pre-KRaft), HBase, Solr, Hadoop | Kubernetes, cloud-native | HashiCorp stack, service mesh |
Capacity Planning
Ensemble sizing: 3 nodes for development or small production, 5 nodes for critical production workloads. Add observers for read scaling. Each voting member needs 2-4 CPU cores, 4-8 GB JVM heap, and a dedicated SSD for the transaction log.
Throughput: A 3-node ensemble delivers roughly 10,000-15,000 writes/sec and 50,000-100,000 reads/sec (reads scale with ensemble + observer count). Every write goes through the leader, so write throughput is bounded by single-node performance plus quorum latency. There is no way around that.
Storage: Transaction logs grow at approximately write_rate * 1KB/txn. With 10,000 writes/sec, that is about 10 MB/sec, or roughly 850 GB/day before purging. Configure autopurge to retain the last 3-5 snapshots and purge hourly. Snapshot size equals the in-memory data tree size, typically 100 MB to 2 GB for large deployments.
Znode limits: The practical limit is around 1 million znodes with a total data size under 500 MB. Past that, snapshot serialization time starts to hurt, and leader election recovery (which replays the transaction log since the last snapshot) becomes dangerously slow. Kafka clusters with 50,000+ partitions push right up against this boundary.
Session count: Each session consumes about 10 KB of memory and generates periodic heartbeat traffic. A single ensemble member can handle 10,000-30,000 concurrent sessions. With 1,000 Kafka brokers each maintaining 1 session, the overhead is minimal. The real concern is the number of watches and ephemeral nodes, not the session count itself.
Network: Ensemble members exchange heartbeats every tickTime (2,000ms default). Cross-member bandwidth during normal operation is approximately write_rate * avg_txn_size * (N-1). Budget 5-20 Mbps for moderate workloads.
Failure Scenarios
Scenario 1: Leader Election Storm from JVM GC Pause
Trigger: The ZooKeeper leader hits a long GC pause (2-5 seconds) due to a full GC event. This happens when the heap is undersized, when a burst of client connections creates excessive garbage, or when the OS swaps JVM memory pages to disk.
Impact: Followers stop receiving heartbeats and start a leader election after syncLimit * tickTime (default 10 seconds). During election, all write operations get rejected. If the new leader also pauses (a systemic issue like memory pressure on all hosts), the result is repeated elections, a "leader election storm." Kafka brokers connected to the ensemble see their sessions expire, which triggers mass partition leader elections. In bad cases, this cascades further: Kafka consumers lose their group coordination, and consumer rebalance storms pile on top of everything else.
Detection: Monitor JVM GC logs for pauses > 500ms. Alert on zk_max_latency exceeding 5,000ms. Track zk_server_leader_election_time and alert if leader changes occur more than once per hour. On the Kafka side, watch kafka.controller:LeaderElectionRateAndTimeMs.
Recovery: Immediately tune the JVM: switch to G1GC, set -Xmx4g -Xms4g (equal min/max to avoid resize), add -XX:MaxGCPauseMillis=20. Check for OS-level swap with vmstat. If swapping is happening, disable swap or add physical RAM. If one member is consistently problematic, replace it. Long-term: move to etcd or Kafka KRaft to get rid of the JVM dependency entirely.
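One common way to apply those flags, assuming a stock tarball layout, is a conf/java.env file that zkEnv.sh sources at startup; packaging-specific installs may use a different mechanism:

```
# conf/java.env -- sourced by zkEnv.sh if present; location may differ by packaging
export SERVER_JVMFLAGS="-Xms4g -Xmx4g -XX:+UseG1GC -XX:MaxGCPauseMillis=20"
```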
Scenario 2: Ephemeral Node Cascade from Network Blip
Trigger: A brief network partition (5-30 seconds) between a datacenter hosting many clients and the ZooKeeper ensemble. All client sessions from that datacenter fail to heartbeat within sessionTimeout.
Impact: ZooKeeper expires all affected sessions at once, deleting all their ephemeral znodes in a burst. If those ephemeral nodes represented Kafka broker registrations, service locks, or leader election participants, the mass deletion triggers a thundering herd. Every affected service simultaneously tries to re-register, re-acquire locks, and re-elect leaders. The ZooKeeper ensemble gets flooded with write requests from hundreds or thousands of clients reconnecting at the same time. Write latency spikes. Some re-registration attempts time out and retry, which makes the storm worse.
Detection: Monitor zk_ephemerals_count for sudden drops > 10% in a 1-minute window. Alert on zk_outstanding_requests exceeding 1,000. Track session expiration rate via zk_global_sessions metric delta.
Recovery: Set sessionTimeout high enough to survive transient network issues (30-60 seconds for production). Implement client-side exponential backoff with jitter on reconnection. The ZooKeeper client library alone will not prevent a thundering herd. For Kafka, seriously consider migrating to KRaft mode, which removes the ZooKeeper dependency entirely. Add observers in each datacenter to reduce cross-datacenter session traffic.
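A sketch of what exponential backoff with full jitter looks like on the client side; the base delay and cap here are illustrative:

```java
import java.util.concurrent.ThreadLocalRandom;

// Client-side reconnection backoff with full jitter, to keep a thundering herd of
// re-registrations from hammering the ensemble after a mass session expiry.
public class ReconnectBackoff {
    private static final long BASE_MS = 200;
    private static final long MAX_MS  = 30_000;

    public static long delayForAttempt(int attempt) {
        long cap = Math.min(MAX_MS, BASE_MS << Math.min(attempt, 20)); // exponential growth, capped
        return ThreadLocalRandom.current().nextLong(cap + 1);          // full jitter in [0, cap]
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.printf("attempt %d -> sleep %d ms%n", attempt, delayForAttempt(attempt));
        }
    }
}
```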
Scenario 3: Transaction Log Disk Full
Trigger: The autopurge configuration is missing or disabled, and the dedicated transaction log disk fills to 100%. Or, a snapshot corruption causes autopurge to silently fail, accumulating old transaction logs indefinitely.
Impact: When ZooKeeper cannot write to the transaction log, it cannot commit any new proposals. The leader refuses all writes and returns errors to clients. If the disk-full condition hits a follower, that follower falls behind and eventually gets marked as out-of-sync. If it hits the leader, the leader steps down. Worst case: all members running on similarly sized disks filling up around the same time. The entire ensemble goes read-only, then crashes.
Detection: Standard disk utilization monitoring on the transaction log partition. Alert at 80% usage. Monitor zk_outstanding_requests and zk_packets_received for write rejection patterns. Check autopurge logs for errors during scheduled purge cycles.
Recovery: Manually remove old transaction logs and snapshots from the dataLogDir (files with the lowest zxid prefixes), but never remove the most recent snapshot and all transaction logs after it. Restart the affected member. Configure autopurge: autopurge.snapRetainCount=3 and autopurge.purgeInterval=1. Provision the transaction log disk at 3x the expected daily log volume, and use separate disks for dataDir (snapshots) and dataLogDir (transaction logs).
Pros
- • Strong consistency through ZAB consensus protocol
- • Ordered, sequential operations
- • Ephemeral nodes for failure detection
- • Watch mechanism for real-time notifications
- • Battle-tested in production (Kafka, HBase, Solr)
Cons
- • Not designed for large data storage
- • Write throughput is limited (leader bottleneck)
- • Operational complexity with quorum management
- • Java-based, which means GC pause headaches
- • Being replaced by newer alternatives (etcd, KRaft)
When to use
- • Need leader election for distributed services
- • Existing Hadoop/Kafka ecosystem dependency
- • Distributed configuration that must be consistent
- • Service coordination requiring strong ordering
When NOT to use
- • General-purpose data storage
- • High write throughput requirements
- • Greenfield projects (consider etcd instead)
- • Simple service discovery (consider Consul)
Key Points
- • ZAB (ZooKeeper Atomic Broadcast) guarantees total order of all state changes. Every follower sees transactions in identical sequence.
- • All writes go through the leader. Reads can come from any ensemble member, which trades consistency for throughput.
- • Ephemeral znodes are tied to sessions. If the client session expires (missed heartbeats for sessionTimeout ms), all its ephemeral nodes get deleted.
- • Watches are one-shot triggers. After firing once, the client must re-register, creating a gap where events can slip through without careful client-side logic.
- • The 1 MB per-znode data limit is a hard constraint. ZooKeeper is built for coordination metadata, not data storage.
- • Observers scale read throughput without affecting the write quorum. This matters a lot for cross-datacenter deployments.
Common Mistakes
- ✗ Using ZooKeeper as a message queue or data store instead of a coordination service. Exceeding znode size or count will destroy performance.
- ✗ Not accounting for the watch gap: between a watch firing and the client re-registering, state changes are silently lost.
- ✗ Setting sessionTimeout too low (< 10s), causing spurious ephemeral node deletions during GC pauses or transient network issues.
- ✗ Running ensemble sizes larger than 5 without observers. Every additional voting member increases write latency linearly.
- ✗ Ignoring JVM GC tuning. A 2-second GC pause on the leader triggers a full leader election and temporary write unavailability.