Apache Spark Questions
โ Overview ๐
Apache Spark is a unified analytics engine for large-scale data processing.
It provides high-level APIs in Java, Scala, Python, and R, and an optimized engine that supports general computation graphs for data analysis.
Apache Spark is an open-source, distributed processing system designed for big data workloads.
It functions as a unified analytics engine, meaning it can handle various types of data processing tasks within a single framework.
+------------------------+
| Driver Program |
| (SparkContext) |
+-----------+------------+
|
v
+-------------------+
| Cluster Manager |
+---------+---------+
|
-----------------------
| |
v v
+---------------+ +---------------+
| Worker Node | | Worker Node |
| +---------+ | | +---------+ |
| | Executor| | | | Executor| |
| +---------+ | | +---------+ |
| | Cache | | | | Cache | |
| +---------+ | | +---------+ |
| | Tasks | | | | Tasks | |
+---------------+ +---------------+
๐ท Core Components ๐
1๏ธโฃ Driver Program ๐
The Driver Program is the main control unit of a Spark application that orchestrates the entire execution process.
Responsibilities:
- SparkContext Creation: Creates and manages the SparkContext, which serves as the entry point for all Spark functionality. Coordinates with the cluster manager to acquire resources and maintains connections to executors throughout the application lifecycle.
- Job Planning: Converts the user program into jobs by analyzing RDD lineage and building directed acyclic graphs (DAGs). Breaks complex operations into manageable stages based on data dependencies and shuffle boundaries.
- Task Scheduling: Schedules tasks across executors based on data locality, resource availability, and workload distribution. Optimizes task placement to minimize network overhead and maximize cluster utilization.
- Result Collection: Collects results from executors and aggregates them for final output or further processing. Manages data transfer from distributed executors back to the driver for actions like collect() and reduce().
- Fault Recovery: Handles executor failures by tracking RDD lineage and recomputing lost partitions on healthy nodes. Maintains resilience through automatic retry mechanisms and task rescheduling on alternative executors.
2๏ธโฃ Cluster Manager ๐ญ
The Cluster Manager is responsible for resource allocation and management across the cluster.
Overview:
Cluster managers handle resource negotiation, allocation, and monitoring across the distributed cluster.
They provide interfaces for Spark applications to request computing resources and manage the lifecycle of executors on worker nodes.
Types:
- Standalone Cluster Manager ๐๏ธ : Spark's built-in cluster manager for simple deployments
- YARN (Yet Another Resource Negotiator) ๐ : Hadoop's resource manager for enterprise environments
- Apache Mesos โ๏ธ : General-purpose cluster manager for diverse workloads
- Kubernetes โ๏ธ : Container orchestration platform for cloud-native deployments
3๏ธโฃ Worker Nodes ๐ทโโ๏ธ
Worker Nodes are the compute nodes in the cluster that host and run Spark executors.
- Executor Hosting: Provides compute resources (CPU cores and memory) to run executor JVM processes. Manages multiple executors simultaneously and ensures proper resource isolation between different applications.
- Resource Management: Monitors and allocates system resources, including CPU, memory, and disk space, to executors. Reports resource availability and health status back to the cluster manager for scheduling decisions.
4๏ธโฃ Executors ๐โโ๏ธ
Executors are JVM processes that run on worker nodes and execute individual tasks.
- Task Execution โก : Runs individual tasks assigned by the driver program and processes data partitions in parallel.
- Data Storage ๐พ : Caches RDD partitions in memory or disk based on storage levels and available resources.
- Shuffle Operations ๐ : Handles data exchange between stages during operations like joins, groupBy, and aggregations.
- Result Reporting ๐ : Reports task completion status, results, and metrics back to the driver program.
5๏ธโฃ SparkContext ๐ฏ
- RDD Creation ๐๏ธ : Creates RDDs from files, databases, and collections.
- Job Submission ๐ค : Converts RDD operations into executable tasks.
- Configuration Management โ๏ธ : Manages Spark configuration properties.
- Service Coordination ๐ค : Coordinates Spark services like block manager and web UI.
๐ Execution Flow
Job Execution Pipeline:
- Application Submission ๐ค : User submits Spark application via spark-submit.
- Resource Request ๐ค : Driver requests resources from cluster manager.
- Executor Launch ๐ : Cluster manager starts executors on worker nodes.
- Job Processing โก : Driver converts user code into jobs, stages, and tasks.
- Task Distribution ๐ก : Tasks distributed based on data locality.
- Result Collection ๐ : Executors return results to driver.
High-Level APIs ๐จ
1๏ธโฃ Spark SQL ๐
Provides structured data processing with SQL interface and DataFrame/Dataset APIs.
Includes Catalyst optimizer for query optimization and code generation for high-performance execution.
2๏ธโฃ Spark Streaming ๐
Enables scalable, fault-tolerant stream processing with micro-batch and continuous processing models.
Integrates with various streaming sources like Kafka, Flume, and TCP sockets.
3๏ธโฃ MLlib ๐ค
Machine learning library providing distributed algorithms for classification, regression, clustering, and collaborative filtering.
Includes ML pipelines for feature engineering and model selection workflows.
4๏ธโฃ GraphX ๐ธ๏ธ
Graph processing framework for distributed graph computation and analysis.
Provides graph algorithms like PageRank, connected components, and triangle counting on large-scale graphs.
๐น Transformation ๐งฉ
A transformation in Spark is a lazy operation that creates a new RDD or DataFrame without executing it immediately.
It only builds the execution plan (lineage) and waits until an action is called.
Examples ๐ : map(), filter(), flatMap(), select()
๐ธ Action ๐
An action is an operation that triggers the actual execution of all previously defined transformations
and returns the final result to the driver or writes it to external storage.
Examples ๐ : count(), show(), collect(), save()
๐ Key Difference
Transformations build the logical execution plan, while actions execute the plan and produce output.
Lazy Evaluation is a powerful optimization feature in Apache Spark where transformations on RDDs or DataFrames are not executed immediately when they are defined.
Instead, Spark builds a logical execution plan and only performs the actual computation when an action (like collect(), count(), show()) is triggered.
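A minimal PySpark sketch of this behavior (the file path and column names are placeholders, not from the original text):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("LazyEvalDemo").getOrCreate()

# These transformations only build the plan; nothing is read or computed yet
df = spark.read.csv("events.csv", header=True, inferSchema=True)
errors = df.filter(col("status") == "ERROR").select("timestamp", "status")

# The action triggers the whole pipeline: read -> filter -> select -> count
print(errors.count())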
โ Benefits ๐ฏ
- Performance Optimization: Spark optimizes the logical and physical plans before execution to improve runtime efficiency.
- Avoids Unnecessary Computations: Only the required transformations are executed, saving CPU and memory resources.
- Pipeline Optimizations: Enables advanced optimizations such as predicate pushdown and projection pruning.
๐ง Simple Insight
Spark first creates a plan, and only works when you ask for the final result.
RDD vs DataFrame vs Dataset in Apache Spark โก
๐น 1) RDD (Resilient Distributed Dataset)
๐ธ Definition :
An RDD is an immutable, distributed collection of datasets partitioned across the nodes of a cluster. It supports fault tolerance by enabling recomputation of lost partitions. RDDs are Sparkโs fundamental data structure and expose a functional programming API for distributed data processing.
๐ธ Core Concepts :
- Resilient ๐ : Can be recomputed using lineage if a node fails.
- Distributed ๐ : Split across a cluster to enable parallelism.
- In-Memory Computing โก : Stores intermediate results in memory for faster access.
- Flexible Creation ๐๏ธ : Created from HDFS, S3, or parallelized collections.
๐ธ Limitations โ :
- โ No Catalyst or Tungsten optimizer
- โ Not memory-efficient (only on-heap memory)
- โ No schema, no native SQL support
- โ High serialization overhead
- โ Affected heavily by Garbage Collection (GC)
๐น 2) DataFrame ๐
๐ธ Definition :
A DataFrame is a distributed collection of data organized into named columns (similar to a SQL table). It provides high-level APIs and is optimized using the Catalyst Optimizer and Tungsten Engine.
๐ธ Highlights โจ :
- Table-like structure (similar to SQL, Pandas, R DataFrames)
- Multi-language support (Scala, Python, Java, R)
- Scales from GBs to petabytes on clusters
- Built-in query and memory optimizations
๐ธ Limitations โ :
- โ Not type-safe (runtime errors)
- โ UDFs reduce optimization benefits
- โ Harder debugging for schema errors
๐น 3) Dataset ๐งฉ
๐ธ Definition :
Dataset is a strongly-typed distributed collection available in Scala/Java. It combines the benefits of RDDs (type safety) and DataFrames (optimizations). Spark uses Encoders for efficient serialization.
๐ธ Advantages โ :
- Type-safe with compile-time checks
- Uses Catalyst & Tungsten optimizations
- Efficient serialization using Encoders
- Supports both functional and SQL-style APIs
๐ธ Limitations โ :
- โ Not available in PySpark
- โ Complex for deeply nested schemas
- โ UDFs act as optimization barriers
๐น 4) Common Features (Across All Three) โ
- โ Fault-tolerant
- โ Distributed and parallelized
- โ Immutable (read-only)
- โ Supports lazy evaluation
- โ In-memory processing
- โ Internally processed as RDDs
๐น Key Differences โก
- API Style: RDDs and Datasets expose an object-oriented/functional API, while DataFrames use a SQL-style declarative API.
- How vs What: RDDs require specifying how to perform tasks; DataFrames and Datasets declare what to do.
- Memory Usage: RDDs use only on-heap memory, while DataFrames and Datasets can use both on-heap and off-heap memory.
- Serialization: RDDs require heavy serialization; DataFrames and Datasets avoid much of it through optimized encoders.
- Garbage Collection: RDDs suffer from GC overhead, while DataFrames and Datasets significantly reduce GC impact.
โ Overview ๐
Both repartition() and coalesce() are Apache Spark operations used to change the number of partitions in a DataFrame or RDD, but they work differently and are optimized for different scenarios.
๐ท Key Differences ๐
| Aspect | Repartition | Coalesce |
|---|---|---|
| Shuffle | Always triggers full shuffle ๐ | Avoids shuffle when possible โ |
| Direction | Can increase or decrease partitions ๐ผ๐ฝ | Only decreases partitions ๐ฝ |
| Data Distribution | Even distribution across partitions โ๏ธ | May result in uneven partitions โ ๏ธ |
| Performance | Slower due to shuffle overhead ๐ | Faster, minimal data movement ๐ |
| Use Case | Need even distribution or more partitions | Efficiently reduce partitions |
| Network I/O | High network overhead ๐ก | Minimal network overhead ๐ |
๐ท When to Use Each ๐ก
โ Use Repartition When :
- Increasing partition count for better parallelism โก
- Need even data distribution across partitions โ๏ธ
- Preparing for downstream operations that benefit from uniform partitions
- Data is heavily skewed and needs redistribution ๐
Example ๐ป :
df = spark.read.parquet("input.parquet")
repartitioned_df = df.repartition(10) # 10 partitions with even distribution
repartitioned_by_col = df.repartition("customer_id") # Partition by column
โ Use Coalesce When :
- Reducing partition count to avoid small files ๐ฆ
- Minimizing shuffle overhead is a priority โ
- Final stage before writing to reduce output files ๐
- Data distribution is already acceptable โ๏ธ
Example ๐ป :
# Coalesce example
coalesced_df = df.coalesce(5)  # Reduce to 5 partitions, no full shuffle
โ What is cache()? ๐
A shortcut to store a DataFrame in memory for faster reuse.
- Uses the default storage level: MEMORY_AND_DISK ๐พ
- If data fits in memory, it stays there; otherwise Spark spills to disk ๐ง โก๏ธ๐ฝ
- Ideal for frequently accessed data in iterative jobs ๐
โ What is persist()? ๐งฉ
Similar to cache(), but gives you full control over where and how the data is stored.
- You can choose different storage levels (memory, disk, serialized formats) โ๏ธ
- Useful when datasets are large or memory is limited ๐
๐น Storage Levels in persist() ๐พ
- MEMORY_ONLY โก: Fastest, but recomputes if memory is insufficient
- MEMORY_AND_DISK โ : Safe default, spills to disk when memory is full
- MEMORY_ONLY_SER ๐ง: Stores serialized (compressed) data in memory
- MEMORY_AND_DISK_SER ๐ฝ: Serialized memory + disk backup
- DISK_ONLY ๐ข: Stores only on disk (slower but safe)
- OFF_HEAP ๐ง : Uses external memory (advanced use case)
๐ข When to Use cache()? โ
- Quick caching without tuning โก
- Data fits in memory or disk fallback is acceptable ๐ก
- DataFrame reused multiple times in a pipeline ๐
๐ก When to Use persist()? ๐ฏ
- You need control over storage strategy โ๏ธ
- Large datasets that don't fit fully in memory ๐ฆ
- Memory-constrained environments to avoid OOM errors โ ๏ธ
- Performance tuning experiments ๐ฌ
๐ป Code Examples ๐
df = spark.read.csv("data.csv", header=True)
# Cache the DataFrame in memory (with disk fallback)
df.cache()
from pyspark import StorageLevel
df = spark.read.csv("data.csv", header=True)
# Persist using memory-only storage (won't use disk)
df.persist(StorageLevel.MEMORY_ONLY)
๐ท Accumulators
โ What is an Accumulator?
A write-only shared variable used to perform aggregations or counters across Spark tasks.
Useful for counting events like errors, nulls, or processed records.
Only the driver can read its value; executors can only update it.
๐ซ Important:
You should not use an accumulator to influence data transformation logic โ it's not guaranteed to update exactly once per record (especially with lazy evaluation and task retries).
# Create an accumulator
error_count = sc.accumulator(0)
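A slightly fuller sketch, assuming an active SparkContext sc and a hypothetical CSV with three fields per line; the caveat above still applies (updates may be re-applied on task retries):
error_count = sc.accumulator(0)

def check(line):
    # Count lines that do not have the expected number of fields
    if len(line.split(",")) != 3:
        error_count.add(1)

sc.textFile("records.csv").foreach(check)        # foreach is an action, so the tasks actually run
print("Malformed records:", error_count.value)   # .value is readable only on the driver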
---
๐ท Broadcast Variables
โ What is a Broadcast Variable?
A read-only shared variable cached on all executors.
Used to efficiently share large lookup data (e.g., dictionaries or reference tables) across all tasks.
Prevents sending a copy with each task, which saves network traffic and memory.
broadcast_var = sc.broadcast(lookup_dict)
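A short usage sketch, assuming an active SparkContext sc (the lookup data is hypothetical):
lookup_dict = {"US": "United States", "IN": "India", "DE": "Germany"}
broadcast_var = sc.broadcast(lookup_dict)

codes = sc.parallelize(["US", "DE", "IN", "US"])
names = codes.map(lambda c: broadcast_var.value.get(c, "Unknown"))
print(names.collect())   # ['United States', 'Germany', 'India', 'United States']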
โ What is Lineage?
Lineage is the history or recipe of how a DataFrame or RDD was created.
It's a logical chain of transformations applied to data (like map, filter, join, etc.).
Spark tracks lineage to recompute lost data if a partition fails (fault tolerance).
๐ง Example:
rdd1 = sc.textFile("file.txt")
rdd2 = rdd1.map(lambda x: x.split())
rdd3 = rdd2.filter(lambda x: len(x) > 3)
The lineage of rdd3 is: file.txt -> map() -> filter()
Use:
Spark uses lineage information to recompute data without needing to persist or copy it. It is essential for fault recovery.
---
๐ถ DAG (Directed Acyclic Graph)
โ What is a DAG?
A DAG is a graph of stages and tasks that Spark builds to execute your job.
It includes:
- All stages of transformations.
- Their execution order.
- Information about shuffles, dependencies, and task boundaries.
๐ง Example:
If your job has map -> groupBy -> count, Spark breaks it into stages and tasks with dependencies; this forms a DAG.
โ 1) Client Mode ๐งโ๐ป
- Driver runs on your local machine (where spark-submit is executed)
- Executors run on the cluster (YARN, Kubernetes, Standalone, etc.)
- Application logs and output are shown directly in your terminal ๐
- Used for: Development, notebooks, testing, and debugging ๐งช
โ 2) Cluster Mode ๐๏ธ
- Driver runs inside the cluster (e.g., YARN container or Kubernetes Pod) โ๏ธ
- You can disconnect after submission โ Spark job continues to run independently ๐
- Logs are stored in cluster management tools (YARN logs / Kubernetes pod logs) ๐ฆ
- Used for: Production workloads, scheduled jobs, and automation ๐ค
๐ Key Difference
In Client Mode, the driver lives on your machine. In Cluster Mode, the driver lives inside the cluster.
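For illustration, the same job submitted in each mode on YARN (the script name is a placeholder):
# Client mode: driver runs where spark-submit is launched
spark-submit --master yarn --deploy-mode client my_job.py

# Cluster mode: driver runs inside the cluster
spark-submit --master yarn --deploy-mode cluster my_job.py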
These modes define how and where Spark runs on different cluster managers.
โ 1) Standalone Mode ๐งฑ
What it is:
- Sparkโs built-in cluster manager โ๏ธ
- Does not require Hadoop or any external system ๐ซ๐
- You manually start a Master and multiple Worker nodes ๐ฅ๏ธ
When to use โ :
- Small to medium Spark clusters
- Simple setup without Hadoop or Kubernetes
โ 2) YARN Mode ๐
What it is:
- Spark runs on top of Hadoop YARN as the cluster manager ๐๏ธ
- Very common in enterprise Hadoop environments ๐ข
- Supports both client mode and cluster mode ๐
When to use โ :
- When working in a Hadoop ecosystem
- When Spark needs to share resources with Hive, HDFS, etc.
โ 3) Kubernetes Mode โธ๏ธ
What it is:
- Spark runs natively on a Kubernetes cluster โ๏ธ
- Uses Pods for Driver and Executors ๐ง
- Supports autoscaling and containerized execution ๐ฆ
When to use โ :
- Cloud-native infrastructure (AWS EKS, GCP GKE, AKS)
- Docker-based deployments with strong resource isolation ๐ณ
๐ Quick Summary
Standalone โ Simple, Spark-only cluster
YARN โ Best inside Hadoop ecosystem
Kubernetes โ Best for cloud-native, container-based setups
โ Quick Difference
๐น groupByKey: Groups all values for a key first, then aggregates (heavy shuffle).
๐น reduceByKey: Aggregates values during shuffle, reducing data early and improving efficiency.
โ 1) reduceByKey ๐
Syntax:
reduceByKey(func, [numTasks])
How it works:
- Performs local aggregation within each partition ๐งฎ
- Combines values before shuffle using a combiner โ๏ธ
- Only reduced results are shuffled across the network ๐ก
- Much less data transfer compared to groupByKey ๐
Best use case โ :
When you want sum, min, max, avg or other aggregations.
โ 2) groupByKey ๐งบ
Syntax:
groupByKey([numTasks])
How it works:
- No local reduction happens โ
- All raw values are shuffled across the network ๐ฆ
- Values are grouped only after shuffle on destination partition
- High memory usage on worker nodes ๐ฅ
- Can cause OutOfMemory (OOM) errors โ ๏ธ
Best use case โ :
When you want to retain all values as a list for each key.
โ Semantic Difference ๐ง
- groupByKey: Returns (key, Iterable[values]) โ full list of values ๐
- reduceByKey: Returns (key, reducedValue) โ single aggregated value ๐ฏ
โ Performance โฑ๏ธ
- groupByKey: More network traffic + more memory usage โ
- reduceByKey: Less shuffle + better performance โ
โ Example ๐งช
# Sample key-value pair RDD
data = [("A", 1), ("B", 2), ("A", 3), ("B", 4), ("C", 5)]
rdd = sc.parallelize(data)

# Using groupByKey (mapValues(list) materializes the iterables for display)
grouped = rdd.groupByKey().mapValues(list)
# Result: [("A", [1, 3]), ("B", [2, 4]), ("C", [5])]

# Using reduceByKey
reduced = rdd.reduceByKey(lambda x, y: x + y)
# Result: [("A", 4), ("B", 6), ("C", 5)]
โก Interview Tip:
Use reduceByKey for aggregation and performance.
Use groupByKey only when you truly need the full list of values.
โ What is Partitioning? ๐งฉ
Partitioning divides a table into separate directories based on column values. Each partition represents a subset of data stored in its own directory structure.
๐ Partitioning Structure
/warehouse/sales_table/
├── year=2023/month=01/
│   ├── file1.parquet
│   └── file2.parquet
├── year=2023/month=02/
│   ├── file1.parquet
│   └── file2.parquet
└── year=2024/month=01/
    ├── file1.parquet
    └── file2.parquet
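A minimal PySpark sketch that produces this kind of layout, assuming an existing DataFrame df with year and month columns:
df.write.mode("overwrite").partitionBy("year", "month").parquet("/warehouse/sales_table")

# Filters on partition columns let Spark prune entire directories
spark.read.parquet("/warehouse/sales_table").filter("year = 2024").count()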
โ Advantages of Partitioning
- Partition Pruning ๐ โ Skips irrelevant partitions
- Parallel Processing โก โ Independent processing per partition
- Data Locality ๐ฆ โ Related data stays together
- Easy Data Management ๐ ๏ธ โ Add or drop partitions easily
โ Disadvantages of Partitioning
- Small File Problem ๐งจ โ High-cardinality columns create many small files
- Metadata Overhead ๐งพ โ Too many partitions increase metadata load
- Uneven Distribution โ๏ธ โ Skewed partition sizes
- Inefficient Queries ๐ โ Only effective when filtering on partition columns
โ What is Bucketing? ๐ชฃ
Bucketing distributes data into a fixed number of buckets using a hash function applied to specified columns.
๐๏ธ Bucketing Structure
/warehouse/sales_table/
├── bucket_00000.parquet
├── bucket_00001.parquet
├── bucket_00002.parquet
├── bucket_00003.parquet
└── ...
โ Advantages of Bucketing
- Fixed File Count ๐ โ Prevents small file problem
- Even Distribution โ๏ธ โ Hashing spreads data uniformly
- Join Optimization ๐ โ Enables bucket-map joins
- Efficient Sampling ๐ฏ โ Read specific buckets
- Reduced Shuffling ๐ โ Minimizes data movement
โ Disadvantages of Bucketing
- No Partition Pruning โ โ Cannot skip buckets
- Hash Collisions โ ๏ธ โ Can cause uneven distribution
- Bucket Count Choice ๐ฒ โ Requires good planning
- Fixed Structure ๐ โ Bucket count cannot change easily
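A minimal PySpark sketch of creating a bucketed table, assuming a DataFrame df with a customer_id column (the bucket count and names are illustrative):
# bucketBy is only supported together with saveAsTable, not plain file writes
df.write.mode("overwrite") \
    .bucketBy(8, "customer_id") \
    .sortBy("customer_id") \
    .saveAsTable("sales_bucketed")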
โ Key Differences ๐
| Aspect | Partitioning | Bucketing |
|---|---|---|
| Data Organization | Directory-based | Hash-based buckets |
| File Count | Variable | Fixed |
| Query Pruning | Supported โ | Not Supported โ |
| Data Distribution | Can be skewed | Generally uniform |
| Small File Problem | Prone โ | Prevented โ |
| Join Performance | Standard joins | Optimized bucket joins ๐ |
| Metadata Overhead | High with many partitions | Low (fixed metadata) |
โ Bucket Count Guidelines ๐ (Important)
- Use powers of 2 or multiples of core count ๐งฎ
- Target file size of 128MB to 1GB
- Use same bucket count on join tables ๐
- Plan for data growth ๐
โ Conclusion ๐ง
Use Partitioning for time-series and filter-heavy queries.
Use Bucketing for high-cardinality joins and uniform distribution needs.
Use a Hybrid approach when you need both pruning and join optimization.
โ What is a Shuffle Hash Join?
A Shuffle Hash Join is a join strategy used in Spark when two large tables are joined and neither can fit entirely in memory (unlike Broadcast Join).
Spark shuffles both datasets across the cluster by the join key, then performs a hash-based join inside each partition.
๐ง How It Works (Step-by-Step)
- Shuffle ๐ : Both DataFrames are redistributed so that rows with the same join key land in the same partition.
- Build Phase ๐งฑ : Spark builds an in-memory hash table using the smaller dataset (if known).
- Probe Phase ๐ : Spark scans the larger dataset and looks up matches inside the hash table.
- Output โ : Matching rows are combined and returned.
๐ When is it Used?
- When Broadcast Join is not possible ๐ก
- When both tables are too large for memory ๐พ
- When Spark cannot determine a clearly smaller table ๐คทโโ๏ธ
๐ Shuffle Hash Join Process
Step 1: Shuffle Phase ๐
Node 1 receives:
  Orders:    (101, order_data)
  Customers: (101, customer_data)

Node 2 receives:
  Orders:    (102, order_data)
  Customers: (102, customer_data)

Node 3 receives:
  Orders:    (103, order_data)
  Customers: (103, customer_data)
Step 2: Hash Join Phase ๐งฎ
Node 1:
Hash Table:
{101: ("Alice", "NYC")}
Probe:
Orders: (1, 101, 250) โ Match found โ
Result:
(1, 101, 250, "Alice", "NYC")
โ ๏ธ Trade-offs
| Pros โ | Cons โ |
|---|---|
| Works well for large tables | Expensive shuffle operation |
| Scales with cluster size | Requires high memory for hash map |
| Supports high parallelism | Performance suffers with skewed data |
๐ Real-World Notes
- Spark uses ShuffledHashJoin when one side is smaller, but not small enough to broadcast ๐ฆ
- Both datasets are shuffled, which makes this join expensive โ ๏ธ
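In Spark 3.x you can also request this strategy explicitly with a join hint; a hedged sketch, assuming orders and customers DataFrames that share a customer_id column:
result = orders.hint("SHUFFLE_HASH").join(customers, "customer_id")
result.explain()   # look for ShuffledHashJoin in the physical plan if the hint is honored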
โ How it works โ๏ธ
- Smaller table is broadcast to all executors ๐ฆ
- No shuffle is required for the larger table ๐ซ๐
- Each executor performs the join locally ๐ง
โ When is it used? โฑ๏ธ
- One table is smaller than spark.sql.autoBroadcastJoinThreshold (default ~10MB) ๐
- Spark automatically selects it for small dimension tables ๐ค
๐งช Example
-- Users: 5MB, Orders: 10GB
SELECT *
FROM orders o
JOIN users u ON o.user_id = u.id;
๐ Process Flow
- 1๏ธโฃ Broadcast the Users table to all executors ๐ค
- 2๏ธโฃ Each executor builds a local hash table ๐งฑ
- 3๏ธโฃ Orders table stays partitioned (no shuffle) ๐ฆ
- 4๏ธโฃ Local join executed inside each executor โ
โ Pros โญ
- No shuffle required ๐
- Very fast for small dimension tables โก
- Perfect for star schema joins ๐
โ Cons โ ๏ธ
- Limited by broadcast size threshold ๐
- Consumes driver and executor memory ๐ง
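A minimal PySpark sketch of forcing the broadcast explicitly, assuming orders and users DataFrames (names are illustrative):
from pyspark.sql.functions import broadcast

result = orders.join(broadcast(users), orders.user_id == users.id)
result.explain()   # look for BroadcastHashJoin in the physical plan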
โ How it works โ๏ธ
- Both tables are shuffled by the join key ๐
- Each partition is sorted by the join key ๐ค
- A merge-sort algorithm is used to join sorted partitions ๐
- This is the default join strategy when others donโt apply ๐ค
โ When is it used? โฑ๏ธ
- Both tables are large ๐
- Equi-joins with sortable keys ๐
- When broadcast and hash joins are not suitable ๐ซ
๐งช Example
-- Table A: 5GB, Table B: 8GB
SELECT *
FROM large_table_a a
JOIN large_table_b b ON a.category = b.category;
๐ Process Flow
- 1๏ธโฃ Shuffle both tables by join key ๐
- 2๏ธโฃ Sort partitions by join key ๐ค
- 3๏ธโฃ Merge sorted partitions using two-pointer technique ๐งญ
- 4๏ธโฃ Stream through both datasets simultaneously ๐
โ Pros โญ
- Works with very large tables โ
- Memory efficient due to streaming nature ๐พ
- Handles duplicate keys well ๐
โ Cons โ ๏ธ
- Requires shuffle for both tables ๐
- Sorting overhead adds latency ๐ข
- Slower than hash-based joins โณ
โ Join Selection Logic ๐ง
Is the smaller table < 10MB?
├── YES → Broadcast Join
└── NO → Is the smaller table < 64MB and does it fit in memory?
    ├── YES → Shuffle Hash Join
    └── NO → Sort Merge Join
๐ Performance Comparison Table
| Join Type | Shuffle Required | Memory Usage | Best For |
|---|---|---|---|
| ๐ข Broadcast Join | One side only | High (broadcast) | Small dimension tables |
| ๐ Shuffle Hash Join | Both sides | Medium (hash table) | Medium tables with size difference |
| ๐ Sort Merge Join | Both sides | Low (streaming) | Large tables of similar size |
๐ฏ Key Interview Points
- ๐ข Broadcast Join is the fastest but limited by table size.
- ๐ Shuffle Hash Join is the middle-ground and requires enough executor memory.
- ๐ Sort Merge Join works in most cases but has sorting overhead.
- ๐ค Spark automatically selects join strategy based on table statistics.
- ๐งฉ You can force a strategy using hints:
/*+ BROADCAST(table) */
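A hedged sketch of both hint styles (Spark 3.x; table and column names are illustrative):
# SQL hint
spark.sql("SELECT /*+ BROADCAST(u) */ * FROM orders o JOIN users u ON o.user_id = u.id")

# DataFrame API hints: BROADCAST, MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL
orders.join(users.hint("MERGE"), "user_id")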
Dynamic Resource Allocation (DRA) allows Spark to automatically adjust the number of executors at runtime based on workload.
โ Why Use Dynamic Resource Allocation?
- ๐ Scales up executors when workload increases
- ๐ Scales down executors when they are idle
- ๐ฐ Optimizes overall cluster resource usage
๐ง How It Works
- Monitors number of pending tasks
- Tracks executor idle time
- Adds or removes executors automatically
- Works with YARN, Kubernetes, or Standalone cluster manager
โ๏ธ Key Configurations
spark.dynamicAllocation.enabled = true
spark.dynamicAllocation.minExecutors = 2
spark.dynamicAllocation.initialExecutors = 4
spark.dynamicAllocation.maxExecutors = 10
spark.dynamicAllocation.executorIdleTimeout = 60s
๐งช Example spark-submit
spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=8 \
  --conf spark.dynamicAllocation.executorIdleTimeout=60s \
  your_script.py
๐ Required Setting
spark.shuffle.service.enabled=true
โ Benefits
- ๐ธ Frees unused resources automatically
- โก Adapts to workload changes in real time
- ๐ค Removes need for manual executor tuning
โ ๏ธ Why Static Allocation Is Bad
- Wastes resources when executors stay idle
- Can cause slow jobs when executors are insufficient
- Cannot adapt to workload spikes
๐ฏ Interview Key Points
- Spark scales executors based on pending tasks
- Requires external shuffle service to work correctly
- Most useful for long-running and streaming jobs
โก Adaptive Query Execution (AQE) in Spark
Adaptive Query Execution (AQE) is a dynamic optimization feature in Spark SQL (Spark 3.x+) that changes the execution plan at runtime based on real data statistics.
๐ฏ Why AQE Is Needed
- Traditional Spark plans are built at compile time using limited statistics
- Real-world data contains skew, unknown sizes, and uneven distributions
- AQE adapts the plan during execution based on actual runtime metrics
โ๏ธ Key Features of AQE
- ๐ Dynamic Join Strategy Selection (can switch Sort-Merge โ Broadcast Join)
- โ๏ธ Skew Join Handling by splitting skewed partitions
- ๐ฆ Dynamic Shuffle Partition Coalescing to reduce small tasks
- โป๏ธ Local Shuffle Reader for efficient shuffle reuse
๐ Real-World Use Case
In a production workload joining ship activity with weather data, heavy port skew caused some tasks to run 10x slower. After enabling AQE and skew join handling, execution time improved by over 40% and stabilized job performance.
๐ How to Enable AQE
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.localShuffleReader.enabled=true
๐ Configuration Explanation
spark.sql.adaptive.enabled - Enables AQE
spark.sql.adaptive.skewJoin.enabled - Handles skewed joins
spark.sql.adaptive.coalescePartitions.enabled - Merges small shuffle partitions
spark.sql.adaptive.localShuffleReader.enabled - Improves shuffle reuse
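The same settings can also be applied on a running session (PySpark, Spark 3.x):
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")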
โ ๏ธ When AQE May Not Help
- Cluster is CPU or disk I/O bottlenecked
- Data is already perfectly partitioned
- Manual hints may override AQE decisions
โ Best Practices
- Enable AQE by default for batch workloads
- Monitor decisions in Spark UI under "Adaptive Spark Plan"
- Combine with partitioning and Delta Lake Z-Ordering
๐ Summary
AQE helps Spark dynamically optimize execution at runtime, improving performance, cost efficiency, and reliability without heavy manual tuning.
Off-heap memory in Apache Spark refers to memory allocated outside the Java Virtual Machine (JVM) heap. This memory is manually managed by Spark and is not controlled by the Garbage Collector (GC), helping reduce GC overhead and improve performance.
๐ง 1. What is Off-Heap Memory?
- Allocated outside the JVM heap using native methods
- Not managed by the Java Garbage Collector
- Explicitly controlled and released by Spark
- Ideal for caching, shuffle buffers, and intermediate state management
โ 2. Why Use Off-Heap Memory?
- ๐๏ธ Reduced GC overhead for large datasets
- โก Faster binary serialization and direct memory access
- ๐ฆ Compact memory layouts with fixed-length storage
- ๐ Better memory utilization with low object overhead
- ๐ฅ Handles large memory buffers efficiently
๐ ๏ธ 3. How to Enable Off-Heap Memory
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g
You can configure this via:
- spark-submit --conf
- spark-defaults.conf
- Notebook / Databricks session configs
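For example, via spark-submit (the script name is a placeholder):
spark-submit \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=4g \
  your_script.py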
โ๏ธ 4. Components That Use Off-Heap Memory
4.1 Tungsten Execution Engine ๐ฅ
- Uses off-heap for rows, sorts, and aggregations
- Uses UnsafeRow and UnsafeMemory
- Reduces object creation and GC load
4.2 Spark SQL Caching ๐พ
- Serialized caching can leverage off-heap storage
4.3 Broadcast Variables ๐ก
- Large broadcast tables can live off-heap
4.4 Shuffle & Sort Buffers ๐
- Uses off-heap during shuffle, spill, and external sorting
โ๏ธ 5. Heap vs Off-Heap Memory
| Memory Type | Management | Format | GC Cost |
|---|---|---|---|
| Heap Memory | JVM garbage collected | Object-based | Higher |
| Off-Heap Memory | Manually managed by Spark | Binary | Low |
โ ๏ธ 6. Limitations & Considerations
- Manual memory release risks native memory leaks
- Harder to debug than heap-based memory
- Spark UI does not directly show off-heap usage
- Off-heap size must be pre-allocated
โ 7. Best Practices
- Tune off-heap size based on executor memory layout
- Enable compressed in-memory columnar storage
- Use OS-level tools like top, htop, jcmd for monitoring
- Combine with dynamic allocation and proper spill configs
๐ 8. Summary
Off-heap memory helps Spark avoid GC pauses, improve throughput, and handle large datasets efficiently. It is best suited for shuffle-heavy workloads, caching use cases, and large-scale Spark SQL processing when sized and monitored correctly.
๐น 1. What is SparkContext?
SparkContext is the primary entry point to Sparkโs core functionality in Apache Spark 1.x. It allows applications to connect to a cluster and perform distributed processing using RDDs (Resilient Distributed Datasets).
Key Responsibilities:
- Cluster Connection ๐ : Initializes connection to YARN, Mesos, or Standalone cluster manager
- Job Scheduling ๐๏ธ : Manages job scheduling and task distribution across executors
- RDD Creation ๐ฆ : Creates RDDs from files or in-memory collections
- Configuration Access โ๏ธ : Reads Spark configuration from SparkConf
- Hadoop Integration ๐ : Works with HDFS and Hadoop-compatible file systems
val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
val sc = new SparkContext(conf)
Limitations:
- โ No built-in SQL, Hive, or DataFrame support
- โ Requires separate SQLContext or HiveContext
๐น 2. What is SparkSession?
SparkSession, introduced in Spark 2.0, is the unified entry point for working with Spark SQL, RDDs, DataFrames, and Datasets. It merged all older contexts into a single, simplified API.
Key Benefits:
- Unified API Access ๐งฉ : Supports SQL, DataFrame, Dataset, and RDD APIs
- Simplified Initialization ๐งฑ : Uses builder pattern for cleaner configuration
- SQL & Hive Support ๐งฎ : Directly runs SQL and Hive queries
- Catalog Access ๐ : Query metadata, tables, and databases
- Internal SparkContext: Accessible via spark.sparkContext
val spark = SparkSession.builder()
.appName("MyApp")
.master("local[*]")
.enableHiveSupport()
.getOrCreate()
๐น 3. Why was SparkSession Introduced?
Reasons:
- Unified API ๐ง : Combines SparkContext, SQLContext, and HiveContext
- Ease of Use โจ : Reduces setup complexity
- Better Integration ๐ : Built-in Spark SQL and Hive metastore support
- Future-Proof ๐ : Aligns with Sparkโs structured processing roadmap
- Cleaner Syntax ๐งผ : Fluent, readable configuration style
๐ Quick Summary
- SparkContext ๐๏ธ : Low-level engine mainly for RDD-based processing
- SparkSession ๐งฉ : Modern unified entry point for all Spark APIs
๐ Data Skewness in Apache Spark
โ What is Data Skew?
Data skew occurs when data is unevenly distributed across partitions โ meaning some partitions hold a much larger volume of data than others. This creates slow-running tasks and poor cluster efficiency.
โ ๏ธ Why It Matters
In Spark, wide operations like groupByKey, join, and reduceByKey involve shuffles.
When certain keys appear too frequently, one partition becomes overloaded,
while other executors stay idle โ leading to straggler tasks and stage delays.
๐ ๏ธ How to Fix Data Skew
| Strategy | Description |
|---|---|
| Salting ๐ง | Append random suffixes to skewed keys to distribute data across partitions |
| Broadcast Join ๐ก | Broadcast small tables to avoid expensive shuffles |
| Custom Partitioning โ๏ธ | Implement custom partitioners for better load balancing |
| Skew Join Handling (Spark 3+) ๐ค | Enable spark.sql.adaptive.skewJoin.enabled = true to auto-handle skew |
โ Best Practices
- Prefer reduceByKey or aggregateByKey over groupByKey
- Monitor the Spark UI for long-running stages and straggler tasks
- Enable AQE (Adaptive Query Execution) in Spark 3.x for automatic optimizations ๐
โ What is Salting?
Salting is a technique used to handle data skew in Apache Spark by adding randomness to highly frequent keys.
This spreads records more evenly across partitions during join, groupBy, and reduceByKey operations.
๐ง Salting for Aggregation (groupBy / reduceByKey)
โ Goal
Break โhot keysโ into multiple pseudo-keys using random salts so workload is distributed across executors.
๐ Steps
- Add a random salt column ๐ฒ
- Create a salted key by combining original key + salt ๐
- Aggregate using salted keys ๐
- Remove salt and re-aggregate to get final results โ
โ Code Example
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, concat, lit, split

spark = SparkSession.builder.appName("SaltingExample").getOrCreate()

data = [("A", 100)] * 1000000 + [("B", 200)] * 1000
df = spark.createDataFrame(data, ["product", "amount"])

# Add a random salt (0-9) and build a salted key of the form "product_salt"
df_salted = df.withColumn("salt", (rand() * 10).cast("int"))
df_salted = df_salted.withColumn("product_salted", concat(col("product"), lit("_"), col("salt").cast("string")))

# First aggregation on the salted key spreads the hot key across partitions
intermediate = df_salted.groupBy("product_salted").sum("amount")

# Strip the salt and aggregate again to get the final per-product totals
result = intermediate.withColumn("product", split(col("product_salted"), "_")[0]) \
    .groupBy("product").sum("sum(amount)")
result.show()
๐ Salting for Join Operations
โ Goal
Replicate the small table with multiple salts so it matches salted keys of the large skewed table.
โ Join Salting Code Example
from pyspark.sql.functions import explode, array, lit

# Salt the large, skewed side with a random salt in [0, 5)
large = df.withColumn("salt", (rand() * 5).cast("int"))
large = large.withColumn("product_salted", concat(col("product"), lit("_"), col("salt").cast("string")))

# Replicate the small side once per possible salt value so every salted key finds a match
small_data = [("A", "Type1"), ("B", "Type2")]
small = spark.createDataFrame(small_data, ["product", "type"])

salt_range = array([lit(i) for i in range(5)])
small_expanded = small.withColumn("salt", explode(salt_range))
small_expanded = small_expanded.withColumn("product_salted", concat(col("product"), lit("_"), col("salt").cast("string")))

joined = large.join(small_expanded, on="product_salted", how="inner")
๐ซ When Not to Use Salting
- If data skew is mild or temporary โ ๏ธ
- If AQE with spark.sql.adaptive.skewJoin.enabled = true is already enabled
- If a Broadcast Join is possible
๐ Spark Small File Problem
๐ What Are Small Files?
- Files smaller than the HDFS block size (typically 128MB)
- Spark processes each file as a separate task, so many small files โ many tasks โ high overhead
๐ What Counts as โSmallโ?
| File Size | Impact |
|---|---|
| < 1MB | Very severe performance issues |
| 1MB - 32MB | Noticeable performance hit |
| 32MB - 128MB | Acceptable |
| 128MB - 1GB | Optimal for Spark |
โ ๏ธ Why Are Small Files a Problem?
- Task Overhead: one task per file means more scheduling time and slower jobs
- Metadata Overhead: HDFS and Spark must track each file, increasing memory usage
- Resource Underutilization: tiny tasks finish quickly and waste CPU/memory
๐ฅ Common Causes
- Frequent writes with small DataFrames
- High-cardinality partition columns (e.g., user_id)
- Micro-batch streaming jobs
๐งช Example Problem
# Bad practice โ creates small files
df = spark.read.json("tiny_input.json") # ~5MB
df.write.parquet("output/")
โ Solutions to the Small File Problem
1๏ธโฃ Coalesce or Repartition
# Combine small partitions into fewer large ones
df.coalesce(1).write.parquet("output/")
# Use repartition() when you want even distribution (causes shuffle)
2๏ธโฃ Set Target File Size
# Target ~128MB file size
spark.conf.set("spark.sql.files.maxPartitionBytes", 134217728)
3๏ธโฃ Use Efficient File Formats
df.write.format("parquet").save("output/")
- Avoid JSON/CSV for analytics workloads โ
- Prefer Parquet/ORC for performance โ
4๏ธโฃ Enable Compression
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
5๏ธโฃ Smart Partitioning
- Avoid high-cardinality columns like user_id or transaction_id
- Use low-cardinality columns like year, month, or region
๐ Best Practices Summary
- Target file size: 128MB to 1GB
- Minimum acceptable: 32MB
- Avoid very small files (< 1MB)
- Use coalesce(), Parquet, compression, and smart partitioning
Apache Spark is a powerful distributed data processing engine. However, poorly written Spark jobs can lead to inefficient resource utilization and long runtimes. This guide outlines detailed Spark optimization techniques categorized by areas such as memory, shuffles, joins, caching, code patterns, configurations, and storage layout.
1. Data Serialization & Memory Optimization
1.1 Use Efficient Serialization
- Kryo Serialization is faster and more compact than Java serialization.
- Set Kryo in configuration:
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired=true
1.2 Avoid Creating Large Objects in Driver
- Donโt use collect() or toPandas() on large datasets.
- Avoid building large maps/lists on the driver.
1.3 Use Broadcast Variables Wisely
- For small lookup tables, broadcast them to avoid large shuffles.
broadcastVar = sparkContext.broadcast(lookup_dict)
2. Shuffle Optimization
2.1 Prefer reduceByKey over groupByKey
- reduceByKey aggregates during the shuffle stage; groupByKey shuffles all data.
2.2 Tune Shuffle Partitions
- Set an optimal number of partitions:
spark.sql.shuffle.partitions = 200  # 200 is the default; it is often too high for small datasets and too low for very large ones
2.3 Use coalesce() to Reduce Partitions Before Output
- Avoid writing many small files:
df.coalesce(1).write.parquet(...)
3. Join Optimization
3.1 Use Broadcast Join for Small Tables
- Broadcast small dimension tables in star schema joins:
SELECT /*+ BROADCAST(smallTable) */ ...
3.2 Enable Adaptive Query Execution (AQE)
- Automatically optimizes join strategies and skew handling.
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
3.3 Avoid Skewed Joins
- Identify and handle skewed keys using salting or AQE.
4. Caching & Persistence
4.1 Cache Only When Necessary
- Use .persist(StorageLevel.MEMORY_AND_DISK) if data doesn't fit in memory.
- Always unpersist when caching is no longer needed.
4.2 Avoid Caching After Narrow Transformations
- Donโt cache intermediate RDDs unless reused multiple times.
5. Code-Level Optimizations
5.1 Avoid UDFs When Possible
- Use built-in Spark SQL functions; they are optimized and run natively.
5.2 Use Partition Pruning
- Filter early using partition columns to avoid unnecessary reads.
WHERE year = 2024 AND month = 6
5.3 Leverage Columnar Format
- Store data in Parquet or ORC for better I/O performance.
6. Configuration Tuning
6.1 Memory Settings
--executor-memory 4G
--executor-cores 4
--num-executors 10
6.2 Enable Dynamic Allocation
spark.dynamicAllocation.enabled=true
spark.shuffle.service.enabled=true
6.3 GC and JVM Tuning
- Monitor GC behavior via Spark UI.
- Use G1GC or CMS for better memory handling.
7. File and Storage Optimizations
7.1 Avoid Small Files Problem
- Merge files using coalesce() or repartition() before writing.
- Use automatic optimization in Delta Lake.
7.2 Use Delta Lake (in Databricks)
- Delta allows efficient updates, merges, and provides ACID compliance.
7.3 Optimize Table Layout
OPTIMIZE table_name ZORDER BY (column)
7.4 Enable Auto Optimize and Auto Compaction (Databricks)
ALTER TABLE my_table SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
);
optimizeWrite ensures efficient file sizes at write time.
autoCompact runs background jobs to merge small files.
7.5 Manual OPTIMIZE and VACUUM
OPTIMIZE my_table;
VACUUM my_table RETAIN 168 HOURS;
8. Monitoring and Debugging
8.1 Use Spark UI
- Analyze DAGs, stages, and tasks to identify bottlenecks.
8.2 Use Logging and Metrics
- Log job progress.
- Integrate with tools like Ganglia, Prometheus, or Datadog.
8.3 Profile Jobs Using spark.eventLog.enabled
spark.eventLog.enabled=true
spark.eventLog.dir=/path/to/logs
โ What is Serialization?
Serialization in Spark is the process of converting objects into a byte-stream format so they can be:
โข Transferred across the network
โข Stored in memory or disk
โข Reconstructed later when needed
Since Spark is distributed, both data and code (closures) must move from the Driver to Executors.
๐ Where Spark Uses Serialization
| Component | Purpose |
|---|---|
| RDD / DataFrame Shuffle | Send intermediate data across executors |
| Broadcast Variables | Distribute large read-only variables efficiently |
| Accumulators | Track global metrics |
| Caching | Store intermediate results in serialized form |
| Closures | Send function logic from Driver to Executors |
๐ Built-in Spark Serializers
| Serializer | Description |
|---|---|
| Java Serializer | Default. Slower and creates large objects. |
| Kryo Serializer | Faster and more compact (requires class registration). |
โ๏ธ Enable Kryo (Recommended)
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired = true
๐ฅ Why Serialization Matters for Performance
| Concern | Explanation |
|---|---|
| Speed | Java serialization is slow; Kryo is often up to 10x faster. |
| Size ๐ฆ | Serialized objects consume memory. Kryo produces smaller payloads. |
| GC Pressure ๐งน | Large serialized objects increase garbage collection overhead. |
| Shuffle & Disk IO ๐พ | Compact serialization speeds up shuffle and reduces disk spill. |
โ Interview Tip
Always mention that Kryo is preferred in production due to better performance and lower memory usage.
๐จ Apache Spark Out of Memory (OOM) Issues: Causes and Solutions
What is an OOM Error in Spark?
OOM (Out Of Memory) errors in Apache Spark happen when a task or executor exceeds the memory limit allocated to it, leading to failure. These issues can severely impact job performance or cause the job to fail entirely.
๐ Root Causes of OOM in Spark
1๏ธโฃ Large Data Volume
Processing more data than available memory.
Joining large datasets without adequate partitioning.
2๏ธโฃ Skewed Data
Uneven distribution of data causes some partitions to be much larger than others.
Leads to few tasks requiring more memory than allocated.
3๏ธโฃ Improper Partitioning
Too few partitions โ large partitions โ memory pressure.
Too many partitions โ excessive task scheduling overhead.
4๏ธโฃ Large Shuffles
Wide transformations (e.g., groupByKey, join, reduceByKey) can cause massive shuffles requiring memory for sorting, buffering, and storing shuffle files.
5๏ธโฃ Improper Caching
Caching large datasets in memory without persist(StorageLevel.MEMORY_AND_DISK) fallback.
Repeated caching without unpersisting old data.
6๏ธโฃ Memory Configuration Mismanagement
Low executor/driver memory configuration.
Spark's memory management (storage vs execution memory) poorly tuned.
7๏ธโฃ Improper Use of Collect or Broadcast
collect() brings all data to the driver โ OOM on driver.
Broadcasting very large variables โ OOM on executors.
๐ ๏ธ Solutions and Best Practices
โ General Memory Management
spark.executor.memory โ Amount of memory allocated to each executor
spark.driver.memory โ Memory allocated to the driver
spark.memory.fraction โ Fraction of memory for execution and storage (default 0.6)
spark.memory.storageFraction โ Fraction of memory reserved for storage
Use Spark UI to monitor memory usage.
Increase executor and driver memory based on workload:
--executor-memory 8g --driver-memory 4g
Data Skew Mitigation
Salting technique: Add random prefix to keys before shuffle operations.
Repartitioning: Use repartition() or coalesce() to balance data distribution.
Avoid groupByKey; prefer reduceByKey or aggregateByKey.
โ Optimize Shuffles
Tune these shuffle configurations:
spark.shuffle.compress=true
spark.shuffle.spill.compress=true
spark.shuffle.file.buffer=64k
spark.reducer.maxSizeInFlight=48m
Cache Carefully
Use .persist(StorageLevel.MEMORY_AND_DISK) instead of .cache() when unsure if data fits in memory.
Explicitly call .unpersist() when cached data is no longer needed.
โ Partitioning Strategy
Aim for ~100-200 MB per partition for optimal performance.
Tune number of partitions:
spark.sql.shuffle.partitions=200
Avoid Large Collect/Broadcast
Avoid using .collect() unless data is small enough to fit in driver memory.
Prefer take(n) for sampling.
Avoid broadcasting large datasets.
๐ง 1. Executor Memory OOM
Scenario: Spark throws java.lang.OutOfMemoryError: Java heap space
Where it happens: Inside the executors when tasks run out of heap memory.
Example:
val data = sc.textFile("large_file.csv")
// groupByKey needs a pair RDD, so build (key, line) tuples first
val grouped = data.map(line => (line.split(",")(0), line)).groupByKey().collect()
Fixes: Avoid groupByKey, use reduceByKey or aggregateByKey.
Increase executor memory: --executor-memory 8G
Repartition with smaller partitions: df.repartition(200)
Enable spill to disk (default is enabled for shuffles).
๐ง 2. Driver Memory OOM
Example:
val bigDF = spark.read.parquet("huge_table")
val localList = bigDF.collect()
Fixes: Avoid .collect(), prefer .show() or .take(10).
Increase driver memory: --driver-memory 4G
Use .foreach() instead of pulling data to driver.
๐ง 3. Shuffle Memory OOM
Example:
val df = bigDF1.join(bigDF2, "key")
Fixes:
Enable adaptive query execution (AQE): spark.sql.adaptive.enabled=true
Use skew join handling: spark.sql.adaptive.skewJoin.enabled=true
Increase executor memory or tune spark.memory.fraction (the legacy spark.shuffle.memoryFraction setting is deprecated since Spark 1.6)
Use salting or bucketing for skewed keys.
๐ง 4. Broadcast Join OOM
Example:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)
val result = bigDF.join(smallDF, Seq("id"))
Fixes: Reduce spark.sql.autoBroadcastJoinThreshold (e.g. 10MB).
Use hint("MERGE") to force a sort-merge join instead of broadcasting.
Partition both tables before join to avoid broadcasting.
๐ง 5. Caching OOM
Example:
val df = spark.read.parquet("huge_file")
df.cache()
df.count()
Fixes: Use persist(StorageLevel.MEMORY_AND_DISK)
Unpersist unused cached data: df.unpersist()
Compress in-memory storage: spark.rdd.compress=true
๐ง 6. Too Many Partitions (Task OOM)
Example:
val df = spark.read.json("s3://bucket/small_files/*")
df.repartition(10000)
Fixes: Use coalesce() or sensible repartition() count.
Use file compaction (autocompaction or OPTIMIZE in Delta).
Avoid explode on huge arrays without filtering.
๐น Spark Unified Memory
Spark Unified Memory
๐ Short understanding:
Spark Unified Memory is the pool used together by Storage & Execution memory. Earlier Spark had separate pools, but now they dynamically borrow from each other. This allows Spark to utilize memory efficiently and avoid OOM issues.
๐ Spark Memory Architecture Diagram
[ Executor ]
   |
   ├── Reserved (300MB) - fixed
   |
   └── Spark Memory
         |
         ├── Storage (50%)
         |
         └── Execution (50%)
+---------------------------------------+
|               Executor                |
|                                       |
|   +-------------------------------+   |
|   |       JVM Heap Memory         |   |
|   +-------------------------------+   |
|                                       |
|   +-------------------------------+   |
|   |       Overhead Memory         |   |
|   +-------------------------------+   |
|                                       |
|   spark.executor.memory = 8GB         |
|   spark.executor.memoryOverhead = 800MB
+---------------------------------------+
              |
              |  (~300MB reserved)
              v
+--------------------------+
|     Reserved Memory      |
+--------------------------+
|      Spark Memory        |  <--- spark.memory.fraction = 0.6
+--------------------------+
|       User Memory        |
+--------------------------+
              |
              |  (dynamic)
              v
+--------------------------+
|     Reserved Memory      |
+--------------------------+
|   Storage Memory Pool    |  <--- spark.memory.storageFraction = 0.5
+--------------------------+
|  Execution Memory Pool   |
+--------------------------+
|       User Memory        |
+--------------------------+
Size: (Executor Memory - Reserved Memory) x spark.memory.fraction
Default: ~60% of usable memory (spark.memory.fraction = 0.6)
Components:
๐ฆ Storage Memory
3a. Storage Memory
๐ Short understanding:
This memory is mainly used for caching datasets or keeping broadcast variables. It grows whenever there is free execution memory, and shrinks when execution needs space. If memory is needed urgently, storage blocks get evicted first.
Initial Size: Spark Memory x spark.memory.storageFraction
Default: 50% of Spark Memory (spark.memory.storageFraction = 0.5)
Purpose:
Caching RDDs with .cache() or .persist()
Caching DataFrames and Datasets
Storing broadcast variables
Intermediate results from .checkpoint()
Eviction: Can be evicted when Execution Memory needs space
Spill Behavior: Cached data is either recomputed or read from disk based on storage level
๐ฅ Execution Memory
3b. Execution Memory
๐ Short understanding:
This is the โwork memoryโ Spark uses during transformations like joins, aggregates and shuffles. Execution memory has higher priority, so it cannot be evicted. When full, Spark spills data to disk instead of crashing with OOM.
Initial Size: Spark Memory x (1 - spark.memory.storageFraction)
Default: 50% of Spark Memory
Purpose:
Shuffles: Hash tables and sort buffers during data shuffling
Joins: Hash tables for broadcast joins and sort-merge joins
Aggregations: Hash maps for groupBy operations and window functions
Sorting: Temporary space for orderBy operations
UDF Execution: Memory for complex user-defined functions
Eviction: Cannot be evicted (has higher priority than Storage)
Spill Behavior: When full, data spills to disk (controlled spilling)
(Controlled spilling is Spark's intelligent mechanism to handle situations when Execution Memory is full. Instead of crashing with an OutOfMemoryError, Spark gracefully moves some data from memory to disk, continues the operation, and reads the data back when needed.)
โญ Practical Example: 1GB Executor Memory Breakdown
๐ Short understanding:
This example helps visualize real numbers. Reserved memory is fixed, and the rest gets partitioned between User vs Spark Memory. Then spark memory is again split into Storage vs Execution based on fractions.
Let's walk through a complete example with 1GB executor memory:
Initial Memory Allocation
Total Executor Memory: 1GB (1024MB)
Step 1: Subtract Reserved Memory
├── Reserved Memory: 300MB (fixed)
└── Usable Memory: 724MB (1024MB - 300MB)
Step 2: Split Usable Memory (spark.memory.fraction = 0.6)
├── User Memory: ~290MB (724MB x 0.4)
└── Spark Memory: ~434MB (724MB x 0.6)
Step 3: Split Spark Memory (spark.memory.storageFraction = 0.5)
├── Storage Memory: ~217MB (434MB x 0.5) - initially
└── Execution Memory: ~217MB (434MB x 0.5) - initially
โญ Important Dynamic Memory Allocation
๐ Short understanding:
Memory is not rigidly partitioned; Spark keeps it flexible. Execution memory has higher priority and may evict cached blocks. This dynamic behavior gives Spark better execution reliability under memory pressure.
Storage and Execution memory can borrow from each other
Storage can borrow from Execution: When execution memory is unused
Execution can borrow from Storage: When storage memory is unused
Eviction Policy: Storage memory can be evicted if execution needs space, but not vice versa
spark.memory.fraction = 0.6 (default) // Fraction of usable (heap minus reserved) memory shared by execution and storage
spark.memory.storageFraction = 0.5 (default) // Portion of Spark memory immune to eviction, used for storage
Overview
Tungsten is Apache Sparkโs physical execution engine, introduced in Spark 1.4 as part of Project Tungsten. It focuses on optimizing CPU and memory efficiency, enabling Spark to run faster and with reduced GC overhead. Tungsten operates on the physical plan generated by the Catalyst optimizer and is the core engine that executes the plan using low-level system optimizations.
๐ Quick Clarification:
โ Code generation (aka whole-stage codegen) is part of the Tungsten physical execution engineโnot Catalyst.
โ Catalyst does not do code generationโit stops at generating the physical plan.
Key Capabilities of Tungsten
โ Whole-Stage Code Generation
Dynamically generates Java bytecode at runtime to process multiple operations in a single function.
Eliminates function call overhead and intermediate row objects.
Improves CPU cache locality.
โ Binary Memory Layout (UnsafeRow)
Uses a compact binary format to represent rows and columns.
Reduces memory overhead and speeds up access.
Enables efficient serialization/deserialization and sorting.
โ Cache-Aware Computation
Algorithms are optimized to operate efficiently within CPU cache sizes.
Reduces memory shuffling and unnecessary access to slower memory.
โ Manual Memory Management
Spark gains control over how memory is allocated and released.
Operates on both on-heap and off-heap memory depending on configuration.
โ Tungsten operates with or without off-heap memory, but its performance is enhanced when off-heap is enabled.
By default, Tungsten uses on-heap memory.
Spill buffers refer to temporary in-memory data buffers that are used during expensive operations like:
Sorts
Aggregations
Shuffles
Joins
When the executor's memory runs low, Spark spills these buffers to disk to avoid out-of-memory errors.
When you submit a SQL query or use the DataFrame API, Spark doesn't just run it blindly. Instead, it goes through several intelligent steps under the Catalyst Optimizer to analyze, optimize, and execute your logic efficiently.
Let's walk through each stage, combining functionality and real behavior:
๐ซ 1. SQL Query / DataFrame
You write: SELECT name FROM employees WHERE age > 30 or a similar DataFrame query.
This is the starting point โ Spark simply takes your instructions as input.
๐ฉ 2. Unresolved Logical Plan
What happens:
Spark creates an initial "blueprint" of your query โ it knows the operations (select, filter), but hasn't confirmed if columns or tables exist.
'Project ['name]
+- 'Filter ('age > 30)
+- 'UnresolvedRelation [employees]
It's called unresolved because:
It hasn't looked up table schemas.
It doesn't know if employees or name actually exist.
Purpose: Act as an abstract representation of what you want, without verification.
๐จ 3. Logical Plan (After Analysis using Catalog)
What happens:
Spark checks the metadata catalog to resolve all columns, tables, and types.
It ensures everything exists and that operations like filtering and joining make sense.
Project [name#1]
+- Filter (age#2 > 30)
   +- Relation employees[id#0, name#1, age#2]
Purpose:
Validates the query (semantic checks).
Resolves all references.
Applies type-checking.
This is where Spark throws errors if column/table names are wrong.
๐ท 4. Optimized Logical Plan
What happens: Spark applies rule-based transformations to improve performance through a Rule-Based Optimizer (RBO):
Key Rules Applied:
Filter pushdown - move WHERE clause as early as possible
Constant folding - compute static expressions in advance
Column pruning - drop unused columns
Join reordering and elimination - optimize join order and remove unnecessary joins
Boolean expression simplification - simplify AND/OR conditions
Null propagation - handle null values efficiently
Common subexpression elimination - avoid redundant calculations
Rule Engine:
Rules are applied iteratively until no further improvements are possible
Each rule is a pattern-matching transformation
Rules can be custom-defined and plugged into Catalyst
Purpose: Improve efficiency without altering the final result. Still doesn't specify how to execute the plan.
๐ซ 5. Physical Plans
What happens:
Spark now generates multiple execution strategies.
It considers how best to perform joins, aggregations, scans, etc. depending on data size and layout.
Examples:
Should it use a broadcast join or a sort-merge join?
Should it cache intermediate data?
Purpose: Translate logical operations into concrete steps that can run on a cluster.
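As a hedged illustration of the join-strategy choice above, the broadcast() hint nudges the planner toward a broadcast join; the tiny DataFrames here are made up purely for the example.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

orders = spark.createDataFrame([(1, 101), (2, 102)], ["order_id", "product_id"])
products = spark.createDataFrame([(101, "pen"), (102, "book")], ["product_id", "name"])

# broadcast() hints the planner toward BroadcastHashJoin instead of a shuffle-based
# SortMergeJoin; without the hint Spark decides from size estimates and
# spark.sql.autoBroadcastJoinThreshold (10 MB by default).
orders.join(broadcast(products), "product_id").explain()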
๐ฆ 6. Cost Model
What happens:
Spark estimates the cost of each physical plan:
Based on I/O, CPU, memory usage, shuffles, and partition sizes.
Uses table statistics and column histograms for better estimates.
It compares all options and chooses the lowest-cost plan.
Purpose: Select the most efficient execution path.
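A sketch of how statistics can be supplied to the cost model: the config keys and ANALYZE TABLE statements are standard Spark SQL, while the employees table and its columns are assumed to exist.

```python
# Feeding the cost model with statistics (assumes an existing "employees" table).
spark.conf.set("spark.sql.cbo.enabled", "true")                   # turn on cost-based optimization
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")  # also collect column histograms
spark.sql("ANALYZE TABLE employees COMPUTE STATISTICS")                        # table-level stats
spark.sql("ANALYZE TABLE employees COMPUTE STATISTICS FOR COLUMNS age, name")  # column-level stats
```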
๐ช 7. Selected Physical Plan
What happens: Spark now has a final physical plan ready to run.
Includes exact execution instructions:
How to read data.
When to shuffle.
Partitioning schemes.
Specific join strategies.
Purpose: Provide a complete, optimized execution roadmap for the engine.
๐ฅ 8. Code Generation → RDDs
What happens:
Spark uses the Tungsten engine to generate low-level JVM bytecode (Whole-Stage Code Generation).
The physical plan is turned into actual operations on RDDs.
These RDDs are then executed across the distributed cluster.
Purpose: Achieve high performance execution.
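To see this step in practice, a small sketch: on Spark 3.0+ explain(mode="codegen") prints the Java code produced by whole-stage code generation for the query (the toy query itself is arbitrary).

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# explain(mode="codegen") dumps the generated Java code for each codegen-ed stage.
df = spark.range(1000).selectExpr("id * 2 AS doubled").filter("doubled > 10")
df.explain(mode="codegen")
```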
Error Handling
How Catalyst handles optimization failures and fallbacks:
Analysis Phase Errors:
Unresolved references → Clear error messages with suggestions
Type mismatches → Detailed type conflict information
Schema validation failures → Points to exact column/table issues
Optimization Fallbacks:
If code generation fails → Falls back to interpreted execution
Complex expressions that can't be optimized → Uses original form
Statistics missing → Uses default cost estimates
Rule application failures → Skips problematic rules, continues with others
Runtime Adaptability:
AQE monitors execution and re-optimizes if the initial plan performs poorly (see the configuration sketch after this list)
Graceful degradation when optimizations don't work as expected
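The AQE behavior above is controlled by standard Spark 3.x settings; a minimal sketch (values shown reflect the Spark 3.2+ defaults):

```python
# Adaptive Query Execution re-optimizes the plan at runtime using actual shuffle statistics.
spark.conf.set("spark.sql.adaptive.enabled", "true")                     # on by default since 3.2
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # merge small shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # split skewed join partitions
```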
Partition Pruning
Skips reading unnecessary partitions based on filter conditions.
Example:
-- Table partitioned by year and month
SELECT * FROM sales WHERE year = 2024 AND month = 'January'
Only reads partitions matching year=2024/month=January
Avoids scanning other partition directories
Dramatically reduces I/O for large datasets
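A hedged DataFrame-side sketch of the same idea: partitionBy() lays the data out as year=.../month=... directories, and the scan node of the read query then lists PartitionFilters. The data, path, and column names are placeholders.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sales = spark.createDataFrame(
    [(1, 2024, "January", 100.0), (2, 2023, "March", 250.0)],
    ["id", "year", "month", "amount"],
)

# Write one directory per (year, month) partition value; the path is a placeholder.
sales.write.mode("overwrite").partitionBy("year", "month").parquet("/tmp/sales")

# The plan's scan node shows PartitionFilters, confirming only matching directories are read.
spark.read.parquet("/tmp/sales").filter("year = 2024 AND month = 'January'").explain()
```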
Constant Folding
Evaluates constant expressions at compile time instead of runtime.
Example:
SELECT price * (1 + 0.08),
substr('Hello World', 1, 5),
date_add('2024-01-01', 30)
FROM products
Optimized to:
SELECT price * 1.08,
'Hello',
'2024-01-31'
FROM products
1 + 0.08 becomes 1.08
String and date functions with constants get pre-computed
Eliminates redundant calculations per row
Predicate Pushdown
Moves filter conditions closer to the data source to reduce data movement.
Example:
SELECT name FROM (SELECT * FROM users JOIN orders ON users.id = orders.user_id) WHERE age > 25
Optimized to:
SELECT name FROM (SELECT * FROM users WHERE age > 25) u JOIN orders ON u.id = orders.user_id
Filter age > 25 applied before the expensive JOIN
Reduces rows processed in subsequent operations
Works with file formats (Parquet) and databases
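To illustrate the file-format case just mentioned: for a Parquet source the filter is pushed into the scan itself, and explain() shows it under PushedFilters on the FileScan node rather than as a separate full-table Filter. The path and columns below are assumptions for the sketch.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumes user data was previously written as Parquet at this placeholder path.
users = spark.read.parquet("/tmp/users")

# The age predicate appears under PushedFilters in the physical plan's scan node.
users.filter("age > 25").select("name").explain()
```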
Physical Plan Phase
What Happens: The physical planner takes the optimized logical plan and generates one or more physical execution strategies, then selects the best one based on cost estimates.
Key Transformations:
Operator Mapping:
Logical Join becomes SortMergeJoin, BroadcastHashJoin, or ShuffledHashJoin
Logical Aggregate becomes HashAggregate or SortAggregate
Logical Project becomes Project with codegen-ready expressions
Physical Strategies:
Cost-based selection: Chooses join algorithms based on table sizes and statistics
Partitioning decisions: Determines shuffle requirements and partition counts
Memory management: Allocates memory for operations like hash tables and sort buffers
Execution Planning:
Determines which operations can be pipelined together
Identifies shuffle boundaries (stage breaks)
Plans resource allocation and task distribution
Code Generation Prep:
Expressions get prepared for whole-stage code generation
Complex expression trees become Java code snippets
Operators get grouped into stages for codegen
Output: The result is an executable SparkPlan tree with concrete operators, memory allocations, and codegen-ready components that can be submitted to the cluster for execution.
Catalyst enables Tungsten optimizations during the physical planning phase.
How Catalyst Enables Tungsten
Code Generation Phase:
Catalyst's physical planner prepares expressions for Tungsten's code generation
Converts expression trees into codegen-ready formats
Enables whole-stage code generation by identifying fusible operators
Code generation (codegen) in Spark is a performance optimization that generates optimized Java bytecode at runtime instead of using interpreted execution.
Tungsten's Code Generation
What Tungsten Generates:
Java source code at runtime that gets compiled to bytecode
Whole-stage codegen - fuses multiple operators into single functions
Expression codegen - optimized evaluation of SQL expressions
Specific Integration Points:
Expression Evaluation:
Catalyst optimizes expressions logically
Passes optimized expressions to Tungsten for code generation
Tungsten generates efficient bytecode for those expressions
Operator Selection:
Catalyst chooses logical operators (Join, Aggregate)
Physical planner selects Tungsten-optimized implementations
Examples: UnsafeShuffleWriter, TungstenAggregate
Memory Management:
Catalyst plans memory usage patterns
Tungsten provides the actual off-heap memory management
Uses binary formats determined during physical planning
๐ File Formats in Spark - Complete Notes
(Parquet • ORC • Avro • JSON • CSV • Delta • Text)
๐ถ 1. CSV (Comma Separated Values)
๐ What it is
Plain-text tabular format where values are separated by commas.
โ Advantages
Human readable
Universally supported
Easy ingestion
โ Disadvantages
No schema
No compression by default
Slow scanning
Heavy disk I/O
Spark impact
Slow query performance
High shuffle
Needs .option("header") frequently
Schema inference required
Use when
Interchange format
Small static files
System integration
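A small sketch of reading CSV with an explicit schema, which avoids the extra full pass that schema inference needs; the path and column names are placeholders.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()

# Supplying the schema up front skips inferSchema's extra scan of the file.
schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("age", IntegerType()),
])

people = (
    spark.read
    .option("header", "true")   # first line holds column names
    .schema(schema)
    .csv("/tmp/people.csv")     # placeholder path
)
```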
๐ถ 2. JSON (JavaScript Object Notation)
๐ What it is
Semi-structured format that stores data as key-value pairs and nested objects.
โ Advantages
Schemaless
Supports nested structures
Human readable
โ Disadvantages
Larger size
Slow parsing
No indexing
Spark impact
slower scan
schema inference needed
expensive processing for nested fields
Use when
APIs
Logs
Streaming
๐ถ 3. Parquet (Columnar)
๐ What it is
Columnar storage format highly optimized for analytics.
โ Advantages
Compression
Predicate pushdown
Column pruning
Fast read
Optimized for Spark
โ Disadvantages
Insert slow
Update slow
Overhead for small rows
Spark benefits
fast scan
low shuffle
optimized for big data
best for transformation
Use when
Data lake
analytical workloads
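A minimal sketch of the column-pruning benefit: Parquet stores each column separately with min/max statistics, so a query that selects one column reads only that column's data. The DataFrame and path are placeholders.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

events = spark.createDataFrame(
    [(1, "click", 10), (2, "view", 20)],
    ["id", "event", "duration"],
)

# Write columnar Parquet, then read back selecting a single column;
# the plan's FileScan node shows only that column in ReadSchema.
events.write.mode("overwrite").parquet("/tmp/events")   # placeholder path
spark.read.parquet("/tmp/events").select("event").explain()
```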
๐ถ 4. ORC (Optimized Row Columnar)
๐ What
Similar to Parquet but optimized originally for Hive.
โ Advantages
Best compression
Good query optimization in Hive
Indexing + dictionary encoding
โ Disadvantages
Heavy metadata
Slightly slower in Spark than Parquet (Spark is more heavily optimized for Parquet)
Spark impact
very good compression
better for Hive workloads
Use when
Hive heavy systems
Hadoop environment
๐ถ 5. Avro (Row Oriented)
๐ What
Row-based binary format with the schema stored alongside the data.
โ Advantages
Schema evolution
Row based writing
Compact
Good for messaging
โ Disadvantages
Not optimized for analytics
Slow column scanning
Spark use
Kafka + Streaming
CDC data
schema evolution use cases
Use when
streaming pipelines
row-based messaging
๐ถ 6. Delta Lake
๐ What it is
Storage + transaction layer on top of Parquet.
โ Advantages
ACID transactions
Time travel
MERGE / UPDATE / DELETE
Streaming + batch unified
Schema evolution
โ Disadvantages
Databricks-led governance (open source, but not fully vendor-neutral)
Slight storage overhead
Spark impact
Fast
Highly reliable
Supports ACID
Lakehouse recommended format
Use when
Data lakehouse
UPSERTS
Slowly changing dimensions
Streaming + batch
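A hedged sketch of the UPSERT (MERGE) capability listed above, assuming the delta-spark package is installed, the session is configured with the Delta SQL extensions, and an existing Delta table lives at the placeholder path /tmp/customers.

```python
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()

# Incoming rows to upsert; the data and the id key column are made up for the example.
updates = spark.createDataFrame([(1, "Alice"), (3, "Carol")], ["id", "name"])

# /tmp/customers is a placeholder path to a table previously written with format("delta").
target = DeltaTable.forPath(spark, "/tmp/customers")

(
    target.alias("t")
    .merge(updates.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()      # update rows whose id already exists
    .whenNotMatchedInsertAll()   # insert rows with new ids
    .execute()
)
```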
๐ถ 7. TEXT
โ Advantages
simplest format
widely supported
โ Disadvantages
no schema
no compression
poor performance
Spark use
quick testing
minimal ingestion
logs
โก Spark Performance Comparison
| Format  | Storage | Read          | Compression | Analytics | Schema Evolution |
|---------|---------|---------------|-------------|-----------|------------------|
| CSV     | poor    | slow          | none        | bad       | none             |
| JSON    | medium  | slow          | medium      | bad       | medium           |
| Avro    | medium  | fast (row)    | medium      | medium    | best             |
| Parquet | best    | fast (column) | best        | best      | good             |
| ORC     | best    | fast          | best        | best      | good             |
| Delta   | best    |               |             |           |                  |
