Traffic Signal
State pattern drives clean phase transitions across an intersection. Each signal knows only its own color. A controller coordinates them so conflicting directions never get green simultaneously.
Key Abstractions
TrafficController: Manages an intersection and coordinates signal phases so conflicting directions never overlap.
Signal: One direction's light. Holds the current state; transitions are driven by the controller based on timing config.
SignalState: Enum of RED, YELLOW, GREEN. In the implementation below, durations and transition order are supplied by Phase objects and the controller rather than by the enum itself.
TimingStrategy: Strategy for per-phase durations, with FixedTimingStrategy and RushHourTimingStrategy implementations.
Intersection: Groups multiple signals into a coordinated unit with named directions.
EmergencyObserver: Observer that preempts normal cycling when an emergency vehicle approaches.
Why This Design
Traffic signals look simple on the surface. Red, yellow, green, repeat. But the interesting constraint is coordination: two perpendicular directions can never both be green at the same time. That makes it a state machine problem, and state machines done well push all the transition logic into the states themselves.
A signal doesn't decide when to change. It's a dumb light that displays whatever color it's told. The controller owns the timing. It knows which phase is active, when to transition to yellow, and when to flip to the next phase. This separation is important because the moment you give signals autonomy, you've created a race condition. Two signals independently deciding they're green is a simulated pileup.
Requirements
Functional
- Support a four-way intersection with independent signals per direction
- Cycle through phases: North/South green while East/West red, then swap
- Yellow transition between every green-to-red change
- Emergency override: force one direction green, all others red
- Configurable timing per phase
Non-Functional
- No two conflicting directions can be green simultaneously (safety invariant)
- Timing strategy must be swappable at runtime (rush hour, nighttime, adaptive)
- Thread-safe signal state changes
- Observer notifications for pedestrian crossings and emergency systems
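The safety invariant can be written as a small check that runs after every state change. The following is a minimal sketch, not part of the implementation in this section: the names CONFLICTS and find_conflicts are hypothetical, and it assumes a status dictionary that maps direction names to color strings.

```python
from __future__ import annotations

from itertools import product

# Hypothetical conflict table: each entry holds two groups of directions
# that must never be green at the same time.
CONFLICTS = [({"NORTH", "SOUTH"}, {"EAST", "WEST"})]


def find_conflicts(status: dict[str, str]) -> list[tuple[str, str]]:
    """Return every pair of conflicting directions that are both GREEN."""
    green = {d for d, color in status.items() if color == "GREEN"}
    violations = []
    for group_a, group_b in CONFLICTS:
        # Pair each green direction in one group with each green one in the other.
        for a, b in product(sorted(group_a & green), sorted(group_b & green)):
            violations.append((a, b))
    return violations


ok = {"NORTH": "GREEN", "SOUTH": "GREEN", "EAST": "RED", "WEST": "RED"}
bad = {"NORTH": "GREEN", "SOUTH": "RED", "EAST": "GREEN", "WEST": "RED"}
print(find_conflicts(ok))   # empty list: no violation
print(find_conflicts(bad))  # one violation: NORTH with EAST
```

A check like this could run in tests or as an assertion after each phase change. It verifies the invariant; it does not enforce it.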
Design Decisions
Why State pattern over if/else chains?
Consider what happens without it: every tick runs a chain like "if current == GREEN and elapsed > duration, set YELLOW; else if current == YELLOW and elapsed > yellow_duration, set RED", and that's just for one signal. Multiply by four directions and add phase coordination, and it becomes a wall of conditions. With the State pattern, each phase encapsulates its own green directions and durations. Transitions are data, not code branches.
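To make "transitions are data" concrete, here is a minimal sketch of a table-driven light for a single signal. It is a simplified illustration, not the implementation in this section: the Color enum, the TRANSITIONS table, its durations, and the step and run functions are all assumptions chosen for the example.

```python
from __future__ import annotations

from enum import Enum


class Color(Enum):
    RED = "RED"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


# Each state maps to (duration in ticks, next state). Changing behavior
# means editing this table, not adding branches.
TRANSITIONS = {
    Color.GREEN: (5, Color.YELLOW),
    Color.YELLOW: (2, Color.RED),
    Color.RED: (7, Color.GREEN),
}


def step(state: Color, elapsed: int) -> tuple[Color, int]:
    """Advance one tick; return the new state and the ticks spent in it."""
    duration, next_state = TRANSITIONS[state]
    elapsed += 1
    if elapsed >= duration:
        return next_state, 0
    return state, elapsed


def run(start: Color, ticks: int) -> list[Color]:
    """Return the state observed after each of `ticks` ticks."""
    state, elapsed, history = start, 0, []
    for _ in range(ticks):
        state, elapsed = step(state, elapsed)
        history.append(state)
    return history


print([c.value for c in run(Color.GREEN, 8)])
```

The step function has one conditional no matter how many states the table holds, which is the property the paragraph above describes.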
Why signals are passive (controller-driven)?
This is a safety design. If signals drove their own transitions, you'd need distributed coordination to prevent conflicts. A single controller is the simplest correct solution. It guarantees the invariant: at most one set of non-conflicting directions is green at any time. Centralized control is appropriate here because a real intersection has a single controller box.
Why Observer for emergency vehicles?
Emergency override could be a method call on the controller. But that couples emergency detection to traffic control. In practice, emergency preemption systems are separate hardware. Observer keeps them decoupled: sensors detect the vehicle, fire an event, the controller reacts. Adding a train crossing sensor later doesn't touch the controller code.
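The decoupling described here might look like the sketch below. EmergencySensor and StubController are hypothetical names introduced for illustration; the stub only records the request, standing in for a controller that exposes a trigger_emergency method.

```python
from __future__ import annotations

from typing import Callable

# A listener receives the approach direction of the detected vehicle.
Listener = Callable[[str], None]


class EmergencySensor:
    """Hypothetical detector that knows nothing about traffic control."""

    def __init__(self) -> None:
        self._listeners: list[Listener] = []

    def subscribe(self, listener: Listener) -> None:
        self._listeners.append(listener)

    def vehicle_detected(self, direction: str) -> None:
        # Fire the event to every subscriber, in subscription order.
        for listener in self._listeners:
            listener(direction)


class StubController:
    """Stand-in for the real controller; records the requested override."""

    def __init__(self) -> None:
        self.override: str | None = None

    def trigger_emergency(self, direction: str) -> None:
        self.override = direction


sensor = EmergencySensor()
controller = StubController()
sensor.subscribe(controller.trigger_emergency)
sensor.vehicle_detected("EAST")
print(controller.override)  # EAST
```

The sensor holds only a list of callables, so a second sensor type (for example a train crossing detector) can publish to the same controller method without either class importing the other.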
Why Strategy for timing instead of config values?
Config values work for fixed timing. But adaptive timing needs sensor data and real-time computation. Making timing a strategy lets you have a FixedTimingStrategy that returns constants and an AdaptiveTimingStrategy that reads queue lengths from sensors and adjusts on the fly. Same controller, fundamentally different timing behavior.
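As a rough sketch of what an adaptive strategy could look like, the example below splits a fixed green budget between the two phases in proportion to queue length. Everything here is an assumption for illustration: the read_queues callable stands in for real sensor input, the green_budget and min_green parameters are invented, and the small Phase dataclass only mirrors the shape of the Phase class in the implementation.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Phase:
    green_directions: frozenset[str]
    green_duration: int
    yellow_duration: int = 3


class AdaptiveTimingStrategy:
    """Splits a fixed green budget between two phases by queue length."""

    def __init__(self, read_queues: Callable[[], dict[str, int]],
                 green_budget: int = 60, min_green: int = 10, yellow: int = 3):
        self._read_queues = read_queues  # hypothetical sensor hook
        self._green_budget = green_budget
        self._min_green = min_green
        self._yellow = yellow

    def get_phases(self) -> list[Phase]:
        queues = self._read_queues()
        ns = queues.get("NORTH", 0) + queues.get("SOUTH", 0)
        ew = queues.get("EAST", 0) + queues.get("WEST", 0)
        total = ns + ew
        # With no queued vehicles, fall back to an even split.
        share = ns / total if total else 0.5
        ns_green = round(self._green_budget * share)
        # Clamp so neither approach is starved of green time.
        ns_green = max(self._min_green,
                       min(self._green_budget - self._min_green, ns_green))
        ew_green = self._green_budget - ns_green
        return [
            Phase(frozenset({"NORTH", "SOUTH"}), ns_green, self._yellow),
            Phase(frozenset({"EAST", "WEST"}), ew_green, self._yellow),
        ]


strategy = AdaptiveTimingStrategy(
    lambda: {"NORTH": 12, "SOUTH": 6, "EAST": 3, "WEST": 3})
print([p.green_duration for p in strategy.get_phases()])  # [45, 15]
```

Because the class exposes the same get_phases method as the fixed strategies, a controller written against that interface would not need to change.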
Interview Follow-ups
- "How would you handle a left-turn signal?" Add more phases. A left-turn phase gives the turning direction a protected green while opposing traffic stays red. The phase list grows but the controller logic stays identical.
- "What about pedestrian crossing buttons?" Register a PedestrianObserver. When a button press arrives, extend the all-red clearance interval before the next green phase to give pedestrians time.
- "How would you make this adaptive?" Implement AdaptiveTimingStrategy. It reads from traffic sensors (loop detectors, cameras) and adjusts green duration proportional to queue length. Heavy traffic on one approach gets more green time automatically.
- "What happens if the controller crashes?" Fail to flashing red (all directions). This is the safe default that real traffic signals use during power failures. Each direction treats it as a stop sign.
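The fail-safe answer in the last follow-up can be sketched in a few lines. This is an illustration only: FLASHING_RED is a hypothetical state that the SignalState enum in this section does not define, safe_tick is an invented helper, and in-process exception handling cannot cover a controller that hangs or loses power.

```python
from __future__ import annotations

from typing import Callable

DIRECTIONS = ("NORTH", "SOUTH", "EAST", "WEST")


def flashing_red() -> dict[str, str]:
    """Safe default: every approach treats the intersection as a stop sign."""
    return {d: "FLASHING_RED" for d in DIRECTIONS}


def safe_tick(tick: Callable[[], dict[str, str]]) -> dict[str, str]:
    """Run one controller tick; fall back to flashing red if it raises."""
    try:
        return tick()
    except Exception:
        return flashing_red()


def healthy_tick() -> dict[str, str]:
    return {"NORTH": "GREEN", "SOUTH": "GREEN", "EAST": "RED", "WEST": "RED"}


def broken_tick() -> dict[str, str]:
    raise RuntimeError("controller fault")


print(safe_tick(healthy_tick))
print(safe_tick(broken_tick))
```

The point of the sketch is the shape of the fallback: a failure anywhere in the tick produces one well-defined, conflict-free state instead of leaving the lights wherever they happened to be.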
Code Implementation
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from threading import Lock


class SignalState(Enum):
    RED = "RED"
    YELLOW = "YELLOW"
    GREEN = "GREEN"


class Signal:
    """One direction's traffic light. Passive: the controller drives transitions."""

    def __init__(self, direction: str):
        self.direction = direction
        self._state = SignalState.RED
        self._lock = Lock()

    @property
    def state(self) -> SignalState:
        with self._lock:
            return self._state

    @state.setter
    def state(self, new_state: SignalState) -> None:
        with self._lock:
            self._state = new_state

    def __repr__(self) -> str:
        # Read through the property so the lock is respected
        return f"Signal({self.direction}: {self.state.value})"


class Phase:
    """Defines which directions get green during this phase and for how long."""

    def __init__(self, green_directions: set[str], green_duration: int, yellow_duration: int = 2):
        self.green_directions = green_directions
        self.green_duration = green_duration
        self.yellow_duration = yellow_duration
        self.total_duration = green_duration + yellow_duration

    def __repr__(self) -> str:
        return f"Phase(green={self.green_directions}, dur={self.green_duration}+{self.yellow_duration})"


class TimingStrategy(ABC):
    """Strategy for determining phase durations."""

    @abstractmethod
    def get_phases(self) -> list[Phase]: ...


class FixedTimingStrategy(TimingStrategy):
    """Fixed-cycle timing. Same durations every cycle."""

    def __init__(self, ns_green: int = 30, ew_green: int = 25, yellow: int = 3):
        self._ns_green = ns_green
        self._ew_green = ew_green
        self._yellow = yellow

    def get_phases(self) -> list[Phase]:
        return [
            Phase({"NORTH", "SOUTH"}, self._ns_green, self._yellow),
            Phase({"EAST", "WEST"}, self._ew_green, self._yellow),
        ]


class RushHourTimingStrategy(TimingStrategy):
    """Longer green for the main road during rush hour."""

    def __init__(self, main_green: int = 45, cross_green: int = 15, yellow: int = 3):
        self._main_green = main_green
        self._cross_green = cross_green
        self._yellow = yellow

    def get_phases(self) -> list[Phase]:
        return [
            Phase({"NORTH", "SOUTH"}, self._main_green, self._yellow),
            Phase({"EAST", "WEST"}, self._cross_green, self._yellow),
        ]


class SignalObserver(ABC):
    """Observer notified on signal state changes."""

    @abstractmethod
    def on_signal_change(self, direction: str, old_state: SignalState, new_state: SignalState) -> None: ...


class PedestrianCrossingObserver(SignalObserver):
    def on_signal_change(self, direction: str, old_state: SignalState, new_state: SignalState) -> None:
        if new_state == SignalState.RED:
            print(f"  [Pedestrian] Walk signal ON for crossing at {direction}")
        elif old_state == SignalState.RED and new_state == SignalState.GREEN:
            print(f"  [Pedestrian] Walk signal OFF for crossing at {direction}")


class EmergencyObserver(SignalObserver):
    def on_signal_change(self, direction: str, old_state: SignalState, new_state: SignalState) -> None:
        if new_state == SignalState.GREEN:
            print(f"  [Emergency] {direction} is clear for emergency vehicles")


class Intersection:
    """Groups multiple signals into a coordinated unit."""

    DIRECTIONS = ("NORTH", "SOUTH", "EAST", "WEST")

    def __init__(self):
        self.signals: dict[str, Signal] = {d: Signal(d) for d in self.DIRECTIONS}
        self._observers: list[SignalObserver] = []

    def add_observer(self, observer: SignalObserver) -> None:
        self._observers.append(observer)

    def _notify(self, direction: str, old_state: SignalState, new_state: SignalState) -> None:
        for obs in self._observers:
            obs.on_signal_change(direction, old_state, new_state)

    def set_phase(self, green_directions: set[str]) -> None:
        """Set specified directions to GREEN, everything else to RED."""
        for direction, signal in self.signals.items():
            old = signal.state
            if direction in green_directions:
                signal.state = SignalState.GREEN
            else:
                signal.state = SignalState.RED
            if signal.state != old:
                self._notify(direction, old, signal.state)

    def set_yellow(self, directions: set[str]) -> None:
        """Transition specified directions to YELLOW."""
        for direction in directions:
            signal = self.signals[direction]
            old = signal.state
            signal.state = SignalState.YELLOW
            if old != SignalState.YELLOW:
                self._notify(direction, old, SignalState.YELLOW)

    def set_all_red(self) -> None:
        for direction, signal in self.signals.items():
            old = signal.state
            signal.state = SignalState.RED
            if old != SignalState.RED:
                self._notify(direction, old, SignalState.RED)

    def status(self) -> dict[str, str]:
        return {d: s.state.value for d, s in self.signals.items()}


class TrafficController:
    """Coordinates signal phases for an intersection."""

    def __init__(self, timing_strategy: TimingStrategy | None = None):
        self._intersection = Intersection()
        self._strategy = timing_strategy or FixedTimingStrategy()
        self._pending_strategy: TimingStrategy | None = None
        self._phases = self._strategy.get_phases()
        self._current_phase_idx = 0
        self._tick_in_phase = 0
        self._emergency_active = False
        self._emergency_direction: str | None = None

        # Start first phase
        phase = self._phases[self._current_phase_idx]
        self._intersection.set_phase(phase.green_directions)

    @property
    def intersection(self) -> Intersection:
        return self._intersection

    def add_observer(self, observer: SignalObserver) -> None:
        self._intersection.add_observer(observer)

    def tick(self) -> str | None:
        """Advance the controller by one time unit. Returns event description if a transition happened."""
        if self._emergency_active:
            return None  # Hold current override until released

        phase = self._phases[self._current_phase_idx]
        self._tick_in_phase += 1

        # Green phase ends, transition to yellow
        if self._tick_in_phase == phase.green_duration:
            self._intersection.set_yellow(phase.green_directions)
            return f"Phase {self._current_phase_idx}: YELLOW for {phase.green_directions}"

        # Yellow phase ends, advance to next phase
        if self._tick_in_phase >= phase.total_duration:
            self._tick_in_phase = 0
            if self._pending_strategy is not None:
                # Apply a queued timing change only here, at the phase boundary,
                # so a running phase never has its durations changed mid-flight.
                self._strategy = self._pending_strategy
                self._phases = self._strategy.get_phases()
                self._pending_strategy = None
            self._current_phase_idx = (self._current_phase_idx + 1) % len(self._phases)
            next_phase = self._phases[self._current_phase_idx]
            self._intersection.set_phase(next_phase.green_directions)
            return f"Phase {self._current_phase_idx}: GREEN for {next_phase.green_directions}"

        return None

    def trigger_emergency(self, direction: str) -> None:
        """Override: set one direction green, all others red."""
        print(f"  ** EMERGENCY OVERRIDE: clearing {direction} **")
        self._emergency_active = True
        self._emergency_direction = direction
        self._intersection.set_all_red()
        self._intersection.set_phase({direction})

    def release_emergency(self) -> None:
        """Resume normal cycling by restarting the current phase."""
        print("  ** EMERGENCY RELEASED: resuming normal cycle **")
        self._emergency_active = False
        self._emergency_direction = None
        self._tick_in_phase = 0
        phase = self._phases[self._current_phase_idx]
        self._intersection.set_phase(phase.green_directions)

    def set_timing_strategy(self, strategy: TimingStrategy) -> None:
        """Swap timing at runtime. Takes effect at the next phase boundary."""
        self._pending_strategy = strategy

    def status(self) -> dict[str, str]:
        return self._intersection.status()


if __name__ == "__main__":
    print("=== Traffic Signal Controller Demo ===\n")

    controller = TrafficController(FixedTimingStrategy(ns_green=5, ew_green=4, yellow=2))
    controller.add_observer(PedestrianCrossingObserver())

    print("Initial state:", controller.status())

    print("\n--- Running through two full cycles ---")
    for t in range(28):
        event = controller.tick()
        if event:
            print(f"  Tick {t + 1}: {event}")
            print(f"  Status: {controller.status()}")

    print("\n--- Emergency vehicle approaching from EAST ---")
    controller.trigger_emergency("EAST")
    print(f"  Status: {controller.status()}")

    # Hold emergency for a few ticks
    for _ in range(3):
        controller.tick()

    controller.release_emergency()
    print(f"  Status: {controller.status()}")

    print("\n--- Switching to rush hour timing ---")
    controller.set_timing_strategy(RushHourTimingStrategy(main_green=6, cross_green=3, yellow=2))
    for t in range(20):
        event = controller.tick()
        if event:
            print(f"  Tick {t + 1}: {event}")
            print(f"  Status: {controller.status()}")

    print("\nDone.")

Common Mistakes
- ✗ Letting signals self-transition without a controller. Two perpendicular signals can both go green, causing a simulated collision.
- ✗ Using string comparisons for signal states instead of enums. 'GRREN' won't cause a compile error, but it will cause a runtime bug.
- ✗ Hardcoding timing durations inside the signal. Now changing rush hour timing means editing signal internals.
- ✗ Forgetting the yellow phase. Jumping straight from green to red is unrealistic and breaks any simulation that models driver behavior.
Key Points
- ✓ State pattern eliminates if/else chains for phase transitions. Each phase carries its own green directions and durations, and the controller walks the phase list in order, so normal cycling never needs ad hoc transition branches.
- ✓ Signals are passive. They don't decide when to change. The controller tells them. This prevents two conflicting directions from going green.
- ✓ Timing as a separate strategy object means rush hour and nighttime schedules are swaps of one object, and adaptive timing is a new strategy class rather than a change to the controller.
- ✓ Observer for emergency vehicles keeps priority override decoupled from normal signal logic.