Workflow Engine
Workflows are composite trees where each node is a command step. A state machine governs the lifecycle, memento checkpointing captures progress at step boundaries, and observers broadcast every transition. You get a pipeline engine that can nest, undo, pause, and recover from crashes.
Key Abstractions
- WorkflowStep: Command interface with execute() and undo() methods. Every step in a workflow implements this contract.
- SequentialWorkflow: Composite that contains child steps and executes them in order. It can nest other composites.
- ParallelWorkflow: Composite that contains child steps and executes them concurrently using a thread pool.
- WorkflowState: Enum representing the lifecycle states PENDING, RUNNING, PAUSED, COMPLETED, and FAILED.
- WorkflowCheckpoint: Memento that captures the engine's progress at step boundaries for crash recovery.
- WorkflowListener: Observer interface notified on every state change, step completion, or failure event.
Class Diagram
How It Works
A workflow engine models pipelines as a tree of steps. Leaf nodes are concrete actions: validate data, send an email, call an API. Internal nodes are composites that define execution strategy: sequential runs children one by one, parallel fans them out to a thread pool. The engine walks the tree, managing lifecycle state and capturing checkpoints as it goes.
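To make the parallel strategy concrete, here is a minimal sketch that assumes the execute()/undo() contract described in this section. `RecordingStep`, the shared `log`, and this simplified `ParallelWorkflow` are illustrative stand-ins, not the implementation shown later. It fans three steps out to a thread pool, lets one fail, and undoes only the siblings that succeeded.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


@dataclass
class StepResult:
    success: bool
    message: str = ""


class RecordingStep:
    """Hypothetical leaf step that appends to a shared log instead of doing real work."""

    def __init__(self, name: str, log: list[str], fail: bool = False) -> None:
        self.name = name
        self.log = log
        self.fail = fail

    def execute(self) -> StepResult:
        self.log.append(f"exec:{self.name}")  # list.append is atomic under CPython's GIL
        return StepResult(not self.fail, "boom" if self.fail else "ok")

    def undo(self) -> None:
        self.log.append(f"undo:{self.name}")


class ParallelWorkflow:
    """Simplified parallel composite: run children concurrently, roll back on any failure."""

    def __init__(self, steps: list[RecordingStep], max_workers: int = 4) -> None:
        self.steps = steps
        self.max_workers = max_workers
        self._completed: list[RecordingStep] = []

    def execute(self) -> StepResult:
        self._completed = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            futures = {pool.submit(step.execute): step for step in self.steps}
        # Leaving the with-block waits for every future; nothing is cancelled mid-flight.
        failed: list[str] = []
        for future, step in futures.items():  # submission order, so rollback order is stable
            if future.result().success:
                self._completed.append(step)
            else:
                failed.append(step.name)
        if failed:
            self.undo()
            return StepResult(False, f"failed: {failed}")
        return StepResult(True, "all parallel steps done")

    def undo(self) -> None:
        for step in reversed(self._completed):
            step.undo()
        self._completed = []


log: list[str] = []
fan_out = ParallelWorkflow([
    RecordingStep("transform", log),
    RecordingStep("enrich", log, fail=True),
    RecordingStep("notify", log),
])
outcome = fan_out.execute()
```

Unlike an `as_completed` loop, this sketch reads results in submission order, which makes the rollback order deterministic. Either way, the composite waits for every future before deciding whether to roll back.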
The Composite pattern is the structural backbone. SequentialWorkflow and ParallelWorkflow implement the same WorkflowStep interface as leaf steps, so they nest inside each other to arbitrary depth. A data pipeline might be a sequential workflow whose second step is a parallel fan-out and whose third step is another sequential sub-pipeline. The engine doesn't need to know how the tree is shaped.
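A minimal sketch of arbitrary nesting, assuming only that every node exposes execute(). `Leaf`, `Sequential`, `Parallel`, and `run` are simplified, hypothetical stand-ins (with no undo) for the classes in the implementation below. The pipeline mirrors the example in the paragraph above: a sequential workflow whose second step is a parallel fan-out and whose third step is a sequential sub-pipeline.

```python
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from typing import Protocol


class WorkflowStep(Protocol):
    def execute(self) -> bool: ...


class Leaf:
    """Hypothetical action step; records its name so the run order is visible."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name, self.log = name, log

    def execute(self) -> bool:
        self.log.append(self.name)
        return True


class Sequential:
    def __init__(self, *children: WorkflowStep) -> None:
        self.children = children

    def execute(self) -> bool:
        # all() short-circuits, so later children are skipped after a failure
        return all(child.execute() for child in self.children)


class Parallel:
    def __init__(self, *children: WorkflowStep) -> None:
        self.children = children

    def execute(self) -> bool:
        with ThreadPoolExecutor() as pool:
            results = list(pool.map(lambda child: child.execute(), self.children))
        return all(results)


def run(root: WorkflowStep) -> bool:
    """The 'engine': it calls execute() on the root and never inspects the tree's shape."""
    return root.execute()


log: list[str] = []
pipeline = Sequential(
    Leaf("validate", log),
    Parallel(Leaf("transform_a", log), Leaf("transform_b", log)),
    Sequential(Leaf("load", log), Sequential(Leaf("index", log))),
)
ok = run(pipeline)
single_ok = run(Leaf("standalone", []))  # a bare leaf is just as valid a root
```

`run` treats a three-level tree and a single leaf identically; that uniformity is the payoff of every node sharing one interface.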
Each step is a Command with execute() and undo(). When a sequential workflow hits a failure at step N, it iterates backwards through steps N-1 to 0, calling undo() on each. Each step is responsible for cleaning up its own side effects, and the chain propagates the rollback without any step knowing about its siblings.
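A minimal sketch of the reverse-order rollback, assuming each step records only its own effects. `LoggedStep` and the shared `log` are hypothetical. Step C fails, so B and then A are undone, and D never runs.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class StepResult:
    success: bool
    message: str = ""


class LoggedStep:
    """Hypothetical step that logs execute/undo; fail=True simulates an error."""

    def __init__(self, name: str, log: list[str], fail: bool = False) -> None:
        self.name, self.log, self.fail = name, log, fail

    def execute(self) -> StepResult:
        self.log.append(f"exec:{self.name}")
        return StepResult(not self.fail, f"{self.name} failed" if self.fail else "")

    def undo(self) -> None:
        # Each step cleans up only its own side effects.
        self.log.append(f"undo:{self.name}")


class SequentialWorkflow:
    def __init__(self, steps: list[LoggedStep]) -> None:
        self.steps = steps
        self._completed: list[LoggedStep] = []

    def execute(self) -> StepResult:
        self._completed = []
        for step in self.steps:
            result = step.execute()
            if not result.success:
                # Walk back over the steps that already succeeded, newest first.
                for done in reversed(self._completed):
                    done.undo()
                self._completed = []
                return StepResult(False, f"Failed at '{step.name}': {result.message}")
            self._completed.append(step)
        return StepResult(True)


log: list[str] = []
wf = SequentialWorkflow([LoggedStep("A", log), LoggedStep("B", log),
                         LoggedStep("C", log, fail=True), LoggedStep("D", log)])
result = wf.execute()
```

The failing step C is not undone because it never joined `_completed`; a step that can fail after partially mutating state has to clean up before it returns failure.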
The engine maintains a State Machine with five states. Transitions are validated against a whitelist, so calling run() on a COMPLETED engine throws an error instead of silently corrupting state. After each top-level step completes, a checkpoint captures the current progress as a Memento. If the process crashes, a recovery routine can read the last checkpoint, identify which steps already ran, and resume from the next one.
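A minimal sketch of checkpoint-based recovery, assuming the engine runs a flat list of top-level steps and checkpoints after each one succeeds. `Checkpoint`, `SimulatedCrash`, and `run_top_level` are hypothetical names. A real crash would kill the process rather than raise, and the checkpoint list would live in durable storage.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Checkpoint:
    """Memento: names of the top-level steps that have finished successfully."""
    completed_steps: tuple[str, ...]


class SimulatedCrash(Exception):
    """Stands in for the process dying partway through a step."""


def run_top_level(
    steps: list[tuple[str, Callable[[], None]]],
    checkpoints: list[Checkpoint],
    resume_from: Checkpoint | None = None,
) -> None:
    done = list(resume_from.completed_steps) if resume_from is not None else []
    for name, action in steps:
        if name in done:
            continue  # finished before the crash: skip instead of re-executing
        action()  # may raise SimulatedCrash
        done.append(name)
        # Checkpoint only after success, so a crashed step is never recorded as done.
        checkpoints.append(Checkpoint(tuple(done)))


executed: list[str] = []
crash_armed = [True]  # crash the first time "load" runs, succeed on the retry


def make_action(name: str) -> Callable[[], None]:
    def action() -> None:
        if name == "load" and crash_armed[0]:
            crash_armed[0] = False
            raise SimulatedCrash(name)
        executed.append(name)
    return action


pipeline = [(n, make_action(n)) for n in ("validate", "transform", "load", "notify")]
store: list[Checkpoint] = []
try:
    run_top_level(pipeline, store)
except SimulatedCrash:
    pass
last = store[-1]  # what a recovery routine would read after a restart
run_top_level(pipeline, store, resume_from=last)
```

The ordering is the key invariant: the checkpoint is written after the step's side effect, so a crash can at worst re-run the one step that was in flight. That residual risk is what the idempotency follow-up addresses.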
Observers receive notifications on every state change, step completion, and failure. Logging, metrics, alerting, and UI updates all stay decoupled from the engine itself.
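A minimal sketch of the observer side, assuming listeners implement the three callbacks named in this section. `MetricsListener` and `AlertingListener` are hypothetical, and states are plain strings to keep the sketch self-contained. Adding either listener never touches the loop that broadcasts the events.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from collections import Counter


class WorkflowListener(ABC):
    @abstractmethod
    def on_state_change(self, old: str, new: str) -> None: ...

    @abstractmethod
    def on_step_completed(self, name: str) -> None: ...

    @abstractmethod
    def on_step_failed(self, name: str, error: str) -> None: ...


class MetricsListener(WorkflowListener):
    """Counts events; a real one might push them to a metrics backend instead."""

    def __init__(self) -> None:
        self.counts: Counter[str] = Counter()

    def on_state_change(self, old: str, new: str) -> None:
        self.counts[f"state.{new}"] += 1

    def on_step_completed(self, name: str) -> None:
        self.counts["steps.completed"] += 1

    def on_step_failed(self, name: str, error: str) -> None:
        self.counts["steps.failed"] += 1


class AlertingListener(WorkflowListener):
    """Only cares about failures; 'paging' is simulated by recording a message."""

    def __init__(self) -> None:
        self.pages: list[str] = []

    def on_state_change(self, old: str, new: str) -> None:
        pass

    def on_step_completed(self, name: str) -> None:
        pass

    def on_step_failed(self, name: str, error: str) -> None:
        self.pages.append(f"PAGE: {name} failed: {error}")


metrics = MetricsListener()
alerts = AlertingListener()
listeners: list[WorkflowListener] = [metrics, alerts]

# Stand-in for the engine's notification loops: they depend only on the interface.
for listener in listeners:
    listener.on_state_change("PENDING", "RUNNING")
    listener.on_step_completed("ValidateData")
    listener.on_step_failed("SendEmail", "SMTP timeout")
    listener.on_state_change("RUNNING", "FAILED")
```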
Requirements
Functional
- Define workflows as trees of steps: leaf steps for actions, composite steps for sequential or parallel execution
- Execute workflows with automatic rollback on failure (undo completed steps in reverse)
- Enforce lifecycle state transitions: PENDING, RUNNING, PAUSED, COMPLETED, FAILED
- Checkpoint progress after each top-level step for crash recovery
- Notify listeners on state changes, step completions, and failures
Non-Functional
- Illegal state transitions must be rejected with clear errors
- Adding new step types must not require modifying the engine or existing composites
- Parallel steps must not block each other; one slow step should not stall the others
- Checkpoint storage should be replaceable (in-memory for testing, persistent for production)
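A minimal sketch of replaceable checkpoint storage, assuming a two-method interface. `CheckpointStore`, `InMemoryCheckpointStore`, `JsonLinesCheckpointStore`, and `record_progress` are hypothetical names. The file-backed version appends one JSON object per line and does not fsync, so treat it as illustrative rather than crash-proof.

```python
from __future__ import annotations

import json
import tempfile
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass
from pathlib import Path


@dataclass(frozen=True)
class Checkpoint:
    completed_steps: tuple[str, ...]
    state: str


class CheckpointStore(ABC):
    @abstractmethod
    def save(self, checkpoint: Checkpoint) -> None: ...

    @abstractmethod
    def load_last(self) -> Checkpoint | None: ...


class InMemoryCheckpointStore(CheckpointStore):
    """Fast and disposable: good for unit tests."""

    def __init__(self) -> None:
        self._items: list[Checkpoint] = []

    def save(self, checkpoint: Checkpoint) -> None:
        self._items.append(checkpoint)

    def load_last(self) -> Checkpoint | None:
        return self._items[-1] if self._items else None


class JsonLinesCheckpointStore(CheckpointStore):
    """Survives a process restart by appending each checkpoint to a file."""

    def __init__(self, path: Path) -> None:
        self._path = path

    def save(self, checkpoint: Checkpoint) -> None:
        with self._path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(checkpoint)) + "\n")  # tuples serialize as JSON arrays

    def load_last(self) -> Checkpoint | None:
        if not self._path.exists():
            return None
        lines = self._path.read_text(encoding="utf-8").splitlines()
        if not lines:
            return None
        data = json.loads(lines[-1])
        return Checkpoint(tuple(data["completed_steps"]), data["state"])


def record_progress(store: CheckpointStore) -> Checkpoint | None:
    """Engine-side code: depends only on the CheckpointStore interface."""
    store.save(Checkpoint(("validate",), "RUNNING"))
    store.save(Checkpoint(("validate", "transform"), "RUNNING"))
    return store.load_last()


memory_result = record_progress(InMemoryCheckpointStore())
with tempfile.TemporaryDirectory() as tmp:
    file_store = JsonLinesCheckpointStore(Path(tmp) / "checkpoints.jsonl")
    empty_before = file_store.load_last()
    file_result = record_progress(file_store)
```

Swapping storage means passing a different `CheckpointStore` to the engine; `record_progress` stays unchanged.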
Design Decisions
Couldn't we just use a flat step list instead of Composite?
A flat list forces every pipeline to be linear. Real workflows branch: validate, then fan out to three independent transforms, then converge for a final write. Composite lets you express this as a SequentialWorkflow containing a ParallelWorkflow. Adding a new execution strategy (e.g., ConditionalWorkflow that runs children based on a predicate) means implementing one interface, not restructuring the engine.
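A minimal sketch of the ConditionalWorkflow mentioned above, assuming the predicate is a zero-argument callable evaluated at execution time. `LogStep`, the constructor signature, and the batch-size predicate are hypothetical. Because it implements the same execute()/undo()/get_name() contract, it can sit anywhere in the tree.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Callable


@dataclass
class StepResult:
    success: bool
    message: str = ""


class WorkflowStep(ABC):
    @abstractmethod
    def execute(self) -> StepResult: ...

    @abstractmethod
    def undo(self) -> None: ...

    @abstractmethod
    def get_name(self) -> str: ...


class LogStep(WorkflowStep):
    """Hypothetical leaf that records execute/undo calls."""

    def __init__(self, name: str, log: list[str]) -> None:
        self._name, self._log = name, log

    def get_name(self) -> str:
        return self._name

    def execute(self) -> StepResult:
        self._log.append(f"exec:{self._name}")
        return StepResult(True)

    def undo(self) -> None:
        self._log.append(f"undo:{self._name}")


class ConditionalWorkflow(WorkflowStep):
    """Composite that evaluates a predicate and delegates to exactly one child."""

    def __init__(self, name: str, predicate: Callable[[], bool],
                 if_true: WorkflowStep, if_false: WorkflowStep) -> None:
        self._name = name
        self._predicate = predicate
        self._if_true = if_true
        self._if_false = if_false
        self._taken: WorkflowStep | None = None  # branch to undo later, if any

    def get_name(self) -> str:
        return self._name

    def execute(self) -> StepResult:
        branch = self._if_true if self._predicate() else self._if_false
        result = branch.execute()
        # Remember the branch only on success; a failed composite child has
        # already rolled itself back.
        self._taken = branch if result.success else None
        return result

    def undo(self) -> None:
        if self._taken is not None:
            self._taken.undo()
            self._taken = None


log: list[str] = []
batch_size = 10_000
route = ConditionalWorkflow(
    "RouteBySize",
    predicate=lambda: batch_size > 1_000,
    if_true=LogStep("BulkLoad", log),
    if_false=LogStep("RowByRowLoad", log),
)
outcome = route.execute()
route.undo()
```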
What if steps were just plain functions instead of Commands?
Plain functions have no undo(). When step 3 of 5 fails, you need to reverse steps 1 and 2. Command encapsulates both the forward action and its reversal. Each step knows how to clean up after itself, which is the only scalable approach when steps have side effects like database writes or API calls.
Does event replay work better than Memento for crash recovery?
Event sourcing rebuilds state by replaying every recorded event from the beginning. If recovery replays by re-running the steps themselves rather than re-applying their recorded outcomes, a 50-step workflow re-executes 49 steps to recover at step 50. Memento captures a snapshot at step boundaries. Recovery reads the last snapshot and resumes from the next step. You trade storage for speed, which is the right trade-off when steps are expensive (HTTP calls, data transforms, email sends).
What about using boolean flags instead of a state machine?
Boolean flags (isRunning, isCompleted, isFailed) create a combinatorial explosion. What does isRunning=true and isFailed=true mean? A single enum with validated transitions eliminates impossible states. The transition map is the single source of truth for what the engine can do in any given state.
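A minimal sketch of what a single source of truth buys, assuming the same five states and transition map used in the implementation below. The helpers `terminal_states` and `reachable_from` are hypothetical. Properties such as "COMPLETED is terminal" become checks you can run against the map, while three independent booleans admit 2^3 = 8 combinations, several of them contradictory.

```python
from __future__ import annotations

from enum import Enum
from itertools import product


class WorkflowState(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


VALID_TRANSITIONS: dict[WorkflowState, set[WorkflowState]] = {
    WorkflowState.PENDING: {WorkflowState.RUNNING},
    WorkflowState.RUNNING: {WorkflowState.PAUSED, WorkflowState.COMPLETED, WorkflowState.FAILED},
    WorkflowState.PAUSED: {WorkflowState.RUNNING, WorkflowState.FAILED},
    WorkflowState.COMPLETED: set(),
    WorkflowState.FAILED: {WorkflowState.PENDING},
}


def terminal_states(table: dict[WorkflowState, set[WorkflowState]]) -> set[WorkflowState]:
    """States with no outgoing transitions."""
    return {state for state, targets in table.items() if not targets}


def reachable_from(start: WorkflowState,
                   table: dict[WorkflowState, set[WorkflowState]]) -> set[WorkflowState]:
    """Depth-first search over the transition map."""
    seen = {start}
    stack = [start]
    while stack:
        for nxt in table[stack.pop()]:
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen


# The flag-based alternative: (is_running, is_completed, is_failed)
flag_combinations = list(product([False, True], repeat=3))
contradictory = [c for c in flag_combinations if sum(c) > 1]  # e.g. running AND failed
```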
Interview Follow-ups
- "How would you distribute this across multiple machines?" Partition the workflow tree across workers. The engine becomes a coordinator that assigns sub-trees to workers via a task queue. Each worker checkpoints independently, and the coordinator merges results. Use distributed locks to prevent duplicate execution.
- "How would you handle compensation (sagas)?" Each step defines a compensate() method alongside undo(). Unlike undo which reverses a local operation, compensate issues a new forward action that semantically reverses the effect (e.g., issuing a refund instead of rolling back a payment). The engine runs compensations in reverse order on failure.
- "How do you ensure idempotency?" Assign a unique execution ID to each workflow run. Steps check whether their side effect already exists for that ID before executing. Combined with checkpointing, a recovered workflow never double-sends an email or double-writes a record.
- "How would you add conditional branching?" Create a ConditionalWorkflow composite that evaluates a predicate and delegates to one of two child steps. Because it implements WorkflowStep, it plugs into any position in the tree without changing the engine.
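A minimal sketch of idempotency by execution ID, assuming an in-memory ledger of (execution_id, step_name) pairs. `SideEffectLedger` and `IdempotentEmailStep` are hypothetical. A production ledger would be a durable store with an atomic insert (for example, a unique constraint) so that check-then-record cannot race.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass


@dataclass
class StepResult:
    success: bool
    message: str = ""


class SideEffectLedger:
    """Remembers which side effects already happened for which workflow run."""

    def __init__(self) -> None:
        self._done: set[tuple[str, str]] = set()

    def contains(self, execution_id: str, step_name: str) -> bool:
        return (execution_id, step_name) in self._done

    def record(self, execution_id: str, step_name: str) -> None:
        self._done.add((execution_id, step_name))


class IdempotentEmailStep:
    NAME = "SendEmail"

    def __init__(self, execution_id: str, ledger: SideEffectLedger, outbox: list[str]) -> None:
        self._execution_id = execution_id
        self._ledger = ledger
        self._outbox = outbox  # stands in for a real mail service

    def execute(self) -> StepResult:
        if self._ledger.contains(self._execution_id, self.NAME):
            return StepResult(True, "already sent; skipped")
        self._outbox.append(f"email for run {self._execution_id}")
        # Recorded after the send: a crash between these two lines could still resend once.
        self._ledger.record(self._execution_id, self.NAME)
        return StepResult(True, "sent")


ledger = SideEffectLedger()
outbox: list[str] = []
run_id = str(uuid.uuid4())

first = IdempotentEmailStep(run_id, ledger, outbox).execute()
# After a crash, recovery re-executes the step with the SAME execution ID.
replayed = IdempotentEmailStep(run_id, ledger, outbox).execute()
# A genuinely new run gets a new ID and sends again.
new_run = IdempotentEmailStep(str(uuid.uuid4()), ledger, outbox).execute()
```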
45-Minute LLD Playbook
Each phase is what the interviewer expects you to do and say. Concrete steps, not topic hints. Diagrams are what you would sketch on the board.
- Phase 1 · 5 min
Clarify scope and lock the six concerns
Goal: Pin down the step contract, the composition shape, the rollback policy, the lifecycle, the checkpoint cadence, and the listener surface. End with the diagram-or-code question.
Do & Say:
- ASK·1: Open with: A workflow is a tree of steps. Leaves are actions; internal nodes are sequential or parallel composites.
- SAY·2: Lock v1 scope: In scope: execute, undo on failure, sequential and parallel composites, in-memory checkpoints per top-level step, lifecycle state machine, listeners. Park distributed execution, persistent checkpoint storage, saga compensation, and conditional branching as v2.
- SAY·3: Lock the step contract: WorkflowStep has execute() returning StepResult(success, message), undo() returning void, and get_name() returning a string. Every leaf and every composite implements this same interface. That is what makes the tree uniform.
- SAY·4: Lock the rollback policy: SequentialWorkflow tracks _completed as it goes. When a step fails, it calls undo() on each completed step in reverse order.
- SAY·5: Cover parallel rollback: ParallelWorkflow waits for all futures, collects failures, and if any failed, rolls back the ones that succeeded. The engine itself never iterates; the composite owns its rollback.
- SAY·6: Lock the lifecycle: Five states. PENDING -> RUNNING. RUNNING -> PAUSED, COMPLETED, or FAILED. PAUSED -> RUNNING or FAILED. COMPLETED is terminal. FAILED -> PENDING allows a full retry from scratch (the retry is a fresh run; it does not patch the failed one). Write the transition map.
- SAY·7: Lock the checkpoint cadence: After each top-level step completes, the engine appends a WorkflowCheckpoint capturing the completed_steps list and the state. On crash recovery, read the last checkpoint, skip what is already in completed_steps, and resume from the next step. Storage is in-memory in v1, swappable to disk behind an interface in v2.
- ASK·8: Ask the diagram-or-code question: I'd like to sketch WorkflowStep, the two composites, WorkflowEngine, and the state machine on the board. That's the core of the design. Or I can jump straight to code, your call.
Interviewer is grading: You name all six patterns by role before any code. You distinguish step.undo (per-step cleanup) from engine.recover (resume from checkpoint). You park sagas and distribution explicitly to keep v1 narrow.
- Phase 2 · 5-10 min
Sketch the WorkflowStep contract and the engine plumbing
Goal: Write the WorkflowStep interface, the SequentialWorkflow rollback algorithm, the parallel concurrency model, and the engine's run loop. Either draw or verbalize.
Do & Say:
- WRITE·1: Write the WorkflowStep interface: execute() -> StepResult, undo() -> None, get_name() -> str. Say: Three methods. execute does the work and returns success or failure with a message. undo cleans up if execute already ran. get_name is for logging and checkpointing.
- WRITE·2: Write SequentialWorkflow.execute: for step in steps: result = step.execute(); if not result.success: self._rollback(); return StepResult(False, "Failed at ..."); otherwise self._completed.append(step). Return success. Show _rollback iterating reversed(_completed) and calling undo on each.
- WRITE·3: Write ParallelWorkflow.execute: submit all steps to a ThreadPoolExecutor and collect results with as_completed. Track succeeded steps in _completed. If any failed, roll back the succeeded ones; otherwise return success. Say: Parallel rollback is the trickiest case. If step A succeeded and step B failed concurrently, A still needs undoing.
- WRITE·4: Write the transition map: PENDING -> {RUNNING}, RUNNING -> {PAUSED, COMPLETED, FAILED}, PAUSED -> {RUNNING, FAILED}, COMPLETED -> {} (terminal), FAILED -> {PENDING} (allow retry). Note: COMPLETED is the only terminal state; FAILED allows a retry by transitioning back to PENDING.
- WRITE·5: Write WorkflowEngine.run: transition to RUNNING, call execute_step on the root workflow, then transition to COMPLETED on success or FAILED on failure. On success, execute_step appends to completed_steps, checkpoints, and fires on_step_completed; on failure, it fires on_step_failed.
- SAY·6: If a diagram was requested, draw it now: the WorkflowStep interface at the top, with leaf classes (ValidateData, TransformData, SendEmail) and composites (Sequential, Parallel) below it. The composites' containment arrows loop back to WorkflowStep. WorkflowEngine sits to the side with arrows to WorkflowStep (root), WorkflowState, WorkflowCheckpoint, and WorkflowListener.
Interviewer is grading: rollback iterates reversed(_completed), not the original step list. ParallelWorkflow tracks succeeded steps so it can roll them back when a sibling fails. The transition map is written cell-by-cell, with COMPLETED terminal and FAILED retryable. Checkpoint happens after each top-level success, not after every leaf.
- Phase 3 · 25 min
Code in this sequence (bottom-up)
Goal: Type the code in Python, in the order below. Talk through the rollback chain, the parallel failure handling, and the checkpoint placement as you write.
Do & Say:
- SAY·1: Start with the WorkflowState enum and the VALID_TRANSITIONS dict at module level. Five states, transition rules from above. Say: COMPLETED has no outgoing transitions, so any attempt to move out of COMPLETED raises. That's the invariant. (~2 min)
- SAY·2: Code the WorkflowListener abstract base with on_state_change, on_step_completed, and on_step_failed. Then ConsoleListener, which prints each event with the right prefix. Say: Three event types. A metrics listener would update counters; an alerting listener would page on on_step_failed. (~3 min)
- SAY·3: Code the WorkflowCheckpoint dataclass: completed_steps list, state, and a timestamp via default_factory. __str__ formats it for the demo. Say: Memento. The engine creates one after each top-level step. On recovery, read the last one and resume. (~2 min)
- SAY·4: Code the StepResult dataclass: success bool, message defaulting to empty. (~1 min)
- SAY·5: Code the WorkflowStep abstract base: execute, undo, get_name, all abstract. (~1 min)
- SAY·6: Code ValidateDataStep: the _executed flag starts False. execute prints, flips _executed to True, and returns success. undo prints the revert message if _executed is True. Say: Each leaf owns its own cleanup logic. The engine never knows what each step does internally. (~2 min)
- SAY·7: Code TransformDataStep and SendEmailStep with the same shape: _executed flag, execute prints and flips, undo reverts. SendEmailStep's undo prints "queuing recall email" to make the side-effect reversal explicit. (~2 min)
- SAY·8: Code FailingStep: execute prints "Executing failing step" and returns StepResult(False, "Simulated failure"). undo prints a cleanup message. It is used in the demo to trigger the rollback path. (~1 min)
- SAY·9: Code the SequentialWorkflow constructor and execute. The constructor takes a name and initializes the _steps and _completed lists; add_step appends a child. execute clears _completed and iterates the steps, calling execute on each; on failure it calls _rollback and returns a "Failed at '<name>'" result, otherwise it appends the step to _completed. (~2 min)
- SAY·10: Code SequentialWorkflow._rollback and undo. _rollback iterates reversed(_completed), calling undo on each, then clears the list. undo() delegates to _rollback. (~1 min)
- SAY·11: Say while coding Sequential: The composite implements the same interface as a leaf, so it nests. _rollback runs undo in reverse, which is the Chain of Responsibility part. (~2 min)
- SAY·12: Code the ParallelWorkflow constructor and execute. The constructor takes a name and max_workers. execute uses a ThreadPoolExecutor with that pool size and submits each step's execute. (~1 min)
- SAY·13: Code ParallelWorkflow result handling. as_completed hands back each result; successful steps are appended to _completed and failed ones are recorded. After all futures resolve, if any failed, call _rollback and return failure with the list of failed steps; otherwise return success. (~2 min)
- SAY·14: Say while coding Parallel: We wait for ALL futures before deciding on rollback. We don't cancel mid-flight; that would leak partial work in the cancelled steps. (~1 min)
- SAY·15: Code WorkflowEngine init and state plumbing: _state starts at PENDING, plus the _listeners, _checkpoints, and _completed_steps lists. add_listener appends. _transition validates against VALID_TRANSITIONS and raises IllegalTransitionError on an illegal move; otherwise it updates the state and fires on_state_change. (~2 min)
- SAY·16: Code WorkflowEngine checkpointing and run. _checkpoint appends a WorkflowCheckpoint snapshot. get_last_checkpoint returns the last one or None. run(workflow) transitions to RUNNING, calls _execute_step(workflow), and transitions to COMPLETED or FAILED based on the result. (~2 min)
- SAY·17: Code _execute_step and the error class. _execute_step calls step.execute; on success it appends to completed_steps, calls _checkpoint, and fires on_step_completed; on failure it fires on_step_failed. Define IllegalTransitionError. Say: The engine wraps the root with the state machine and checkpointing. Inner composites manage their own rollback chain. (~1 min)
- SAY·18: Walk through the happy path. Build MainPipeline as Sequential[ValidateData, Parallel[Transform, SendEmail]]. engine.run(main): _transition to RUNNING, then _execute_step. Validate succeeds, Parallel succeeds, checkpoint, _transition to COMPLETED. (~0.5 min)
- SAY·19: Walk through the illegal transition. Call engine._transition(RUNNING) after COMPLETED: COMPLETED -> RUNNING is not in the map, so it raises IllegalTransitionError. (~0.2 min)
- SAY·20: Walk through the failing path. FailingPipeline is [Validate, Transform, FailingStep, SendEmail]. Validate and Transform succeed and are appended. FailingStep returns failure, so SendEmail never runs. _rollback iterates in reverse, calling Transform.undo and then Validate.undo. The engine transitions to FAILED. (~0.3 min)
Interviewer is grading: _rollback uses reversed(_completed), not the steps list. ParallelWorkflow waits for ALL futures before deciding on rollback, with no mid-flight cancellation. _checkpoint is called only after a step succeeds, never after a failure, so failed steps don't leak into completed_steps. _transition validates against the map and fires listener events only when the transition succeeds.
- Phase 4 · 5 min
Trade-offs, extensions, and wrap-up
Goal: Defend Memento over event sourcing and Command over plain functions; volunteer sagas, conditional branching, and idempotency as the next extensions; summarize in one sentence.
Do & Say:
- SAY·1: Trade-off one, Memento checkpoints over event-sourcing replay: Replay rebuilds state from every event since time zero. If recovery replays by re-running steps, a 50-step workflow re-executes 49 steps to recover at step 50. That is catastrophic when steps are HTTP calls or DB writes.
- SAY·2: Defend Memento: Memento captures a snapshot at step boundaries. Recovery reads the last snapshot and resumes. The trade is storage for speed, which is right when steps have real cost.
- SAY·3: Trade-off two, Command over plain functions: Plain functions have no undo. When step 3 of 5 fails, you need to reverse steps 1 and 2. Command encapsulates both the forward action and its reversal in the same class.
- SAY·4: Defend Command: Each step owns its rollback. That is the only approach that scales when steps have side effects like database writes, payments, and emails.
- SAY·5: Extension to volunteer, sagas with compensate(): Add a compensate() method alongside undo(). undo reverses a local action (unset the validated flag). compensate issues a new forward action that semantically reverses the effect (issue a refund instead of rolling back a payment).
- SAY·6: Defend the saga split: For real distributed workflows, compensate is what you actually want, because you cannot un-charge a credit card.
- WATCH·7: Be ready for the conditional-branching question: a ConditionalWorkflow composite that evaluates a predicate and delegates to one of two child steps. It implements WorkflowStep, so it plugs in anywhere in the tree without engine changes.
- WATCH·8: Be ready for the idempotency question: idempotency by execution_id. Each step checks whether its side effect already exists for that ID before executing. Combined with checkpointing, recovery never double-sends.
- SAY·9: Close with one sentence: Composite for the step tree, Command for execute/undo, Chain of Responsibility for the rollback chain, a state machine for the engine lifecycle, Memento for checkpoint recovery, and Observer for listener events: six patterns over one recursive WorkflowStep interface.
Interviewer is grading: You defend Memento by quantifying replay cost concretely (49 re-executions for a 50-step recovery). You volunteer sagas with compensate as the obvious distributed-workflow extension. You can summarize all six patterns in one breath.
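A minimal sketch of the saga extension, assuming each step pairs execute() with a compensate() that issues a new forward action. `SagaStep`, `ChargeCard`, `ReserveStock`, `ShipOrder`, `run_saga`, and the shared `ledger` are hypothetical. When shipping fails, the saga releases the reservation and issues a refund, in reverse order, instead of pretending the charge never happened.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class SagaStep(ABC):
    @abstractmethod
    def execute(self) -> bool: ...

    @abstractmethod
    def compensate(self) -> None:
        """A new forward action that semantically reverses execute()."""


class ChargeCard(SagaStep):
    def __init__(self, ledger: list[str], cents: int) -> None:
        self.ledger, self.cents = ledger, cents

    def execute(self) -> bool:
        self.ledger.append(f"charge {self.cents}")
        return True

    def compensate(self) -> None:
        # The charge stays on record; a refund is appended as a separate entry.
        self.ledger.append(f"refund {self.cents}")


class ReserveStock(SagaStep):
    def __init__(self, ledger: list[str], sku: str) -> None:
        self.ledger, self.sku = ledger, sku

    def execute(self) -> bool:
        self.ledger.append(f"reserve {self.sku}")
        return True

    def compensate(self) -> None:
        self.ledger.append(f"release {self.sku}")


class ShipOrder(SagaStep):
    """Always fails here to trigger compensation."""

    def __init__(self, ledger: list[str]) -> None:
        self.ledger = ledger

    def execute(self) -> bool:
        self.ledger.append("ship attempted")
        return False

    def compensate(self) -> None:
        pass  # nothing was shipped, so there is nothing to compensate


def run_saga(steps: list[SagaStep]) -> bool:
    done: list[SagaStep] = []
    for step in steps:
        if not step.execute():
            for finished in reversed(done):  # compensate newest first
                finished.compensate()
            return False
        done.append(step)
    return True


ledger: list[str] = []
ok = run_saga([ChargeCard(ledger, 5000), ReserveStock(ledger, "sku-42"), ShipOrder(ledger)])
```

The ledger never loses the original charge; compensation appends a refund. That is the difference from the local undo() used elsewhere in this design.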
Code Implementation
from __future__ import annotations

from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

# --------------- State Pattern: workflow lifecycle ---------------
class WorkflowState(Enum):
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"

VALID_TRANSITIONS: dict[WorkflowState, set[WorkflowState]] = {
    WorkflowState.PENDING: {WorkflowState.RUNNING},
    WorkflowState.RUNNING: {WorkflowState.PAUSED, WorkflowState.COMPLETED, WorkflowState.FAILED},
    WorkflowState.PAUSED: {WorkflowState.RUNNING, WorkflowState.FAILED},
    WorkflowState.COMPLETED: set(),  # terminal: nothing leaves COMPLETED
    WorkflowState.FAILED: {WorkflowState.PENDING},  # allow retry from scratch
}

class IllegalTransitionError(Exception):
    """Raised when a state change is not listed in VALID_TRANSITIONS."""

# --------------- Observer Pattern: listeners ---------------
class WorkflowListener(ABC):
    @abstractmethod
    def on_state_change(self, old: WorkflowState, new: WorkflowState) -> None: ...

    @abstractmethod
    def on_step_completed(self, name: str) -> None: ...

    @abstractmethod
    def on_step_failed(self, name: str, error: str) -> None: ...

class ConsoleListener(WorkflowListener):
    def on_state_change(self, old: WorkflowState, new: WorkflowState) -> None:
        print(f" [State] {old.value} -> {new.value}")

    def on_step_completed(self, name: str) -> None:
        print(f" [Done] step '{name}' completed")

    def on_step_failed(self, name: str, error: str) -> None:
        print(f" [Fail] step '{name}': {error}")

# --------------- Memento Pattern: checkpoints ---------------
@dataclass
class WorkflowCheckpoint:
    completed_steps: list[str]
    state: WorkflowState
    timestamp: datetime = field(default_factory=datetime.now)

    def __str__(self) -> str:
        steps = ", ".join(self.completed_steps) if self.completed_steps else "(none)"
        return f"Checkpoint({self.state.value}, completed=[{steps}])"

# --------------- Command Pattern: step interface ---------------
@dataclass
class StepResult:
    success: bool
    message: str = ""

class WorkflowStep(ABC):
    """Command interface: every step can execute and undo."""

    @abstractmethod
    def execute(self) -> StepResult: ...

    @abstractmethod
    def undo(self) -> None: ...

    @abstractmethod
    def get_name(self) -> str: ...

def _run_step(step: WorkflowStep) -> StepResult:
    """Execute a step, turning an unexpected exception into a failed result
    so the caller's rollback path still runs."""
    try:
        return step.execute()
    except Exception as exc:
        return StepResult(False, f"{type(exc).__name__}: {exc}")

# --------------- Concrete leaf steps ---------------
class ValidateDataStep(WorkflowStep):
    def __init__(self) -> None:
        self._executed = False

    def get_name(self) -> str:
        return "ValidateData"

    def execute(self) -> StepResult:
        print(" > Validating data...")
        self._executed = True
        return StepResult(True, "Data valid")

    def undo(self) -> None:
        if self._executed:  # only revert work that actually happened
            print(" > Undo: reverting validation flag")
            self._executed = False

class TransformDataStep(WorkflowStep):
    def __init__(self) -> None:
        self._executed = False

    def get_name(self) -> str:
        return "TransformData"

    def execute(self) -> StepResult:
        print(" > Transforming data...")
        self._executed = True
        return StepResult(True, "Data transformed")

    def undo(self) -> None:
        if self._executed:
            print(" > Undo: reverting transformation")
            self._executed = False

class SendEmailStep(WorkflowStep):
    def __init__(self) -> None:
        self._executed = False

    def get_name(self) -> str:
        return "SendEmail"

    def execute(self) -> StepResult:
        print(" > Sending notification email...")
        self._executed = True
        return StepResult(True, "Email sent")

    def undo(self) -> None:
        if self._executed:
            # An email cannot be unsent; the reversal is a new, compensating action.
            print(" > Undo: queuing recall email")
            self._executed = False

class FailingStep(WorkflowStep):
    """Intentionally fails to demonstrate rollback."""

    def get_name(self) -> str:
        return "FailingStep"

    def execute(self) -> StepResult:
        print(" > Executing failing step...")
        return StepResult(False, "Simulated failure")

    def undo(self) -> None:
        print(" > Undo: cleaning up failed step")

# --------------- Composite Pattern: sequential & parallel ---------------
class SequentialWorkflow(WorkflowStep):
    """Composite that runs children in order. Rolls back on failure."""

    def __init__(self, name: str) -> None:
        self._name = name
        self._steps: list[WorkflowStep] = []
        self._completed: list[WorkflowStep] = []

    def get_name(self) -> str:
        return self._name

    def add_step(self, step: WorkflowStep) -> None:
        self._steps.append(step)

    def execute(self) -> StepResult:
        self._completed.clear()
        for step in self._steps:
            result = _run_step(step)
            if not result.success:
                # Undo everything that already succeeded, newest first.
                self._rollback()
                return StepResult(False, f"Failed at '{step.get_name()}': {result.message}")
            self._completed.append(step)
        return StepResult(True, f"All {len(self._steps)} steps completed")

    def _rollback(self) -> None:
        """Chain of Responsibility: undo completed steps in reverse order."""
        for step in reversed(self._completed):
            step.undo()
        self._completed.clear()

    def undo(self) -> None:
        # Called by a parent composite when a later sibling fails.
        self._rollback()

class ParallelWorkflow(WorkflowStep):
    """Composite that runs children concurrently. Rolls back the successes if any child fails."""

    def __init__(self, name: str, max_workers: int = 4) -> None:
        self._name = name
        self._steps: list[WorkflowStep] = []
        self._completed: list[WorkflowStep] = []
        self._max_workers = max_workers

    def get_name(self) -> str:
        return self._name

    def add_step(self, step: WorkflowStep) -> None:
        self._steps.append(step)

    def execute(self) -> StepResult:
        self._completed.clear()
        failed: list[str] = []  # a list, so two steps with the same name can't collide
        with ThreadPoolExecutor(max_workers=self._max_workers) as pool:
            futures = {pool.submit(_run_step, s): s for s in self._steps}
            # This loop runs in the calling thread, so _completed is only mutated here.
            for future in as_completed(futures):
                step = futures[future]
                result = future.result()
                if result.success:
                    self._completed.append(step)
                else:
                    failed.append(step.get_name())
        # Every future has resolved by now; nothing was cancelled mid-flight.
        if failed:
            self._rollback()
            return StepResult(False, f"Parallel failures: {failed}")
        return StepResult(True, f"All {len(self._steps)} parallel steps done")

    def _rollback(self) -> None:
        for step in reversed(self._completed):
            step.undo()
        self._completed.clear()

    def undo(self) -> None:
        self._rollback()

# --------------- Engine: state machine + checkpointing ---------------
class WorkflowEngine:
    def __init__(self) -> None:
        self._state = WorkflowState.PENDING
        self._listeners: list[WorkflowListener] = []
        self._checkpoints: list[WorkflowCheckpoint] = []
        self._completed_steps: list[str] = []

    def add_listener(self, listener: WorkflowListener) -> None:
        self._listeners.append(listener)

    def _transition(self, new_state: WorkflowState) -> None:
        # Validate first; listeners only hear about transitions that actually happen.
        if new_state not in VALID_TRANSITIONS[self._state]:
            raise IllegalTransitionError(
                f"Cannot move from {self._state.value} to {new_state.value}"
            )
        old = self._state
        self._state = new_state
        for listener in self._listeners:
            listener.on_state_change(old, new_state)

    def _checkpoint(self) -> None:
        # Memento: copy the list so later appends don't mutate saved snapshots.
        cp = WorkflowCheckpoint(
            completed_steps=list(self._completed_steps),
            state=self._state,
        )
        self._checkpoints.append(cp)

    def get_last_checkpoint(self) -> WorkflowCheckpoint | None:
        return self._checkpoints[-1] if self._checkpoints else None

    def run(self, workflow: WorkflowStep) -> bool:
        self._transition(WorkflowState.RUNNING)
        result = self._execute_step(workflow)
        if result.success:
            self._transition(WorkflowState.COMPLETED)
        else:
            self._transition(WorkflowState.FAILED)
        return result.success

    def _execute_step(self, step: WorkflowStep) -> StepResult:
        result = _run_step(step)
        if result.success:
            # Record and checkpoint only after success, so failed steps never
            # appear in completed_steps.
            self._completed_steps.append(step.get_name())
            self._checkpoint()
            for listener in self._listeners:
                listener.on_step_completed(step.get_name())
        else:
            for listener in self._listeners:
                listener.on_step_failed(step.get_name(), result.message)
        return result

    @property
    def state(self) -> WorkflowState:
        return self._state

# --------------- Demo ---------------
if __name__ == "__main__":
    print("=== Workflow Engine Demo ===\n")

    # --- Build a nested workflow ---
    print("1) Building nested workflow")
    main_workflow = SequentialWorkflow("MainPipeline")
    # First: validate
    main_workflow.add_step(ValidateDataStep())
    # Second: parallel sub-workflow (transform + notify concurrently)
    parallel = ParallelWorkflow("ParallelProcessing")
    parallel.add_step(TransformDataStep())
    parallel.add_step(SendEmailStep())
    main_workflow.add_step(parallel)

    # --- Execute successfully ---
    print("\n2) Running workflow (happy path)")
    engine = WorkflowEngine()
    engine.add_listener(ConsoleListener())
    success = engine.run(main_workflow)
    print(f" Result: {'SUCCESS' if success else 'FAILED'}")
    print(f" Engine state: {engine.state.value}")
    cp = engine.get_last_checkpoint()
    print(f" Last checkpoint: {cp}")

    # --- Demonstrate illegal transition ---
    print("\n3) Attempting illegal transition COMPLETED -> RUNNING")
    try:
        engine._transition(WorkflowState.RUNNING)
    except IllegalTransitionError as e:
        print(f" Caught: {e}")

    # --- Demonstrate failure + rollback ---
    print("\n4) Running workflow with a failing step")
    failing_workflow = SequentialWorkflow("FailingPipeline")
    failing_workflow.add_step(ValidateDataStep())
    failing_workflow.add_step(TransformDataStep())
    failing_workflow.add_step(FailingStep())
    failing_workflow.add_step(SendEmailStep())  # never runs: the pipeline stops at FailingStep
    engine2 = WorkflowEngine()
    engine2.add_listener(ConsoleListener())
    success2 = engine2.run(failing_workflow)
    print(f" Result: {'SUCCESS' if success2 else 'FAILED (with rollback)'}")
    print(f" Engine state: {engine2.state.value}")

    # --- Inspect the checkpoint a recovery routine would read ---
    print("\n5) Checkpoint captured for recovery")
    engine3 = WorkflowEngine()
    engine3.add_listener(ConsoleListener())
    recovery_workflow = SequentialWorkflow("RecoveryPipeline")
    recovery_workflow.add_step(ValidateDataStep())
    recovery_workflow.add_step(TransformDataStep())
    engine3.run(recovery_workflow)
    cp3 = engine3.get_last_checkpoint()
    print(f" Checkpoint after run: {cp3}")
    print(f" Completed steps in checkpoint: {cp3.completed_steps if cp3 else []}")

    print("\n=== All scenarios completed ===")
Interview Grading by Level
What an interviewer at each level expects to see in your answer. Use this to calibrate, not to perform.
Junior Engineer (L3)
Identifies Composite and Command and can write a sequential pipeline, but the rollback chain and state machine details remain shaky.
- Names Composite for the step tree and Command for execute/undo correctly.
- Writes a WorkflowStep interface with execute and undo.
- Implements at least one leaf step (ValidateData or TransformData).
- Builds a SequentialWorkflow that iterates and calls execute on each child.
- Adds a state machine when prompted, with at least PENDING, RUNNING, COMPLETED.
- Forgets the rollback iteration; on failure the workflow just returns without calling undo on completed steps.
- Iterates _completed in the wrong order (forward instead of reverse), corrupting the cleanup.
- Uses boolean flags (isRunning, isCompleted, isFailed) instead of an enum with validated transitions.
- Allows COMPLETED -> RUNNING because there's no transition map check.
- Has the engine call undo directly on each step instead of letting the composite handle its own rollback.
Mid-Level Engineer (L4)
Drives the design end-to-end with the rollback chain, the transition map, and per-top-level-step checkpointing.
- SequentialWorkflow._rollback iterates reversed(_completed) and calls undo on each.
- ParallelWorkflow uses ThreadPoolExecutor and waits for all futures before deciding rollback.
- Writes VALID_TRANSITIONS as a dict and validates every state change against it.
- WorkflowCheckpoint captures completed_steps and state, appended after each top-level success.
- WorkflowListener fires on_state_change, on_step_completed, on_step_failed at the right moments.
- Distinguishes step.undo (per-step cleanup) from engine recovery (resume from checkpoint).
- Does not volunteer sagas with compensate() as the next-step distributed-workflow extension unless prompted.
- Misses idempotency-by-execution-id as the way to make recovery safe with side-effect-heavy steps.
- Treats ConditionalWorkflow as a separate redesign rather than another WorkflowStep composite.
Senior Engineer (L5+)
Volunteers sagas, conditional branching, idempotency, and distributed coordination before being asked, names the checkpoint-cadence invariant, and frames each pattern as a defense against a specific failure.
- Volunteers sagas with compensate() as the right model when undo() can't reverse external effects (can't un-charge a credit card; issue a refund instead).
- Proposes ConditionalWorkflow as another WorkflowStep composite that evaluates a predicate and delegates, plugging into the tree without engine changes.
- Names idempotency-by-execution-id: each step checks for its side effect under that id before executing, so recovery never double-sends an email or double-writes a row.
- Quantifies the Memento-over-event-sourcing trade-off concretely: 49 re-executions for a 50-step recovery is catastrophic when steps are HTTP calls.
- Names the checkpoint cadence invariant: append AFTER success only, never on failure, so completed_steps is the authoritative recovery state.
- Frames each pattern around what it prevents: Composite prevents linear-only pipelines, Command prevents irreversible side effects, State machine prevents COMPLETED -> RUNNING corruption, Memento prevents 49-step replay, Observer prevents engine coupling to metrics and alerting.
- Proposes distributed execution as a coordinator that assigns sub-trees to workers via a task queue, with distributed locks to prevent double-execution and per-worker checkpoints merged at the coordinator.
- Closes with one sentence naming all six patterns and their roles in under 25 seconds.
Common Mistakes
- ✗ Not handling partial failures in composite workflows. If step 3 of 5 fails, steps 1 and 2 must be rolled back or the system is left in an inconsistent state.
- ✗ Missing checkpoints between steps. Without checkpointing, a crash after step 4 of 10 means re-running all 10 steps from scratch.
- ✗ Allowing a COMPLETED -> RUNNING transition. Once a workflow finishes, re-running it should create a new execution instance, not mutate the completed one.
- ✗ Coupling step logic to orchestration. Steps should not know about the engine, other steps, or execution order. They execute their own logic and nothing more.
Key Points
- ✓ The Composite pattern lets you nest arbitrarily. A SequentialWorkflow can contain ParallelWorkflows, which themselves contain leaf steps. The engine doesn't care about depth.
- ✓ The Command pattern makes every step reversible. On failure partway through a sequence, the enclosing composite calls undo() on each completed step in reverse order for a clean rollback.
- ✓ Memento checkpointing saves progress after each top-level step completes. On crash recovery, the engine skips already-completed steps and resumes from the last checkpoint.
- ✓ The state machine enforces legal transitions: PENDING can move to RUNNING, and RUNNING can move to PAUSED, COMPLETED, or FAILED, but COMPLETED can never move back to RUNNING.