Producer-Consumer Pattern
One or more producers add work items to a bounded buffer; one or more consumers remove and process them. The buffer decouples their rates; the synchronization makes the buffer safe and prevents busy-waiting.
*(Diagram: producers → bounded buffer → consumers.)*
What it is
The producer-consumer pattern puts a bounded queue between two parts of a system that work at different rates. Producers push items into the queue (see diagram above); consumers pull them out. The queue handles synchronisation, blocking, and backpressure.
The blocking behaviour is the whole win:
- Queue full? Producer blocks until a consumer takes one.
- Queue empty? Consumer blocks until a producer adds one.
That's automatic backpressure. A slow consumer naturally caps the producer rate, and the queue's capacity sets a hard limit on how much work can be in flight.
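A minimal sketch of that cap in action, assuming a deliberately slow consumer (the class and method names here are illustrative): the producer tries to push ten items through a capacity-2 queue, and the observed in-flight count never exceeds 2.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BackpressureDemo {
    static int run() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // capacity = hard cap on in-flight work
        AtomicInteger maxSeen = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    queue.put(i);                                   // blocks whenever 2 items are in flight
                    maxSeen.accumulateAndGet(queue.size(), Math::max);
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(5);                                // slow consumer paces the producer
                    queue.take();
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });

        producer.start(); consumer.start();
        producer.join(); consumer.join();
        return maxSeen.get();                                       // bounded by the queue capacity
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("max in flight: " + run());
    }
}
```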
Why it matters
Almost every concurrent system is producer-consumer in disguise. A web server is a producer (incoming requests) feeding consumers (worker threads). A log shipper buffers between the app thread and the network. A Kafka consumer pulls from a partition and hands off to a processing pool. Interviewers love this pattern because the implementation has just enough subtlety (bounded vs unbounded, single vs multi-producer, shutdown semantics) to expose whether the candidate actually understands concurrency or just memorised APIs.
How a correct implementation looks under the hood
The bounded buffer invariant: `0 <= size <= capacity`

```text
Producer wants to put:              Consumer wants to take:
  acquire lock                        acquire lock
  while size == capacity:             while size == 0:
      wait(notFull)                       wait(notEmpty)
  buffer.append(item)                 item = buffer.removeFirst()
  signal(notEmpty)                    signal(notFull)
  release lock                        release lock
```
Four pieces: a bounded buffer, a mutex protecting it, two condition variables (notFull, notEmpty), and a while loop around each wait (because spurious wakeups are real and the loop re-checks). Two CVs so each side only wakes the kind of thread that's actually unblocked.
Two classic bugs
- Busy waiting (`while empty: spin`) burns 100% CPU. Use the queue's blocking API or a real CV.
- A single condition variable for both sides forces `signalAll()` to be safe, wasting wakeups. Use two.
When to reach for what
Picking the right primitive
- Java: `ArrayBlockingQueue` (default), `LinkedBlockingQueue` (only if unbounded is needed, which is rare), `SynchronousQueue` (zero-capacity hand-off), `PriorityBlockingQueue` (priority-ordered).
- Python (threading): `queue.Queue`, always. Don't roll a custom one.
- Python (asyncio): `asyncio.Queue` for cooperative I/O concurrency.
- Go: a buffered channel. The pattern is the channel.
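The zero-capacity hand-off is worth seeing once: with `SynchronousQueue`, `put` blocks until another thread is simultaneously in `take`, so there is no buffering at all. A small sketch (class and method names are illustrative):

```java
import java.util.concurrent.SynchronousQueue;

public class HandoffDemo {
    static String handOne() throws InterruptedException {
        SynchronousQueue<String> handoff = new SynchronousQueue<>();
        final String[] received = new String[1];

        Thread consumer = new Thread(() -> {
            try {
                received[0] = handoff.take();   // rendezvous: waits until a producer arrives
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        consumer.start();

        handoff.put("job-1");                   // blocks until the consumer takes it
        consumer.join();
        return received[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handOne());          // job-1
    }
}
```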
Common interview twists
- Multiple producers, single consumer: Java/Python, same code. Go: one goroutine waits for all producers via `sync.WaitGroup`, then closes the channel exactly once.
- Graceful shutdown: Java/Python, sentinel value (poison pill) or completion flag. Go: `close(ch)`.
- Backpressure: a bounded buffer plus blocking `put` *is* backpressure. To drop instead of block, use `offer(timeout)` (Java) / `put_nowait` (Python, catch `queue.Full` to drop) / `select` with `default` (Go).
- Priority queue variant: replace the queue implementation; everything else stays.
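The sentinel (poison pill) shutdown can be sketched like this in Java; the `POISON` marker and method names are illustrative, and with N consumers you enqueue one pill per consumer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoisonPillDemo {
    static final String POISON = "__STOP__";          // sentinel; never a real work item

    static List<String> run() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        List<String> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take();
                    if (item.equals(POISON)) break;   // shutdown signal: exit the loop
                    processed.add(item);
                }
            } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        consumer.start();

        queue.put("a");
        queue.put("b");
        queue.put(POISON);                            // producer is done; tell the consumer
        consumer.join();                              // consumer drains and exits cleanly
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());                    // [a, b]
    }
}
```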
Primitives by language
- Java: BlockingQueue (ArrayBlockingQueue, LinkedBlockingQueue), ReentrantLock + two Conditions (notFull, notEmpty), or a Semaphore pair
- Python: queue.Queue / asyncio.Queue, or threading.Condition for a hand-rolled buffer
- Go: a buffered channel, with sync.WaitGroup to coordinate producer shutdown
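The semaphore-pair construction deserves a sketch, since it never appears below: `emptySlots` counts free capacity, `fullSlots` counts available items, and a binary semaphore guards the queue itself. This is one standard construction, not the only one; the field names are illustrative.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

class SemaphoreBuffer<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final Semaphore emptySlots;                   // permits = free capacity
    private final Semaphore fullSlots = new Semaphore(0); // permits = items available
    private final Semaphore mutex = new Semaphore(1);     // guards the queue itself

    SemaphoreBuffer(int capacity) { this.emptySlots = new Semaphore(capacity); }

    public void put(T item) throws InterruptedException {
        emptySlots.acquire();   // blocks when the buffer is full
        mutex.acquire();
        try { queue.offer(item); } finally { mutex.release(); }
        fullSlots.release();    // one more item available to consumers
    }

    public T take() throws InterruptedException {
        fullSlots.acquire();    // blocks when the buffer is empty
        mutex.acquire();
        T item;
        try { item = queue.poll(); } finally { mutex.release(); }
        emptySlots.release();   // one more free slot for producers
        return item;
    }
}
```

Note the ordering: acquire the counting semaphore *before* the mutex, or a blocked producer would hold the lock and deadlock the consumer that should free a slot.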
Implementations
The two failure modes here are textbook: spinning on a non-volatile variable wastes CPU and can spin forever, and LinkedList is not thread-safe so concurrent offer/poll corrupts internal pointers. Almost every interview question on producer-consumer starts here and asks "what's wrong?"
```java
import java.util.LinkedList;
import java.util.Queue;

// Deliberately broken: spinning plus an unsynchronized LinkedList.
class BrokenBuffer<T> {
    private final Queue<T> queue = new LinkedList<>();
    private final int capacity;

    BrokenBuffer(int capacity) { this.capacity = capacity; }

    public void put(T item) {
        while (queue.size() == capacity) { /* spin */ } // race + 100% CPU
        queue.offer(item);                              // not thread-safe
    }

    public T take() {
        while (queue.isEmpty()) { /* spin */ }          // race + 100% CPU
        return queue.poll();
    }
}
```

Two conditions, not one. notFull parks producers when the buffer is full; notEmpty parks consumers when it's empty. Each signal() wakes exactly the kind of thread that needs to wake; no notifyAll() storms. Always wait in a while loop, never an if.
```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer<T> {
    private final Queue<T> queue = new LinkedList<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    BoundedBuffer(int capacity) { this.capacity = capacity; }

    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) notFull.await();
            queue.offer(item);
            notEmpty.signal();
        } finally { lock.unlock(); }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) notEmpty.await();
            T item = queue.poll();
            notFull.signal();
            return item;
        } finally { lock.unlock(); }
    }
}
```

The standard library already implements bounded producer-consumer correctly. Reach for ArrayBlockingQueue (fixed size, array-backed) or LinkedBlockingQueue (optionally bounded, linked-node) before writing a custom one.
```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

BlockingQueue<Task> queue = new ArrayBlockingQueue<>(100);

Thread producer = new Thread(() -> {
    try {
        while (true) queue.put(createTask());   // blocks if full
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

Thread consumer = new Thread(() -> {
    try {
        while (true) process(queue.take());     // blocks if empty
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});
producer.start(); consumer.start();
```

Key points
- Bounded buffer: producers block when full, consumers block when empty
- Two condition variables (notFull, notEmpty), never one, or lost wakeups become possible
- Always wait in a while loop, never if, to handle spurious and stolen wakeups
- In Go, a buffered channel is the entire pattern; no extra synchronization needed
- In Python, queue.Queue is the canonical implementation; don't roll a custom one
- Graceful shutdown: producer closes channel / sends sentinel; consumer detects it and exits
Tradeoffs
| Option | Pros | Cons | When to use |
|---|---|---|---|
| Java BlockingQueue | Correct out of the box; blocking, timed, and non-blocking variants | Less control over internals; an unbounded LinkedBlockingQueue risks OOM | Default for Java producer-consumer |
| Java ReentrantLock + Condition | Full control: custom semantics, fairness, multiple conditions | More code, easy to get subtly wrong | Custom buffer semantics, fairness requirements |
| Python queue.Queue | Thread-safe, battle-tested, supports timeouts and sentinels | GIL limits CPU-bound parallelism | Default for Python threading-based producer-consumer |
| Python asyncio.Queue | Integrates with the event loop; no thread overhead | asyncio only; not safe to share across threads | asyncio-based services |
| Go buffered channel | Built into the language; close/range give clean shutdown | Capacity fixed at creation; no peek or priority | Default for Go producer-consumer |
Follow-up questions
- Why use two conditions (notFull, notEmpty) instead of one?
- Why must wait() be in a while loop, not an if?
- In Go, who should close the channel?
- How is "no more items" signalled with a Java BlockingQueue?
- How does Python's queue.Queue handle shutdown?
Gotchas
- Closing a Go channel twice → panic. Sending on a closed channel → panic.
- wg.Add() must run BEFORE `go ...` or it races with wg.Wait()
- Spurious wakeups: never use `if` instead of `while` around await()
- BlockingQueue put vs offer vs add: put blocks, offer returns false, add throws; pick the right one
- Forgetting to handle InterruptedException means the thread silently lives on
- Python: forgetting to put the sentinel leaves the consumer hanging forever
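The put/offer/add distinction is easy to verify on a full queue, a quick sketch:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class InsertSemanticsDemo {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("x");                       // succeeds: queue is now full

        System.out.println(queue.offer("y")); // false: non-blocking, reports failure

        try {
            queue.add("y");                   // throws IllegalStateException when full
        } catch (IllegalStateException e) {
            System.out.println("add threw");
        }

        // queue.put("y") would block here until another thread takes an element
    }
}
```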
Common pitfalls
- Busy-waiting in a check loop instead of using wait/await: 100% CPU
- A single condition variable where signal() can wake the wrong side: lost-wakeup deadlock (use signalAll(), or better, two conditions)
- Unbounded buffer (LinkedBlockingQueue with no capacity): OOM under producer overload
Practice problems
- ReentrantLock + two Conditions, or one Semaphore for empty slots + one for full slots
- Producer-consumer with a concurrent set for visited URLs + a bounded thread pool
- Two semaphores, or a CyclicBarrier of 3, to coordinate hydrogen and oxygen threads
APIs worth memorising
- Java: ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, TransferQueue, ReentrantLock.newCondition
- Python: queue.Queue, queue.PriorityQueue, asyncio.Queue, threading.Condition
- Go: make(chan T, n), close(ch), range ch, select
Real-world uses
Log shippers, metrics buffers, work queues, request rate limiters, message bus consumers. Every Kafka consumer is a producer-consumer between the network thread and the application handler.