Workflow Engine
Workflows are composite trees where each node is a command step. A state machine governs the lifecycle, memento checkpointing captures progress at step boundaries, and observers broadcast every transition. You get a pipeline engine that can nest, undo, pause, and recover from crashes.
Key Abstractions
- WorkflowStep: Command interface with execute() and undo() methods. Every step in a workflow implements this contract.
- SequentialWorkflow: Composite that contains child steps and executes them in order. Can nest other composites.
- ParallelWorkflow: Composite that contains child steps and executes them concurrently using a thread pool.
- WorkflowState: Enum representing lifecycle states: PENDING, RUNNING, PAUSED, COMPLETED, FAILED.
- WorkflowCheckpoint: Memento that captures the engine's progress at step boundaries for crash recovery.
- WorkflowListener: Observer interface notified on every state change, step completion, or failure event.
How It Works
A workflow engine models pipelines as a tree of steps. Leaf nodes are concrete actions: validate data, send an email, call an API. Internal nodes are composites that define execution strategy: sequential runs children one by one, parallel fans them out to a thread pool. The engine walks the tree, managing lifecycle state and capturing checkpoints as it goes.
The Composite pattern is the structural backbone. SequentialWorkflow and ParallelWorkflow implement the same WorkflowStep interface as leaf steps, so they nest inside each other to arbitrary depth. A data pipeline might be a sequential workflow whose second step is a parallel fan-out, whose third step is another sequential sub-pipeline. The engine doesn't need to know.
Each step is a Command with execute() and undo(). When a sequential workflow hits a failure at step N, it iterates backwards through steps N-1 to 0, calling undo() on each. Each step is responsible for cleaning up its own side effects, and the chain propagates the rollback without any step knowing about its siblings.
The engine maintains a State Machine with five states. Transitions are validated against a whitelist, so calling run() on a COMPLETED engine throws an error instead of silently corrupting state. After each top-level step completes, a checkpoint captures the current progress as a Memento. If the process crashes, a recovery routine can read the last checkpoint, identify which steps already ran, and resume from the next one.
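The recovery routine described above could look roughly like the sketch below. It assumes the checkpoint stores the names of completed steps and that step names are unique within a workflow. The `Step`, `Checkpoint`, and `resume` names are hypothetical stand-ins for illustration, not part of the implementation later in this article.

```python
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Checkpoint:
    """Snapshot of progress: names of steps that already finished."""
    completed_steps: list[str] = field(default_factory=list)


@dataclass
class Step:
    """Hypothetical stand-in for a leaf step: a name plus an action."""
    name: str
    action: Callable[[], None]


def resume(steps: list[Step], last: Checkpoint | None) -> Checkpoint:
    """Run only the steps that the last checkpoint does not list as done."""
    done = list(last.completed_steps) if last else []
    for step in steps:
        if step.name in done:
            continue  # already ran before the crash, so skip it
        step.action()
        done.append(step.name)  # a real engine would persist a checkpoint here
    return Checkpoint(done)


log: list[str] = []
pipeline = [
    Step("Validate", lambda: log.append("Validate")),
    Step("Transform", lambda: log.append("Transform")),
    Step("Send", lambda: log.append("Send")),
]
# Pretend the process crashed after the first two steps were checkpointed.
recovered = resume(pipeline, Checkpoint(["Validate", "Transform"]))
print(log)                        # only the remaining step ran
print(recovered.completed_steps)  # all three names, in order
```

Skipping by name only works if a step that appears in the checkpoint really finished its side effects, which is why the checkpoint is taken at step boundaries rather than mid-step.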
Observers receive notifications on every state change, step completion, and failure. Logging, metrics, alerting, and UI updates all stay decoupled from the engine itself.
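To illustrate the decoupling, here is a small sketch of a second listener that collects metrics instead of printing. `MetricsListener` and `notify_all` are hypothetical names; `notify_all` only stands in for the engine's broadcast loop, and states are passed as plain strings to keep the example self-contained.

```python
from collections import Counter


class MetricsListener:
    """Hypothetical listener that counts events instead of printing them."""
    def __init__(self) -> None:
        self.counts = Counter()

    def on_state_change(self, old: str, new: str) -> None:
        self.counts[f"state:{new}"] += 1

    def on_step_completed(self, name: str) -> None:
        self.counts["completed"] += 1

    def on_step_failed(self, name: str, error: str) -> None:
        self.counts["failed"] += 1


def notify_all(listeners: list, event: str, *args: str) -> None:
    """Stand-in for the engine's broadcast loop: call one method on every listener."""
    for listener in listeners:
        getattr(listener, event)(*args)


metrics = MetricsListener()
notify_all([metrics], "on_state_change", "PENDING", "RUNNING")
notify_all([metrics], "on_step_completed", "ValidateData")
notify_all([metrics], "on_step_failed", "SendEmail", "SMTP timeout")
print(dict(metrics.counts))
```

The engine never learns that metrics exist. Registering or removing the listener changes what is observed without touching orchestration code.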
Requirements
Functional
- Define workflows as trees of steps: leaf steps for actions, composite steps for sequential or parallel execution
- Execute workflows with automatic rollback on failure (undo completed steps in reverse)
- Enforce lifecycle state transitions: PENDING, RUNNING, PAUSED, COMPLETED, FAILED
- Checkpoint progress after each step for crash recovery
- Notify listeners on state changes, step completions, and failures
Non-Functional
- Illegal state transitions must be rejected with clear errors
- Adding new step types must not require modifying the engine or existing composites
- Parallel steps must not block each other; one slow step should not stall the others
- Checkpoint storage should be replaceable (in-memory for testing, persistent for production)
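One way to meet the replaceable-storage requirement is to hide persistence behind a small interface. The sketch below is an assumption about how that could look: `CheckpointStore`, `InMemoryStore`, and `JsonFileStore` are hypothetical names, and the checkpoint is reduced to a list of completed step names.

```python
from __future__ import annotations
import json
import os
import tempfile
from abc import ABC, abstractmethod


class CheckpointStore(ABC):
    """Hypothetical storage interface; the engine would depend only on this."""
    @abstractmethod
    def save(self, completed_steps: list[str]) -> None: ...
    @abstractmethod
    def load_last(self) -> list[str] | None: ...


class InMemoryStore(CheckpointStore):
    """Keeps checkpoints in a list; suitable for tests."""
    def __init__(self) -> None:
        self._history: list[list[str]] = []

    def save(self, completed_steps: list[str]) -> None:
        self._history.append(list(completed_steps))

    def load_last(self) -> list[str] | None:
        return self._history[-1] if self._history else None


class JsonFileStore(CheckpointStore):
    """Keeps only the latest checkpoint in a JSON file."""
    def __init__(self, path: str) -> None:
        self._path = path

    def save(self, completed_steps: list[str]) -> None:
        # Write to a temporary file, then swap it in, so a crash mid-write
        # is less likely to leave a truncated checkpoint behind.
        tmp = self._path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump({"completed_steps": completed_steps}, f)
        os.replace(tmp, self._path)

    def load_last(self) -> list[str] | None:
        if not os.path.exists(self._path):
            return None
        with open(self._path, encoding="utf-8") as f:
            return json.load(f)["completed_steps"]


with tempfile.TemporaryDirectory() as d:
    stores = [InMemoryStore(), JsonFileStore(os.path.join(d, "checkpoint.json"))]
    loaded = []
    for store in stores:
        store.save(["ValidateData"])
        store.save(["ValidateData", "TransformData"])
        loaded.append(store.load_last())
print(loaded)  # both stores return the same latest checkpoint
```

An engine written against `CheckpointStore` could take the store as a constructor argument, with the in-memory variant as the default for tests.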
Design Decisions
Couldn't we just use a flat step list instead of Composite?
A flat list forces every pipeline to be linear. Real workflows branch: validate, then fan out to three independent transforms, then converge for a final write. Composite lets you express this as a SequentialWorkflow containing a ParallelWorkflow. Adding a new execution strategy (e.g., ConditionalWorkflow that runs children based on a predicate) means implementing one interface, not restructuring the engine.
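As a rough sketch of what "implementing one interface" means for ConditionalWorkflow, consider the following. The interface is simplified here so that execute() returns a bool rather than a result object, and `RecordingStep` is a hypothetical leaf used only to make the behaviour visible.

```python
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Callable


class WorkflowStep(ABC):
    """Simplified step contract for this sketch: execute() returns success as a bool."""
    @abstractmethod
    def execute(self) -> bool: ...
    @abstractmethod
    def undo(self) -> None: ...


class RecordingStep(WorkflowStep):
    """Hypothetical leaf that records calls so the behaviour is visible."""
    def __init__(self, name: str, log: list[str]) -> None:
        self._name, self._log = name, log

    def execute(self) -> bool:
        self._log.append(f"run {self._name}")
        return True

    def undo(self) -> None:
        self._log.append(f"undo {self._name}")


class ConditionalWorkflow(WorkflowStep):
    """Composite that picks one of two children based on a predicate."""
    def __init__(self, predicate: Callable[[], bool],
                 if_true: WorkflowStep, if_false: WorkflowStep) -> None:
        self._predicate = predicate
        self._if_true, self._if_false = if_true, if_false
        self._chosen: WorkflowStep | None = None

    def execute(self) -> bool:
        candidate = self._if_true if self._predicate() else self._if_false
        ok = candidate.execute()
        # Remember the branch only if it succeeded, so undo() reverses exactly what ran.
        self._chosen = candidate if ok else None
        return ok

    def undo(self) -> None:
        if self._chosen is not None:
            self._chosen.undo()
            self._chosen = None


log: list[str] = []
branch = ConditionalWorkflow(lambda: False,
                             RecordingStep("FullExport", log),
                             RecordingStep("DeltaExport", log))
branch.execute()
branch.undo()
print(log)  # ['run DeltaExport', 'undo DeltaExport']
```

Because the composite tracks which branch ran, a parent sequence can roll it back like any other step without knowing a decision was made.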
What if steps were just plain functions instead of Commands?
Plain functions have no undo(). When step 3 of 5 fails, you need to reverse steps 1 and 2. Command encapsulates both the forward action and its reversal. Each step knows how to clean up after itself, which is the only scalable approach when steps have side effects like database writes or API calls.
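If existing logic already lives in plain functions, a thin adapter can pair each forward function with its reversal. The sketch below is illustrative only: `FunctionStep` and `run_with_rollback` are hypothetical names, and failure is signalled by an exception here rather than by a result object.

```python
from __future__ import annotations
from typing import Callable


class FunctionStep:
    """Hypothetical adapter: pairs a forward function with its reversal."""
    def __init__(self, name: str, do: Callable[[], None], undo: Callable[[], None]) -> None:
        self.name = name
        self._do, self._undo = do, undo
        self._executed = False

    def execute(self) -> None:
        self._do()
        self._executed = True

    def undo(self) -> None:
        if self._executed:  # never reverse something that did not happen
            self._undo()
            self._executed = False


def run_with_rollback(steps: list[FunctionStep]) -> bool:
    """Run steps in order; on the first exception, undo finished steps in reverse."""
    done: list[FunctionStep] = []
    for step in steps:
        try:
            step.execute()
        except Exception:
            for finished in reversed(done):
                finished.undo()
            return False
        done.append(step)
    return True


def fail() -> None:
    raise RuntimeError("simulated failure")


rows: list[str] = []
steps = [
    FunctionStep("insert_a", lambda: rows.append("a"), lambda: rows.remove("a")),
    FunctionStep("insert_b", lambda: rows.append("b"), lambda: rows.remove("b")),
    FunctionStep("boom", fail, lambda: None),
]
ok = run_with_rollback(steps)
print(ok, rows)  # False [] : both inserts were reversed
```

The orchestration loop never inspects what a step does. It only relies on every step exposing the same execute/undo pair.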
Does event replay work better than Memento for crash recovery?
Event sourcing rebuilds state by replaying every event from the beginning. For a 50-step workflow, that means re-executing 49 steps to recover at step 50. Memento captures a snapshot at step boundaries. Recovery reads the last snapshot and resumes from the next step. You trade storage for speed, which is the right trade-off when steps are expensive (HTTP calls, data transforms, email sends).
What about using boolean flags instead of a state machine?
Boolean flags (isRunning, isCompleted, isFailed) create a combinatorial explosion. What does isRunning=true and isFailed=true mean? A single enum with validated transitions eliminates impossible states. The transition map is the single source of truth for what the engine can do in any given state.
Interview Follow-ups
- "How would you distribute this across multiple machines?" Partition the workflow tree across workers. The engine becomes a coordinator that assigns sub-trees to workers via a task queue. Each worker checkpoints independently, and the coordinator merges results. Use distributed locks to prevent duplicate execution.
- "How would you handle compensation (sagas)?" Each step defines a compensate() method alongside undo(). Unlike undo which reverses a local operation, compensate issues a new forward action that semantically reverses the effect (e.g., issuing a refund instead of rolling back a payment). The engine runs compensations in reverse order on failure.
- "How do you ensure idempotency?" Assign a unique execution ID to each workflow run. Steps check whether their side effect already exists for that ID before executing. Combined with checkpointing, a recovered workflow never double-sends an email or double-writes a record.
- "How would you add conditional branching?" Create a ConditionalWorkflow composite that evaluates a predicate and delegates to one of two child steps. Because it implements WorkflowStep, it plugs into any position in the tree without changing the engine.
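The idempotency answer above can be sketched as a check keyed on the execution ID and step name. `EmailOutbox` and `send_once` are hypothetical names. In a real system the record of served keys would need to be persistent, and the check-and-record would need to be atomic.

```python
from __future__ import annotations
import uuid


class EmailOutbox:
    """Hypothetical side-effect target that remembers which (run, step) pairs it served."""
    def __init__(self) -> None:
        self.sent: list[str] = []
        self._seen: set[tuple[str, str]] = set()

    def send_once(self, execution_id: str, step_name: str, body: str) -> bool:
        key = (execution_id, step_name)
        if key in self._seen:
            return False  # duplicate request for this run, so do nothing
        self._seen.add(key)
        self.sent.append(body)
        return True


outbox = EmailOutbox()
execution_id = str(uuid.uuid4())  # one ID per workflow run

first = outbox.send_once(execution_id, "SendEmail", "Your export is ready")
# After a crash, the recovered run replays the step with the same execution ID.
replay = outbox.send_once(execution_id, "SendEmail", "Your export is ready")
# A brand-new run gets a new ID, so its email does go out.
new_run = outbox.send_once(str(uuid.uuid4()), "SendEmail", "Your export is ready")
print(first, replay, new_run, len(outbox.sent))  # True False True 2
```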
Code Implementation
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed


# --------------- State Pattern: workflow lifecycle ---------------

class WorkflowState(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"

VALID_TRANSITIONS: dict[WorkflowState, set[WorkflowState]] = {
    WorkflowState.PENDING: {WorkflowState.RUNNING},
    WorkflowState.RUNNING: {WorkflowState.PAUSED, WorkflowState.COMPLETED, WorkflowState.FAILED},
    WorkflowState.PAUSED: {WorkflowState.RUNNING, WorkflowState.FAILED},
    WorkflowState.COMPLETED: set(),
    WorkflowState.FAILED: {WorkflowState.PENDING},  # allow retry from scratch
}


# --------------- Observer Pattern: listeners ---------------

class WorkflowListener(ABC):
    @abstractmethod
    def on_state_change(self, old: WorkflowState, new: WorkflowState) -> None: ...
    @abstractmethod
    def on_step_completed(self, name: str) -> None: ...
    @abstractmethod
    def on_step_failed(self, name: str, error: str) -> None: ...


class ConsoleListener(WorkflowListener):
    def on_state_change(self, old: WorkflowState, new: WorkflowState) -> None:
        print(f" [State] {old.value} -> {new.value}")
    def on_step_completed(self, name: str) -> None:
        print(f" [Done] step '{name}' completed")
    def on_step_failed(self, name: str, error: str) -> None:
        print(f" [Fail] step '{name}': {error}")


# --------------- Memento Pattern: checkpoints ---------------

@dataclass
class WorkflowCheckpoint:
    completed_steps: list[str]
    state: WorkflowState
    timestamp: datetime = field(default_factory=datetime.now)

    def __str__(self) -> str:
        steps = ", ".join(self.completed_steps) if self.completed_steps else "(none)"
        return f"Checkpoint({self.state.value}, completed=[{steps}])"


# --------------- Command Pattern: step interface ---------------

@dataclass
class StepResult:
    success: bool
    message: str = ""


class WorkflowStep(ABC):
    """Command interface: every step can execute and undo."""
    @abstractmethod
    def execute(self) -> StepResult: ...
    @abstractmethod
    def undo(self) -> None: ...
    @abstractmethod
    def get_name(self) -> str: ...


# --------------- Concrete leaf steps ---------------

class ValidateDataStep(WorkflowStep):
    def __init__(self) -> None:
        self._executed = False
    def get_name(self) -> str:
        return "ValidateData"
    def execute(self) -> StepResult:
        print(" > Validating data...")
        self._executed = True
        return StepResult(True, "Data valid")
    def undo(self) -> None:
        if self._executed:
            print(" > Undo: reverting validation flag")
            self._executed = False


class TransformDataStep(WorkflowStep):
    def __init__(self) -> None:
        self._executed = False
    def get_name(self) -> str:
        return "TransformData"
    def execute(self) -> StepResult:
        print(" > Transforming data...")
        self._executed = True
        return StepResult(True, "Data transformed")
    def undo(self) -> None:
        if self._executed:
            print(" > Undo: reverting transformation")
            self._executed = False


class SendEmailStep(WorkflowStep):
    def __init__(self) -> None:
        self._executed = False
    def get_name(self) -> str:
        return "SendEmail"
    def execute(self) -> StepResult:
        print(" > Sending notification email...")
        self._executed = True
        return StepResult(True, "Email sent")
    def undo(self) -> None:
        if self._executed:
            print(" > Undo: queuing recall email")
            self._executed = False


class FailingStep(WorkflowStep):
    """Intentionally fails to demonstrate rollback."""
    def get_name(self) -> str:
        return "FailingStep"
    def execute(self) -> StepResult:
        print(" > Executing failing step...")
        return StepResult(False, "Simulated failure")
    def undo(self) -> None:
        print(" > Undo: cleaning up failed step")


# --------------- Composite Pattern: sequential & parallel ---------------

class SequentialWorkflow(WorkflowStep):
    """Composite that runs children in order. Rolls back on failure."""
    def __init__(self, name: str) -> None:
        self._name = name
        self._steps: list[WorkflowStep] = []
        self._completed: list[WorkflowStep] = []

    def get_name(self) -> str:
        return self._name

    def add_step(self, step: WorkflowStep) -> None:
        self._steps.append(step)

    def execute(self) -> StepResult:
        self._completed.clear()
        for step in self._steps:
            result = step.execute()
            if not result.success:
                # The failed step itself is not undone; only its predecessors are.
                self._rollback()
                return StepResult(False, f"Failed at '{step.get_name()}': {result.message}")
            self._completed.append(step)
        return StepResult(True, f"All {len(self._steps)} steps completed")

    def _rollback(self) -> None:
        """Undo completed steps in reverse order."""
        for step in reversed(self._completed):
            step.undo()
        self._completed.clear()

    def undo(self) -> None:
        self._rollback()


class ParallelWorkflow(WorkflowStep):
    """Composite that runs children concurrently."""
    def __init__(self, name: str, max_workers: int = 4) -> None:
        self._name = name
        self._steps: list[WorkflowStep] = []
        self._completed: list[WorkflowStep] = []
        self._max_workers = max_workers

    def get_name(self) -> str:
        return self._name

    def add_step(self, step: WorkflowStep) -> None:
        self._steps.append(step)

    def execute(self) -> StepResult:
        self._completed.clear()
        results: dict[str, StepResult] = {}
        with ThreadPoolExecutor(max_workers=self._max_workers) as pool:
            futures = {pool.submit(s.execute): s for s in self._steps}
            # Results are collected on the calling thread, in completion order.
            for future in as_completed(futures):
                step = futures[future]
                result = future.result()
                results[step.get_name()] = result
                if result.success:
                    self._completed.append(step)
        failed = [n for n, r in results.items() if not r.success]
        if failed:
            # Every child has finished by now; undo the ones that succeeded.
            self._rollback()
            return StepResult(False, f"Parallel failures: {failed}")
        return StepResult(True, f"All {len(self._steps)} parallel steps done")

    def _rollback(self) -> None:
        for step in reversed(self._completed):
            step.undo()
        self._completed.clear()

    def undo(self) -> None:
        self._rollback()


# --------------- Engine: state machine + checkpointing ---------------

class IllegalTransitionError(Exception):
    """Raised when a transition is not listed in VALID_TRANSITIONS."""


class WorkflowEngine:
    def __init__(self) -> None:
        self._state = WorkflowState.PENDING
        self._listeners: list[WorkflowListener] = []
        self._checkpoints: list[WorkflowCheckpoint] = []
        self._completed_steps: list[str] = []

    def add_listener(self, listener: WorkflowListener) -> None:
        self._listeners.append(listener)

    def _transition(self, new_state: WorkflowState) -> None:
        # Reject anything the transition table does not explicitly allow.
        if new_state not in VALID_TRANSITIONS[self._state]:
            raise IllegalTransitionError(
                f"Cannot move from {self._state.value} to {new_state.value}"
            )
        old = self._state
        self._state = new_state
        for listener in self._listeners:
            listener.on_state_change(old, new_state)

    def _checkpoint(self) -> None:
        # Copy the list so later appends do not mutate earlier checkpoints.
        cp = WorkflowCheckpoint(
            completed_steps=list(self._completed_steps),
            state=self._state,
        )
        self._checkpoints.append(cp)

    def get_last_checkpoint(self) -> WorkflowCheckpoint | None:
        return self._checkpoints[-1] if self._checkpoints else None

    def run(self, workflow: WorkflowStep) -> bool:
        self._transition(WorkflowState.RUNNING)
        result = self._execute_step(workflow)
        if result.success:
            self._transition(WorkflowState.COMPLETED)
        else:
            self._transition(WorkflowState.FAILED)
        return result.success

    def _execute_step(self, step: WorkflowStep) -> StepResult:
        # The engine sees the root workflow as one step, so it records one
        # checkpoint per run() call; nested steps are handled by the composites.
        result = step.execute()
        if result.success:
            self._completed_steps.append(step.get_name())
            self._checkpoint()
            for listener in self._listeners:
                listener.on_step_completed(step.get_name())
        else:
            for listener in self._listeners:
                listener.on_step_failed(step.get_name(), result.message)
        return result

    @property
    def state(self) -> WorkflowState:
        return self._state


# --------------- Demo ---------------

if __name__ == "__main__":
    print("=== Workflow Engine Demo ===\n")

    # --- Build a nested workflow ---
    print("1) Building nested workflow")
    main_workflow = SequentialWorkflow("MainPipeline")

    # First: validate
    main_workflow.add_step(ValidateDataStep())

    # Second: parallel sub-workflow (transform + notify concurrently)
    parallel = ParallelWorkflow("ParallelProcessing")
    parallel.add_step(TransformDataStep())
    parallel.add_step(SendEmailStep())
    main_workflow.add_step(parallel)

    # --- Execute successfully ---
    print("\n2) Running workflow (happy path)")
    engine = WorkflowEngine()
    engine.add_listener(ConsoleListener())
    success = engine.run(main_workflow)
    print(f" Result: {'SUCCESS' if success else 'FAILED'}")
    print(f" Engine state: {engine.state.value}")
    cp = engine.get_last_checkpoint()
    print(f" Last checkpoint: {cp}")

    # --- Demonstrate illegal transition ---
    print("\n3) Attempting illegal transition COMPLETED -> RUNNING")
    try:
        engine._transition(WorkflowState.RUNNING)
    except IllegalTransitionError as e:
        print(f" Caught: {e}")

    # --- Demonstrate failure + rollback ---
    print("\n4) Running workflow with a failing step")
    failing_workflow = SequentialWorkflow("FailingPipeline")
    failing_workflow.add_step(ValidateDataStep())
    failing_workflow.add_step(TransformDataStep())
    failing_workflow.add_step(FailingStep())
    failing_workflow.add_step(SendEmailStep())

    engine2 = WorkflowEngine()
    engine2.add_listener(ConsoleListener())
    success2 = engine2.run(failing_workflow)
    print(f" Result: {'SUCCESS' if success2 else 'FAILED (with rollback)'}")
    print(f" Engine state: {engine2.state.value}")

    # --- Demonstrate checkpoint recovery ---
    print("\n5) Checkpoint-based recovery")
    engine3 = WorkflowEngine()
    engine3.add_listener(ConsoleListener())

    recovery_workflow = SequentialWorkflow("RecoveryPipeline")
    recovery_workflow.add_step(ValidateDataStep())
    recovery_workflow.add_step(TransformDataStep())

    engine3.run(recovery_workflow)
    cp3 = engine3.get_last_checkpoint()
    print(f" Checkpoint after run: {cp3}")
    print(f" Completed steps in checkpoint: {cp3.completed_steps}")

    print("\n=== All scenarios completed ===")
Common Mistakes
- Not handling partial failures in composite workflows. If step 3 of 5 fails, steps 1 and 2 must be rolled back or the system is left in an inconsistent state.
- Missing checkpoints between steps. Without checkpointing, a crash after step 4 of 10 means re-running all 10 steps from scratch.
- Allowing COMPLETED to RUNNING transition. Once a workflow finishes, re-running it should create a new execution instance, not mutate the completed one.
- Coupling step logic to orchestration. Steps should not know about the engine, other steps, or execution order. They execute their own logic and nothing more.
Key Points
- Composite pattern lets you nest arbitrarily. A SequentialWorkflow can contain ParallelWorkflows, which themselves contain leaf steps. The engine doesn't care about depth.
- Command pattern makes every step reversible. On failure partway through a sequence, the enclosing composite calls undo() on each completed step in reverse order for clean rollback.
- Memento checkpointing saves progress at step boundaries. On crash recovery, a recovery routine reads the last checkpoint, skips the steps it lists as completed, and resumes from the next one.
- The state machine enforces legal transitions: PENDING can move to RUNNING, RUNNING can move to PAUSED, COMPLETED, or FAILED, but COMPLETED can never move back to RUNNING.