Video Conferencing
Rooms, participants, and media streams. The conferencing model is simple; the hard part is the SFU decision — who forwards whose bits to whom.
Key Abstractions
Room: The live session. Owns participants, the media router, chat, role management, and status.
Participant: One attendee. Tracks audio/video/screen state, raised hand, and role (host, cohost, or attendee).
MediaRouter: SFU-style forwarder. Decides which streams each participant receives.
ChatLog: Append-only conversation log per room. Written only through Room.send_message.
Class Diagram
The Key Insight
Video conferencing looks like a chat room with media attached, and at the domain level it is. What separates working designs from broken ones is the media routing topology. A peer-to-peer mesh works for 3-4 people and falls apart around 8, because each participant uploads N-1 copies of their stream. A Selective Forwarding Unit (SFU) flips this: each participant publishes once to the server, and the server forwards selectively to subscribers. The load moves off client uplinks, which are scarce, onto server bandwidth, which is cheap to provision; because the SFU does not transcode, its CPU cost stays low too.
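To make the mesh-versus-SFU arithmetic concrete, here is a minimal Python sketch that counts whole streams per client for each topology (MCU included for comparison). The function names, the topology labels, and the 1,500 kbps per-stream figure are illustrative assumptions; the sketch ignores simulcast layers and separate audio/video tracks.

```python
def streams_per_client(n: int, topology: str) -> tuple[int, int]:
    """Return (uploads, downloads) per client in an n-person call.

    Counts whole streams only; simulcast layers and separate audio/video
    tracks are ignored.
    """
    if n < 2:
        return (0, 0)
    if topology == "mesh":
        # Every client sends its stream directly to each of the n-1 peers.
        return (n - 1, n - 1)
    if topology == "sfu":
        # Publish once; the server fans the other n-1 streams back out.
        return (1, n - 1)
    if topology == "mcu":
        # The server mixes everyone else into one composite per client.
        return (1, 1)
    raise ValueError(f"unknown topology: {topology!r}")


def mesh_uplink_kbps(n: int, stream_kbps: int) -> int:
    """Uplink one mesh client needs at a given per-stream bitrate."""
    uploads, _ = streams_per_client(n, "mesh")
    return uploads * stream_kbps


if __name__ == "__main__":
    for n in (4, 8, 16):
        print(n, "mesh", streams_per_client(n, "mesh"), "sfu", streams_per_client(n, "sfu"))
    # 1,500 kbps per video stream is an illustrative assumption.
    print(mesh_uplink_kbps(8, 1500), "kbps uplink for an 8-person mesh")
```

At 8 participants a mesh client uploads 7 streams while an SFU client uploads 1; downloads are 7 in both cases, which is why the uplink is where mesh breaks first.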
The second insight is separating the control plane from the media plane. Signaling (join, mute, promote, chat) is low-throughput but latency-sensitive, and every message must arrive. Media is high-throughput and loss-tolerant, so it typically runs over UDP (RTP). Putting both on one connection means a mute or kick command can sit queued behind a backlog of video packets (head-of-line blocking).
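The head-of-line effect can be shown with a toy scheduler. This sketch, using hypothetical Message, drain_shared, and drain_separate helpers, compares one shared FIFO against separate control and media queues where any pending control message goes out first. Real systems get this isolation from separate transports rather than a single priority loop, so treat it only as an illustration of the ordering problem.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)
class Message:
    plane: str    # "control" (signaling) or "media"
    payload: str


def drain_shared(messages: list[Message]) -> list[str]:
    """One FIFO for both planes: messages leave strictly in arrival order."""
    return [m.payload for m in messages]


def drain_separate(messages: list[Message]) -> list[str]:
    """Split by plane; any pending control message is sent before more media."""
    control = deque(m for m in messages if m.plane == "control")
    media = deque(m for m in messages if m.plane == "media")
    sent: list[str] = []
    while control or media:
        source = control if control else media
        sent.append(source.popleft().payload)
    return sent


if __name__ == "__main__":
    # 1,000 video packets are already queued when the host mutes carol.
    arrivals = [Message("media", f"rtp-{i}") for i in range(1000)]
    arrivals.append(Message("control", "mute:carol"))
    print("shared queue position:", drain_shared(arrivals).index("mute:carol"))
    print("separate queue position:", drain_separate(arrivals).index("mute:carol"))
```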
Modeling role as a state machine matters because authority in a meeting has to be transferable. When the host drops off, someone else must become host; otherwise nobody can end the meeting, kick a troll, or admit waitlisted participants. Explicit succession (cohost first, then the longest-present attendee) makes the behavior predictable.
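One way to make role changes explicit is a transition table that rejects anything not listed and records every accepted change for auditing. The Role enum, the event names, and the apply helper below are hypothetical, and the table encodes one plausible policy rather than a rule stated by this design.

```python
from __future__ import annotations

from enum import Enum


class Role(Enum):
    ATTENDEE = "attendee"
    COHOST = "cohost"
    HOST = "host"


# Hypothetical transition table: (current role, event) -> next role.
TRANSITIONS: dict[tuple[Role, str], Role] = {
    (Role.ATTENDEE, "promote"): Role.COHOST,
    (Role.COHOST, "demote"): Role.ATTENDEE,
    (Role.COHOST, "inherit_host"): Role.HOST,
    (Role.ATTENDEE, "inherit_host"): Role.HOST,  # caller uses this only when no cohost remains
    (Role.HOST, "hand_off"): Role.COHOST,
}


def apply(role: Role, event: str, audit: list[tuple[Role, str, Role]]) -> Role:
    """Return the next role and log the change; reject transitions not in the table."""
    next_role = TRANSITIONS.get((role, event))
    if next_role is None:
        raise ValueError(f"illegal transition: {role.value} on {event!r}")
    audit.append((role, event, next_role))
    return next_role


if __name__ == "__main__":
    audit: list[tuple[Role, str, Role]] = []
    role = apply(Role.ATTENDEE, "promote", audit)  # attendee becomes cohost
    role = apply(role, "inherit_host", audit)      # host left; cohost inherits
    print(role, audit)
    try:
        apply(Role.ATTENDEE, "hand_off", audit)
    except ValueError as err:
        print(err)
```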
Requirements
Functional
- Create and schedule meetings with host, passcode, and participant cap
- Join/leave flow with auto-subscribe to existing publishers
- Publish and unpublish audio, video, screen-share tracks
- Host/cohost/attendee roles with promotion flow
- Host succession when host leaves
- In-meeting chat
Non-Functional
- Scale to 100+ participants per room
- Signaling separate from media
- Server does not transcode — SFU forwards bits
- Role changes audited and observable
Design Decisions
Why SFU over MCU?
An MCU mixes all streams server-side into one composite. That is great for CPU-starved clients and terrible for server cost, because transcoding is expensive. An SFU forwards each stream unmodified; each client receives N-1 streams (one per other participant) and decides the layout locally. Modern conferencing platforms (Zoom, Meet, Teams) all use SFUs, with occasional MCU fallback for legacy clients.
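Because the SFU forwards N-1 separate streams, tiling them is a client decision. A minimal sketch of a near-square gallery layout, using a hypothetical grid_layout helper and a plain grid (real clients also handle pinned speakers and screen shares):

```python
import math


def grid_layout(tile_count: int) -> tuple[int, int]:
    """Return (columns, rows) for a near-square gallery of tile_count videos."""
    if tile_count <= 0:
        return (0, 0)
    cols = math.ceil(math.sqrt(tile_count))
    rows = math.ceil(tile_count / cols)
    return (cols, rows)


if __name__ == "__main__":
    # In an 8-person call each client lays out the 7 remote streams it receives.
    for remote_streams in (1, 3, 7, 15):
        print(remote_streams, grid_layout(remote_streams))
```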
Why separate MediaRouter from Room?
The Room is the control plane — membership, roles, chat. The MediaRouter is the media plane — tracks, subscriptions, forwarding. They have different scaling properties; separating them lets the router be stateless/rebuildable without touching control-plane state.
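To illustrate "rebuildable", this sketch derives the router's tracks and subscriptions purely from control-plane facts: the roster and which track types each participant has on. RouterState and rebuild_router_state are hypothetical names, the default everyone-sees-everyone policy is assumed, and re-establishing the actual media transports with clients is out of scope.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class RouterState:
    tracks: dict[str, set[str]] = field(default_factory=dict)         # publisher -> track types
    subscriptions: dict[str, set[str]] = field(default_factory=dict)  # subscriber -> publishers


def rebuild_router_state(roster: list[str], tracks_on: dict[str, set[str]]) -> RouterState:
    """Recreate media-plane state from control-plane facts after a router restart.

    Assumes the default policy: everyone subscribes to everyone except themselves.
    """
    state = RouterState()
    for pid in roster:
        state.tracks[pid] = set(tracks_on.get(pid, set()))
        state.subscriptions[pid] = {other for other in roster if other != pid}
    return state


if __name__ == "__main__":
    # The control plane still knows who is in the room and which tracks are on.
    roster = ["alice", "bob", "carol"]
    tracks_on = {"alice": {"audio"}, "bob": {"audio", "video"}}
    state = rebuild_router_state(roster, tracks_on)
    print(state.subscriptions["carol"], state.tracks["bob"])
```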
Why auto-subscribe everyone to everyone on join?
Small meetings want full visibility by default. Larger meetings (webinar mode) override this with a subscription policy, such as subscribing only to speakers. The sane default is that everyone sees everyone; policies narrow it.
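A subscription policy can be a plain function from (subscriber, roster, speakers) to the set of publishers to forward. The two policies below are a hypothetical sketch of the default and webinar modes described above, not an existing API.

```python
from __future__ import annotations

from collections.abc import Callable

# Hypothetical policy signature: (subscriber, roster, speakers) -> publishers to forward.
Policy = Callable[[str, list[str], set[str]], set[str]]


def everyone_policy(subscriber: str, roster: list[str], speakers: set[str]) -> set[str]:
    """Default for small meetings: receive every other participant."""
    return {p for p in roster if p != subscriber}


def speakers_only_policy(subscriber: str, roster: list[str], speakers: set[str]) -> set[str]:
    """Webinar mode: receive only the designated speakers (never yourself)."""
    return {p for p in roster if p in speakers and p != subscriber}


def subscriptions_for(roster: list[str], speakers: set[str], policy: Policy) -> dict[str, set[str]]:
    """Apply one policy to every participant in the room."""
    return {sub: policy(sub, roster, speakers) for sub in roster}


if __name__ == "__main__":
    roster = ["host", "panelist", "viewer1", "viewer2"]
    speakers = {"host", "panelist"}
    print(subscriptions_for(roster, speakers, everyone_policy)["viewer1"])
    print(subscriptions_for(roster, speakers, speakers_only_policy)["viewer1"])
```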
Why host-promotion on leave?
A meeting without a host is broken: no one can kick trolls, end the meeting, or admit waitlisted participants. Automatic succession (cohost first, then the longest-present attendee) makes the system self-healing. The policy is explicit so it's testable.
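Keeping succession as a pure function is what makes it testable. A minimal sketch, assuming a hypothetical Member record with a join timestamp; breaking ties among several cohosts by join time is an assumption, since the policy only says "cohost first".

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Member:
    id: str
    role: str           # "host", "cohost", or "attendee"
    joined_at: datetime


def pick_successor(remaining: list[Member]) -> Member | None:
    """Cohost first, then the longest-present attendee; None means end the room."""
    if not remaining:
        return None
    cohosts = [m for m in remaining if m.role == "cohost"]
    pool = cohosts or remaining
    # Earliest join time wins (also used to break ties among cohosts, an assumption).
    return min(pool, key=lambda m: m.joined_at)


if __name__ == "__main__":
    t0 = datetime(2024, 1, 1, 9, 0)
    dave = Member("dave", "attendee", t0)
    erin = Member("erin", "attendee", t0 + timedelta(minutes=2))
    frank = Member("frank", "cohost", t0 + timedelta(minutes=10))
    print(pick_successor([dave, erin, frank]).id)  # frank: any cohost beats attendees
    print(pick_successor([erin, dave]).id)         # dave: joined earliest
```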
Why observer events for state changes?
Recording, live transcription, and monitoring dashboards all need to know when someone joins, publishes, or is promoted. Wiring them into Room directly couples it to every integration. An observer bus keeps the core decoupled: Room emits events, and each integration subscribes to the ones it needs.
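A small event bus also gives one place to contain failures: if a recording or dashboard handler raises, the join or host change that emitted the event should still succeed. The EventBus class and its log-and-continue behavior below are a hypothetical sketch using only the standard library.

```python
from __future__ import annotations

import logging
from collections.abc import Callable

Handler = Callable[[str, dict], None]
logger = logging.getLogger("room.events")


class EventBus:
    """Fans room events out to integrations (recorder, transcriber, dashboards)."""

    def __init__(self) -> None:
        self._handlers: list[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        self._handlers.append(handler)

    def publish(self, event: str, payload: dict) -> None:
        # Iterate over a copy so a handler may subscribe others without breaking the loop.
        for handler in list(self._handlers):
            try:
                handler(event, payload)
            except Exception:
                # One broken integration must not abort the join or host change.
                logger.exception("handler failed on event %s", event)


if __name__ == "__main__":
    bus = EventBus()
    recorded: list[str] = []

    def flaky_dashboard(event: str, payload: dict) -> None:
        raise RuntimeError("dashboard offline")

    bus.subscribe(flaky_dashboard)
    bus.subscribe(lambda event, payload: recorded.append(event))  # e.g. a recorder
    bus.publish("participant_joined", {"id": "bob"})
    print(recorded)  # ['participant_joined']
```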
Interview Follow-ups
- "How would you do recording?" A special "recorder" participant subscribes to all publishers. It writes the composite to disk via an off-the-shelf encoder. Recording state (starting, running, paused) is a separate state machine.
- "What about large webinars (10k viewers)?" Webinar mode: N presenters publish and subscribe; everyone else is receive-only. Once the audience exceeds a single SFU's capacity (~500), push the presenters' output over RTMP to a broadcast tier for fan-out.
- "How do you handle bad networks?" Simulcast — publishers send low/mid/high encodings of video. Router picks the encoding per subscriber based on bandwidth estimates.
- "How does the waiting room work?" A separate WaitingRoom entity holds pending participants. The host admits them individually; admission transfers them into Room.join().
- "What about end-to-end encryption?" Participants exchange keys over signaling. Media is encrypted client-side, so the SFU sees only ciphertext (payload bytes are opaque to router logic). This breaks server-side recording unless a designated "recorder key" exists.
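The simulcast answer comes down to a per-subscriber choice among encodings. This sketch picks the highest layer that fits within a fraction of the subscriber's bandwidth estimate; the layer names, bitrates, and the 0.8 headroom factor are illustrative assumptions, not values from any particular SFU.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Layer:
    name: str
    kbps: int


# Illustrative simulcast ladder; real publishers choose their own bitrates.
DEFAULT_LAYERS = (Layer("low", 150), Layer("mid", 500), Layer("high", 1500))


def pick_layer(estimated_kbps: float, layers: tuple[Layer, ...] = DEFAULT_LAYERS,
               headroom: float = 0.8) -> Layer:
    """Highest layer that fits in headroom * estimate; lowest layer if none fit."""
    budget = estimated_kbps * headroom
    fitting = [layer for layer in layers if layer.kbps <= budget]
    if fitting:
        return max(fitting, key=lambda layer: layer.kbps)
    # Degrade rather than drop video entirely.
    return min(layers, key=lambda layer: layer.kbps)


if __name__ == "__main__":
    # Per-subscriber bandwidth estimates (kbps) as the router might see them.
    for subscriber, estimate in {"fiber": 5000, "wifi": 900, "2g": 100}.items():
        print(subscriber, pick_layer(estimate).name)
```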
Code Implementation
```python
from __future__ import annotations

import uuid
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from threading import RLock


def _utcnow() -> datetime:
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    return datetime.now(timezone.utc)


class ParticipantRole(Enum):
    ATTENDEE = "attendee"
    COHOST = "cohost"
    HOST = "host"


class TrackType(Enum):
    AUDIO = "audio"
    VIDEO = "video"
    SCREEN = "screen"


# Participant attribute that mirrors each track type's on/off state.
_TRACK_FLAG = {
    TrackType.AUDIO: "audio_on",
    TrackType.VIDEO: "video_on",
    TrackType.SCREEN: "screen_on",
}


class RoomStatus(Enum):
    SCHEDULED = "scheduled"
    LIVE = "live"
    ENDED = "ended"


@dataclass(frozen=True)
class Track:
    id: str
    publisher_id: str
    type: TrackType
    encoding: str = "opus"  # audio default; Room.publish sets vp8 for video/screen


@dataclass
class ChatMessage:
    id: str
    author_id: str
    text: str
    sent_at: datetime


class ChatLog:
    """Append-only message log; readers get a snapshot copy."""

    def __init__(self):
        self._messages: list[ChatMessage] = []
        self._lock = RLock()

    def append(self, author_id: str, text: str) -> ChatMessage:
        msg = ChatMessage(
            id=str(uuid.uuid4())[:8],
            author_id=author_id,
            text=text,
            sent_at=_utcnow(),
        )
        with self._lock:
            self._messages.append(msg)
        return msg

    def messages(self) -> list[ChatMessage]:
        with self._lock:
            return list(self._messages)


@dataclass
class Participant:
    id: str
    display_name: str
    role: ParticipantRole = ParticipantRole.ATTENDEE
    audio_on: bool = True
    video_on: bool = False
    screen_on: bool = False
    hand_raised: bool = False
    joined_at: datetime = field(default_factory=_utcnow)


class MediaRouter:
    """SFU-style forwarder. Tracks publishers and subscribers; routes packets between them."""

    def __init__(self):
        self._tracks: dict[str, dict[TrackType, Track]] = defaultdict(dict)
        self._subscriptions: dict[str, set[str]] = defaultdict(set)  # subscriber -> publishers
        self._lock = RLock()
        # Simulated packet delivery log; in production this is RTP over UDP.
        self._delivered: list[tuple[str, str, TrackType, int]] = []

    def publish(self, participant_id: str, track: Track) -> None:
        with self._lock:
            self._tracks[participant_id][track.type] = track

    def unpublish(self, participant_id: str, track_type: TrackType) -> None:
        with self._lock:
            # .get avoids re-creating an entry for a participant who already left.
            self._tracks.get(participant_id, {}).pop(track_type, None)

    def subscribe(self, subscriber: str, publisher: str) -> None:
        if subscriber == publisher:
            return  # never subscribe to own stream
        with self._lock:
            self._subscriptions[subscriber].add(publisher)

    def unsubscribe_all(self, participant_id: str) -> None:
        with self._lock:
            self._subscriptions.pop(participant_id, None)
            self._tracks.pop(participant_id, None)
            for subs in self._subscriptions.values():
                subs.discard(participant_id)

    def forward(self, publisher: str, track_type: TrackType, size_bytes: int) -> int:
        """Simulated packet forwarding; returns the number of recipients."""
        recipients = 0
        with self._lock:
            for sub_id, publishers in self._subscriptions.items():
                if publisher in publishers and self._tracks.get(publisher, {}).get(track_type):
                    self._delivered.append((sub_id, publisher, track_type, size_bytes))
                    recipients += 1
        return recipients

    def active_publishers(self) -> dict[str, list[TrackType]]:
        with self._lock:
            return {pid: list(types.keys()) for pid, types in self._tracks.items() if types}

    def delivered_count(self) -> int:
        with self._lock:
            return len(self._delivered)


class Room:
    def __init__(self, meeting_id: str, host_id: str, max_participants: int = 100):
        self.meeting_id = meeting_id
        self._host_id = host_id
        self._max = max_participants
        self._participants: dict[str, Participant] = {}
        self._router = MediaRouter()
        self._chat = ChatLog()
        self._status = RoomStatus.SCHEDULED
        self._observers: list[Callable[[str, dict], None]] = []
        self._lock = RLock()

    # ---- Control plane (signaling) ----

    def start(self) -> None:
        with self._lock:
            if self._status != RoomStatus.SCHEDULED:
                raise RuntimeError("room already started or ended")
            self._status = RoomStatus.LIVE

    def join(self, participant_id: str, display_name: str) -> Participant:
        with self._lock:
            if self._status != RoomStatus.LIVE:
                raise RuntimeError("room is not live")
            if participant_id in self._participants:
                return self._participants[participant_id]  # idempotent rejoin, even at capacity
            if len(self._participants) >= self._max:
                raise RuntimeError("room is at capacity")
            role = ParticipantRole.HOST if participant_id == self._host_id else ParticipantRole.ATTENDEE
            p = Participant(id=participant_id, display_name=display_name, role=role)
            self._participants[participant_id] = p
            # Default full-visibility policy: the new joiner subscribes to every existing
            # participant, and every existing participant subscribes to the new joiner.
            for other_id in self._participants:
                if other_id != participant_id:
                    self._router.subscribe(participant_id, other_id)
                    self._router.subscribe(other_id, participant_id)
            self._notify("participant_joined", {"id": participant_id, "name": display_name})
            return p

    def leave(self, participant_id: str) -> None:
        with self._lock:
            p = self._participants.pop(participant_id, None)
            if p is None:
                return
            self._router.unsubscribe_all(participant_id)
            self._notify("participant_left", {"id": participant_id})

            if not self._participants:
                # Last person out ends the meeting.
                self._status = RoomStatus.ENDED
                self._notify("room_ended", {})
            elif p.role == ParticipantRole.HOST:
                # Host left: promote the first cohost, else the longest-present attendee
                # (dicts preserve insertion order, i.e. join order).
                successor = next(
                    (x for x in self._participants.values() if x.role == ParticipantRole.COHOST),
                    next(iter(self._participants.values())),
                )
                successor.role = ParticipantRole.HOST
                self._host_id = successor.id
                self._notify("host_changed", {"id": successor.id})

    def end(self, by: str) -> None:
        with self._lock:
            actor = self._participants.get(by)
            if actor is None or actor.role != ParticipantRole.HOST:
                raise PermissionError("only the host can end the meeting")
            self._status = RoomStatus.ENDED
            for pid in list(self._participants.keys()):
                self._router.unsubscribe_all(pid)
            self._participants.clear()
            self._notify("room_ended", {})

    # ---- Media plane ----

    def publish(self, participant_id: str, track_type: TrackType) -> Track:
        with self._lock:
            if participant_id not in self._participants:
                raise ValueError("unknown participant")
            encoding = "opus" if track_type == TrackType.AUDIO else "vp8"
            track = Track(id=str(uuid.uuid4())[:8], publisher_id=participant_id,
                          type=track_type, encoding=encoding)
            self._router.publish(participant_id, track)
            setattr(self._participants[participant_id], _TRACK_FLAG[track_type], True)
            self._notify("track_published", {"id": participant_id, "type": track_type.value})
            return track

    def unpublish(self, participant_id: str, track_type: TrackType) -> None:
        with self._lock:
            self._router.unpublish(participant_id, track_type)
            p = self._participants.get(participant_id)
            if p is not None:
                setattr(p, _TRACK_FLAG[track_type], False)
                self._notify("track_unpublished", {"id": participant_id, "type": track_type.value})

    def forward(self, publisher_id: str, track_type: TrackType, bytes_: int) -> int:
        """Forward one packet unless the publisher has muted or disabled that track."""
        with self._lock:
            p = self._participants.get(publisher_id)
            if p is None or not getattr(p, _TRACK_FLAG[track_type]):
                return 0
        return self._router.forward(publisher_id, track_type, bytes_)

    # ---- Mute / hand (control plane, no track teardown) ----

    def set_audio_muted(self, participant_id: str, muted: bool) -> None:
        """Mute doesn't tear down the track; Room.forward drops audio packets while muted."""
        with self._lock:
            p = self._participants.get(participant_id)
            if p is None:
                raise ValueError("unknown participant")
            p.audio_on = not muted
            self._notify("audio_muted" if muted else "audio_unmuted", {"id": participant_id})

    def set_video_on(self, participant_id: str, on: bool) -> None:
        """Like mute: the track stays published; Room.forward drops video while it is off."""
        with self._lock:
            p = self._participants.get(participant_id)
            if p is None:
                raise ValueError("unknown participant")
            p.video_on = on
            self._notify("video_on" if on else "video_off", {"id": participant_id})

    def raise_hand(self, participant_id: str) -> None:
        with self._lock:
            p = self._participants.get(participant_id)
            if p is None:
                raise ValueError("unknown participant")
            if not p.hand_raised:
                p.hand_raised = True
                self._notify("hand_raised", {"id": participant_id})

    def lower_hand(self, participant_id: str) -> None:
        with self._lock:
            p = self._participants.get(participant_id)
            if p is None:
                raise ValueError("unknown participant")
            if p.hand_raised:
                p.hand_raised = False
                self._notify("hand_lowered", {"id": participant_id})

    def lower_all_hands(self, by: str) -> int:
        """Host/cohost bulk action, e.g. at the end of Q&A."""
        with self._lock:
            actor = self._participants.get(by)
            if actor is None or actor.role not in (ParticipantRole.HOST, ParticipantRole.COHOST):
                raise PermissionError("only host or cohost can lower all hands")
            count = 0
            for p in self._participants.values():
                if p.hand_raised:
                    p.hand_raised = False
                    count += 1
            if count:
                self._notify("all_hands_lowered", {"count": count})
            return count

    def raised_hands(self) -> list[str]:
        with self._lock:
            return [p.id for p in self._participants.values() if p.hand_raised]

    # ---- Role management ----

    def promote(self, by: str, target_id: str, role: ParticipantRole) -> None:
        """Set a non-host participant's role to COHOST or ATTENDEE (host only)."""
        with self._lock:
            actor = self._participants.get(by)
            if actor is None or actor.role != ParticipantRole.HOST:
                raise PermissionError("only the host can change roles")
            target = self._participants.get(target_id)
            if target is None:
                raise ValueError("unknown participant")
            if role == ParticipantRole.HOST or target.role == ParticipantRole.HOST:
                raise ValueError("use transfer_host for host changes")
            target.role = role
            self._notify("role_changed", {"id": target_id, "role": role.value})

    def transfer_host(self, by: str, target_id: str) -> None:
        """Hand the host role to another participant; the old host becomes a cohost."""
        with self._lock:
            actor = self._participants.get(by)
            if actor is None or actor.role != ParticipantRole.HOST:
                raise PermissionError("only the host can transfer the host role")
            target = self._participants.get(target_id)
            if target is None or target is actor:
                raise ValueError("target must be another participant in the room")
            actor.role = ParticipantRole.COHOST
            target.role = ParticipantRole.HOST
            self._host_id = target_id
            self._notify("host_changed", {"id": target_id})

    # ---- Chat ----

    def send_message(self, author_id: str, text: str) -> ChatMessage:
        with self._lock:
            if author_id not in self._participants:
                raise PermissionError("must be in the room to chat")
            return self._chat.append(author_id, text)

    # ---- Observers ----

    def observe(self, handler: Callable[[str, dict], None]) -> None:
        self._observers.append(handler)

    def _notify(self, event: str, payload: dict) -> None:
        # Handlers run synchronously while the room lock is held; keep them fast.
        for h in self._observers:
            h(event, payload)

    @property
    def status(self) -> RoomStatus:
        return self._status

    @property
    def participant_count(self) -> int:
        with self._lock:
            return len(self._participants)

    def router(self) -> MediaRouter:
        return self._router


if __name__ == "__main__":
    room = Room(meeting_id="m1", host_id="alice")
    room.observe(lambda ev, p: print(f"[event] {ev} {p}"))

    room.start()
    room.join("alice", "Alice")
    room.join("bob", "Bob")
    room.join("carol", "Carol")
    print(f"Participants: {room.participant_count}")

    # Everyone publishes audio; bob also publishes video.
    room.publish("alice", TrackType.AUDIO)
    room.publish("bob", TrackType.AUDIO)
    room.publish("bob", TrackType.VIDEO)
    room.publish("carol", TrackType.AUDIO)

    # Bob's video packet is forwarded to alice and carol, not back to bob.
    recipients = room.forward("bob", TrackType.VIDEO, 1024)
    print(f"Bob's video packet reached {recipients} subscribers (expected 2).")
    assert recipients == 2

    # Mute / hand flow. A muted track stays published but is not forwarded.
    room.set_audio_muted("carol", True)
    assert room._participants["carol"].audio_on is False
    assert room.forward("carol", TrackType.AUDIO, 160) == 0
    room.raise_hand("carol")
    room.raise_hand("bob")
    print(f"Hands raised: {sorted(room.raised_hands())}")
    assert set(room.raised_hands()) == {"carol", "bob"}

    # Alice promotes Bob to cohost.
    room.promote("alice", "bob", ParticipantRole.COHOST)
    # Cohost Bob can lower all hands at the end of Q&A.
    lowered = room.lower_all_hands(by="bob")
    print(f"Lowered {lowered} hand(s)")
    assert room.raised_hands() == []

    # Alice (host) leaves; Bob (cohost) inherits the host role.
    room.leave("alice")
    print(f"After alice leaves, room status: {room.status.value}, participants: {room.participant_count}")
    assert room._participants["bob"].role == ParticipantRole.HOST

    # Chat.
    msg = room.send_message("bob", "Thanks for joining")
    print(f"Chat message {msg.id}: {msg.text!r}")
```
Common Mistakes
- ✗ Coupling signaling and media into one class. They have very different latency and throughput profiles.
- ✗ Mesh topology beyond 4 people. Every participant uploads N-1 streams, so bandwidth collapses at 8+ participants.
- ✗ No backpressure on slow clients. Without per-subscriber adaptation, publishers throttle to the slowest receiver, and one client on 2G drags down the whole room's bitrate.
- ✗ Modeling host as a flag instead of a role. When the host leaves, who inherits? Role-as-state captures it.
Key Points
- ✓ Separate signaling (who's in the room) from media (the actual packets). They scale differently.
- ✓ SFU (Selective Forwarding Unit) scales far better than MCU (Multipoint Control Unit) because it does no server-side transcoding.
- ✓ Each participant publishes its own tracks; the router decides subscriptions. Active-speaker logic runs on the router.
- ✓ Role state machine: attendee → cohost → host. Transitions are audited and broadcast.
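Active-speaker detection on the router can be as simple as smoothing each participant's audio level and switching only when a challenger is clearly louder, which avoids flapping between speakers. This is a hypothetical sketch: it assumes the router sees a per-packet audio level normalized to 0.0-1.0 (for example, derived from an RTP audio-level header extension), and alpha, switch_margin, and silence are illustrative tuning values.

```python
from __future__ import annotations


class ActiveSpeakerDetector:
    """Smoothed loudest-speaker tracker with hysteresis (hypothetical sketch)."""

    def __init__(self, alpha: float = 0.3, switch_margin: float = 0.1, silence: float = 0.05):
        self._alpha = alpha            # weight of the newest sample in the moving average
        self._margin = switch_margin   # how much louder a challenger must be to take over
        self._silence = silence        # below this level, nobody counts as speaking
        self._levels: dict[str, float] = {}
        self.active: str | None = None

    def observe(self, participant_id: str, level: float) -> str | None:
        """Feed one audio-level sample (0.0-1.0); return the current active speaker."""
        previous = self._levels.get(participant_id, 0.0)
        self._levels[participant_id] = self._alpha * level + (1 - self._alpha) * previous

        loudest = max(self._levels, key=self._levels.__getitem__)
        loudest_level = self._levels[loudest]
        if loudest_level < self._silence:
            return self.active  # silence: keep showing the last speaker
        current_level = self._levels.get(self.active, 0.0) if self.active is not None else 0.0
        if self.active is None or loudest_level > current_level + self._margin:
            self.active = loudest
        return self.active


if __name__ == "__main__":
    detector = ActiveSpeakerDetector()
    detector.observe("alice", 0.9)
    for _ in range(10):
        detector.observe("alice", 0.0)  # alice stops talking
        detector.observe("bob", 0.9)    # bob starts
    print(detector.active)  # bob
```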