Spark
The Swiss Army knife of distributed data processing, for better or worse
Architecture
Why It Exists
Anyone who has worked with Hadoop MapReduce knows the pain. Every stage wrote intermediate results back to HDFS. Then the next stage read them from disk. Then wrote again. For iterative algorithms like ML training or PageRank, this meant waiting minutes per iteration just on I/O. It was brutal.
Spark came out of UC Berkeley's AMPLab with one core idea: keep intermediate data in memory between operations. That alone gave 10-100x speedups on iterative workloads. But the bigger win, the one that made Spark stick, is that it unified batch processing, SQL, streaming, ML, and graph computation under one API and one execution engine. Before Spark, each of those required a separate system. That simplification of the operational footprint is what actually matters in production.
How It Works Internally
Every Spark application starts with the Driver, which holds the application code and the SparkContext. When a DataFrame pipeline (or an RDD chain) is built, Spark constructs a lazy logical plan. Nothing executes yet. Computation only kicks off when an action like write, count, or collect is called. At that point, the Catalyst optimizer takes over.
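A minimal PySpark sketch of that laziness; the path and column names are made up for illustration:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("lazy-demo").getOrCreate()

# These lines only build a logical plan; no data is read yet.
events = spark.read.parquet("s3://bucket/events/")          # hypothetical path
errors = events.filter(F.col("level") == "ERROR").select("service", "ts")

# The action is what hands the plan to Catalyst and actually runs the job.
print(errors.count())
```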
Catalyst works in four phases. First, it resolves column names and types against the catalog (analysis). Second, it applies rule-based optimizations: pushing predicates down (filter before join), pruning unused columns, folding constants, and reordering joins based on table statistics. Third, it generates multiple physical plans and picks the cheapest one using a cost model. Fourth, Tungsten's whole-stage code generation compiles the physical plan into optimized Java bytecode. It fuses multiple operators (filter, project, aggregate) into a single tight loop that works directly on raw memory, skipping JVM object overhead entirely.
The DAGScheduler then converts the physical plan into a Directed Acyclic Graph of stages. Stages get split at shuffle boundaries, which are points where data has to move across the cluster (groupBy, join, repartition). Within a stage, tasks run as pipelined transformations on partitions. The TaskScheduler assigns tasks to executors, preferring data locality so data gets processed on the node where it already lives.
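One way to see both Catalyst's work and the coming stage boundaries is explain(); Exchange nodes in the physical plan mark the shuffles where the DAGScheduler will cut stages. This sketch assumes an active SparkSession named spark, and the table and columns are illustrative:

```python
from pyspark.sql import functions as F

orders = spark.read.parquet("s3://bucket/orders/")     # hypothetical path
daily = (orders
         .filter(F.col("status") == "COMPLETE")        # candidate for predicate pushdown
         .groupBy("order_date")                        # wide transformation -> shuffle
         .agg(F.sum("amount").alias("revenue")))

# "extended" prints the parsed, analyzed, optimized, and physical plans.
# Look for PushedFilters on the scan and Exchange nodes at the shuffle boundary.
daily.explain(mode="extended")
```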
Shuffles are where Spark gets expensive. During a shuffle, each mapper sorts its output by reducer partition and writes it to local disk. Each reducer then pulls its partitions from every mapper over the network. The sort-based shuffle manager (the default) writes one data file per mapper plus an index file for fast seeking. For M mappers and R reducers, that is 2M files and M*R network fetches. This explains why many Spark jobs spend most of their time in shuffle stages.
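A back-of-the-envelope illustration of how the file and fetch counts scale:

```python
# Sort-based shuffle: one data file plus one index file per mapper;
# every reducer fetches its slice from every mapper.
mappers, reducers = 2_000, 2_000
shuffle_files = 2 * mappers            # 4,000 local files
network_fetches = mappers * reducers   # 4,000,000 fetch requests
print(shuffle_files, network_fetches)
```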
The Block Manager on each executor splits memory into three pools: execution memory (for shuffles, joins, sorts), storage memory (for cached DataFrames), and user memory (for UDFs and metadata). Since Spark 1.6, unified memory management lets execution and storage borrow from each other dynamically, with execution getting priority. If Spark needs more room for a shuffle, it will evict cached data without asking.
Production Architecture
In production, Spark typically runs on YARN, Kubernetes, or Databricks. On Kubernetes, the driver runs as a pod that requests executor pods from the cluster. Dynamic allocation scales executors up and down based on how many tasks are waiting. This is not optional for shared clusters. Over-provisioning wastes money. Under-provisioning misses SLAs.
A typical production ETL cluster looks like: 50-200 executor pods, each with 4-8 cores, 16-32 GB memory, and local SSD for shuffle spill. The driver needs 4-8 GB for plan optimization and metadata. For Spark SQL workloads, the Hive Metastore acts as the catalog, with data living in Parquet or ORC on S3, HDFS, or Delta Lake.
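A sketch of how that sizing might be expressed when building the session; in practice these values usually arrive as spark-submit flags or a pod template, and the numbers below are illustrative rather than prescriptive:

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("nightly-etl")                       # hypothetical job name
         .config("spark.executor.instances", "100")
         .config("spark.executor.cores", "4")
         .config("spark.executor.memory", "24g")
         .config("spark.driver.memory", "8g")
         .config("spark.local.dir", "/mnt/ssd/spark")  # local SSD for shuffle spill
         .getOrCreate())
```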
Delta Lake (or Apache Iceberg) has become the standard storage layer for production Spark pipelines. It adds ACID transactions, schema evolution, and time travel on top of object storage. More practically, it solves the "small files problem" (too many tiny Parquet files killing read performance) through compaction, and the "concurrent write problem" through optimistic concurrency control. Running Spark on a data lake without one of these creates unnecessary operational pain.
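A minimal sketch of writing and compacting a Delta table, assuming the delta-spark package is configured and a reasonably recent Delta Lake release (older versions lack OPTIMIZE); df and the paths are illustrative:

```python
# Append to a partitioned Delta table; the transaction log provides ACID semantics.
(df.write.format("delta")
   .mode("append")
   .partitionBy("event_date")
   .save("s3://bucket/silver/events"))                  # hypothetical path

# Periodic compaction to address the small-files problem.
spark.sql("OPTIMIZE delta.`s3://bucket/silver/events`")
```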
For Structured Streaming, a checkpoint directory on HDFS or S3 is also needed to track offset progress and state. The micro-batch trigger interval controls latency. Setting it to 0 ("as fast as possible") processes each batch right after the previous one finishes, usually landing at 1-5 seconds end-to-end. If the latency requirement is tighter than that, Spark is the wrong tool. Use Flink.
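A sketch of a micro-batch pipeline with a checkpoint and an explicit trigger, assuming an active SparkSession and the Kafka connector on the classpath; brokers, topic, and paths are made up:

```python
stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")    # hypothetical brokers
          .option("subscribe", "clickstream")                   # hypothetical topic
          .load())

query = (stream.selectExpr("CAST(value AS STRING) AS json")
         .writeStream
         .format("delta")
         .option("checkpointLocation", "s3://bucket/checkpoints/clickstream")
         .trigger(processingTime="10 seconds")    # omit to run "as fast as possible"
         .start("s3://bucket/bronze/clickstream"))
```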
Monitoring is not optional. The Spark Web UI provides stage-level DAG visualization, task metrics, and shuffle stats. For production, export metrics to Prometheus via the Spark metrics sink and watch these closely: executor.runTime, shuffle.write.bytesWritten, shuffle.read.fetchWaitTime, jvm.heap.used, and task.completionRate.
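One way to expose those metrics is the built-in PrometheusServlet sink in Spark 3.x; a hedged sketch, since the property names should be checked against the Spark version in use:

```python
from pyspark.sql import SparkSession

# Metrics system properties can be passed as spark.metrics.conf.* settings.
spark = (SparkSession.builder
         .config("spark.ui.prometheus.enabled", "true")
         .config("spark.metrics.conf.*.sink.prometheusServlet.class",
                 "org.apache.spark.metrics.sink.PrometheusServlet")
         .config("spark.metrics.conf.*.sink.prometheusServlet.path",
                 "/metrics/prometheus")
         .getOrCreate())
# Prometheus then scrapes the driver UI endpoint rather than receiving a push.
```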
Decision Criteria
| Criteria | Spark | Flink | Presto/Trino | Hadoop MapReduce |
|---|---|---|---|---|
| Primary strength | Unified batch + ML + streaming | True streaming, event-time processing | Interactive SQL queries | Batch processing (legacy) |
| Processing model | Micro-batch (streaming), batch | True streaming (record-at-a-time) | MPP SQL engine | Batch (map-shuffle-reduce) |
| Latency | Seconds (streaming), minutes (batch) | Milliseconds | Seconds (query) | Minutes to hours |
| State management | Checkpoint to HDFS/S3 | Managed RocksDB + incremental checkpoints | Stateless (query-scoped) | HDFS between stages |
| ML support | MLlib (native), Spark ML Pipelines | FlinkML (limited) | None | Mahout (deprecated) |
| Language support | Scala, Python, Java, R, SQL | Java, Scala, Python (PyFlink), SQL | SQL (ANSI) | Java |
| Shuffle efficiency | Sort-based, spill to disk | Pipelined, hash-based | Pipelined, memory-first | Sort-based, always to disk |
| Ecosystem | Databricks, Delta Lake, MLflow | Ververica, AWS Kinesis Analytics | Starburst, AWS Athena | Hadoop ecosystem |
| Operational complexity | Medium (cluster tuning, shuffle config) | High (checkpoint tuning, RocksDB) | Low (stateless, auto-scaling) | High (HDFS, YARN, slow iteration) |
| Cost efficiency | Good (dynamic allocation, spot instances) | Good (per-job resource allocation) | Excellent (auto-scaling, pay-per-query) | Poor (always-on clusters) |
Capacity Planning
Executor sizing: Start with 4 cores and 16 GB per executor. For memory-heavy operations (big aggregations, wide joins), bump to 8 cores and 32 GB. Do not go above 32 GB per executor because GC pauses get ugly. Do not use single-core executors either, since broadcast overhead per JVM eats the gains. A rough formula for ETL: total_cores = data_size_GB / target_time_minutes * 2, assuming about 500 MB/core/minute throughput.
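A quick worked instance of that formula, with illustrative numbers:

```python
# Assumes ~500 MB/core/minute of end-to-end ETL throughput, as above.
data_size_gb = 2_000                                      # 2 TB of input
target_time_minutes = 30
total_cores = data_size_gb / target_time_minutes * 2      # ~133 cores
executors = total_cores / 4                               # ~34 executors at 4 cores each
print(round(total_cores), round(executors))
```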
Shuffle partitions: Set spark.sql.shuffle.partitions to 2-3x the total core count for moderate data, or just enable AQE (spark.sql.adaptive.enabled=true) and let it auto-coalesce. For a 100 GB join on a 200-core cluster, 400-600 shuffle partitions is a reasonable starting point. Each partition should target 100-200 MB of data after the shuffle.
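As a sketch, the corresponding runtime settings for that 100 GB join (values are illustrative and assume an active session):

```python
spark.conf.set("spark.sql.adaptive.enabled", "true")                    # let AQE coalesce
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "600")                   # fallback when AQE is off
```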
Memory allocation: Spark splits executor memory into JVM heap (spark.executor.memory), off-heap for Tungsten (spark.memory.offHeap.size), and overhead (spark.executor.memoryOverhead, default 10% of executor memory or 384 MB, whichever is larger). The unified memory pool takes 60% of heap by default and splits between execution and storage. For shuffle-heavy jobs, push spark.memory.fraction up to 0.75.
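The arithmetic for a single 24 GB executor heap, using Spark's documented defaults (300 MB reserved, spark.memory.fraction = 0.6):

```python
heap_gb = 24
reserved_gb = 0.3                               # fixed reserve for Spark internals
unified_gb = (heap_gb - reserved_gb) * 0.6      # execution + storage, borrow from each other
user_gb = (heap_gb - reserved_gb) * 0.4         # UDF objects, user data structures
overhead_gb = max(0.384, heap_gb * 0.10)        # memoryOverhead, allocated outside the heap
print(round(unified_gb, 1), round(user_gb, 1), round(overhead_gb, 1))   # 14.2 9.5 2.4
```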
Storage and I/O: Batch ETL throughput is usually I/O-bound. Parquet on S3 reads at roughly 50-100 MB/sec per core (S3 API is the bottleneck). Parquet on HDFS reads at 200-400 MB/sec per core from local disk. Delta Lake adds 5-10% overhead for transaction log reads. Quick example: a 1 TB ETL job reading from S3 with 200 cores takes about 1 TB / (100 MB/sec * 200 cores) = ~50 seconds just for the read phase, plus compute and write time on top.
Structured Streaming: Each micro-batch reads new offsets from Kafka, processes them, writes output, and commits offsets. The processing time per micro-batch determines the sustained throughput. If a micro-batch handles 100,000 records in 2 seconds with a 2-second trigger interval, the sustained rate is 50,000 records/sec. If processing time exceeds the trigger interval, the pipeline is falling behind. Either add parallelism or optimize the pipeline.
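A small monitoring sketch against the query handle from the streaming example above; if triggerExecution keeps exceeding the trigger interval, the pipeline is falling behind:

```python
import time

while query.isActive:
    progress = query.lastProgress            # most recent micro-batch, as a dict (or None)
    if progress:
        print(progress["batchId"],
              progress["numInputRows"],
              progress["durationMs"].get("triggerExecution"))
    time.sleep(30)
```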
Dynamic Allocation: Set spark.dynamicAllocation.minExecutors=10, maxExecutors=500, executorIdleTimeout=120s, schedulerBacklogTimeout=5s. This lets Spark scale from 10 to 500 executors within minutes based on the task backlog. On cloud infrastructure, this is how to avoid burning money on idle compute.
Failure Scenarios
Scenario 1: Shuffle Fetch Failure Cascade
Trigger: A node running executor processes dies. Hardware failure, OOM kill, spot instance getting reclaimed. All shuffle files written by that executor's mappers are gone because shuffle data sits on local disk with no replication.
Impact: Every reducer that needs data from the dead executor hits a FetchFailedException. Spark marks the shuffle output as lost and reruns the entire mapper stage to regenerate those files. But here is the problem: if the mapper stage depended on a previous stage's shuffle output, and those executors are also gone (very common during spot reclamation waves), the result is a cascade of stage recomputations rippling backward through the DAG. A job that was 90% done might need to redo 50% of its work. On a 1,000-executor cluster processing 10 TB, that can mean 30-60 extra minutes.
Detection: Watch for spark.shuffle.FetchFailedException in executor logs. Alert when stage.failedTasks exceeds 5% of total tasks. Track executor.lostCount for sudden spikes. The shuffle service metrics expose shuffle.read.retries.
Recovery: Turn on the external shuffle service (spark.shuffle.service.enabled=true). It runs as a persistent YARN or K8s daemon and serves shuffle files independently of executor lifetime, so executor death no longer means losing shuffle data. For spot instances, run mappers on on-demand nodes and reducers on spot, or use push-based shuffle (Spark 3.2+) which replicates shuffle blocks to a remote service. Set spark.stage.maxConsecutiveAttempts=4 for stage-level retries.
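A consolidated sketch of those settings; they normally go in as --conf flags at submit time, the external shuffle service itself has to be deployed on the cluster side, and the pairing with dynamic allocation is illustrative:

```python
from pyspark.sql import SparkSession

builder = SparkSession.builder.appName("spot-tolerant-etl")   # hypothetical job name
for key, value in {
    "spark.shuffle.service.enabled": "true",      # serve shuffle files outside executor lifetime
    "spark.dynamicAllocation.enabled": "true",    # commonly paired with the shuffle service
    "spark.stage.maxConsecutiveAttempts": "4",    # stage-level retry budget
    "spark.shuffle.push.enabled": "true",         # push-based shuffle, Spark 3.2+ on YARN
}.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()
```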
Scenario 2: Data Skew in Join Operation
Trigger: A job joins a 10 TB fact table against a dimension table, and the join key has extreme skew. Maybe 5% of records have a null key, or one popular key like "unknown" appears millions of times. One reducer gets handed 500 GB while the other 199 get 50 GB each.
Impact: 199 tasks finish in 5 minutes. The skewed task runs for 2 hours, and the entire stage waits for it. If that partition exceeds executor memory, it spills to disk repeatedly, making things even slower. GC pressure on the affected executor spikes, which can kill the executor and trigger shuffle fetch failures downstream. The SLA is now gone.
Detection: The Spark Web UI makes this obvious. Expect massive variance in task duration within a stage (5 minutes vs. 120 minutes). The "Shuffle Read Size" metric shows one task reading 10x more data than the median. AQE flags skewed partitions automatically via spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default 256 MB).
Recovery: Enable AQE skew join handling: spark.sql.adaptive.enabled=true and spark.sql.adaptive.skewJoin.enabled=true. AQE spots skewed partitions at runtime and splits them into smaller sub-partitions, duplicating the matching dimension table partition for each split. For manual fixes: salt the skewed key (append a random suffix, then join against an exploded copy of the dimension table), handle null keys separately with a filter, or broadcast the smaller table if it fits in memory (spark.sql.autoBroadcastJoinThreshold=100m for up to 100 MB).
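For the manual salting route, a sketch under assumed names (fact, dim, and join_key are placeholders for the real tables and key):

```python
from pyspark.sql import functions as F

SALT_BUCKETS = 32   # illustrative; size it to spread the hottest key across the cluster

# Fact side: a random salt fans the hot key out over many partitions.
fact_salted = fact.withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int"))

# Dimension side: duplicate each row once per salt value so every salted key still matches.
dim_salted = dim.withColumn(
    "salt", F.explode(F.array(*[F.lit(i) for i in range(SALT_BUCKETS)])))

joined = fact_salted.join(dim_salted, on=["join_key", "salt"], how="inner")
```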
Scenario 3: Driver OOM During Large Aggregation Collect
Trigger: Someone calls .collect() or .toPandas() on a DataFrame that returns millions of rows. Or they force-broadcast a table bigger than the driver can hold. The driver's JVM heap fills up and dies.
Impact: The driver crashes with OutOfMemoryError. In client mode, that kills the entire application. All executors go down, all cached data vanishes, and any incomplete writes are left in a partial state (which can corrupt output unless Delta Lake or Iceberg is in use). In cluster mode, YARN or K8s can restart the driver, but there is no automatic recovery of computation progress. If the job ran for 3 hours before the OOM, those 3 hours are just gone. Everything starts over.
Detection: Monitor driver JVM heap usage through the metrics sink. Alert when driver.jvm.heap.used passes 80% of configured memory. Scan logs for java.lang.OutOfMemoryError. Set spark.driver.maxResultSize=2g (default 1g) so Spark fails fast with a clear error instead of silently OOMing.
Recovery: Do not call collect() on unbounded result sets. Use take(N), show(N), or write.save() instead. In PySpark, use .limit(N).toPandas() or enable Arrow-based transfer with spark.sql.execution.arrow.pyspark.enabled=true. Bump driver memory (spark.driver.memory=8g) for jobs that genuinely need large driver-side operations. Use spark.driver.maxResultSize as a safety net. For broadcast joins, let Spark auto-broadcast based on table statistics rather than forcing a broadcast on a table whose size has not been checked.
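A short sketch of the safer patterns, assuming an active session and a large DataFrame named big_df:

```python
# Cap anything that must come back to the driver.
preview = big_df.limit(1000)

# Arrow-based transfer makes the (bounded) toPandas() conversion far cheaper.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pdf = preview.toPandas()

# For full results, write to storage instead of collecting.
big_df.write.mode("overwrite").parquet("s3://bucket/output/report")   # hypothetical path
```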
Pros
- • One API covers batch, streaming, SQL, and ML instead of stitching four systems together
- • In-memory processing makes iterative workloads dramatically faster than MapReduce ever was
- • Massive community means most problems already have a Stack Overflow answer
- • Pick your language: Scala, Python, Java, R, or SQL
- • Catalyst optimizer does genuinely smart things with your SQL queries
Cons
- • Micro-batch streaming adds seconds of latency, not milliseconds. If you need real-time, look elsewhere.
- • Memory hungry. Budget for it or watch your executors die.
- • Tuning Spark well is practically a full-time job
- • Overkill for anything that fits on a single machine
- • Shuffle operations will punish you if you are not careful
When to use
- • Large-scale batch data processing
- • You want one platform for batch, streaming, and ML instead of maintaining three
- • Interactive SQL queries on datasets too big for a single database
- • Your team already knows the JVM or Python ecosystem
When NOT to use
- • You need true sub-second stream processing (use Flink instead)
- • Your ETL fits comfortably on one machine. Just use pandas or DuckDB.
- • Real-time event processing where latency actually matters
- • OLTP workloads. Spark is not a database.
Key Points
- •The DAGScheduler breaks the logical plan into stages at shuffle boundaries. Minimizing shuffles is the single highest-impact optimization available.
- •Tungsten bypasses the JVM object model entirely, using unsafe off-heap memory and whole-stage code generation to get near-native CPU performance.
- •Adaptive Query Execution (AQE) in Spark 3.x changes the game: it adjusts shuffle partitions, join strategies, and skew handling at runtime using actual data stats instead of guesses.
- •Structured Streaming defaults to micro-batches, which bottom out at roughly 100 ms of end-to-end latency. Continuous Processing mode exists for millisecond-level latency, but it only offers at-least-once guarantees, not exactly-once. That trade-off matters.
- •Data skew during shuffles means one task gets stuck processing way more data than the rest. Expect to see 99% of tasks finish fast while the job hangs on one straggler.
- •Nothing runs until an action is called (collect, write, count). Spark's lazy evaluation builds up the full DAG first, then executes everything at once.
Common Mistakes
- ✗Calling collect() on large datasets. This pulls every row to the driver and will OOM it. Use take(), show(), or write to storage.
- ✗Filtering down to 1% of the data and not repartitioning afterward. The result is hundreds of near-empty partitions burning scheduling overhead for nothing.
- ✗Leaving spark.sql.shuffle.partitions at the default 200 for every job. Tune it per job, or just turn on AQE and let Spark figure it out.
- ✗Writing Python UDFs in PySpark without Arrow optimization. Each row gets serialized between the JVM and Python, and it is roughly 100x slower than native Spark operations (see the vectorized pandas UDF sketch after this list).
- ✗Caching DataFrames that are only read once. Caching eats memory and forces an extra full pass over the data. Only cache data that is actually reused.
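As referenced above, a minimal vectorized pandas UDF sketch; df and the score column are illustrative, and this assumes PyArrow is installed:

```python
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf

# Arrow-backed UDF: operates on whole pandas Series at a time instead of
# serializing individual rows between the JVM and the Python worker.
@pandas_udf("double")
def normalize(v: pd.Series) -> pd.Series:
    return (v - v.mean()) / v.std()

scored = df.withColumn("score_norm", normalize(F.col("score")))
```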