Apache Spark Questions

โœ… Overview ๐Ÿ“‹

Apache Spark is a unified analytics engine for large-scale data processing.

It provides high-level APIs in Java, Scala, Python, and R, and an optimized engine that supports general computation graphs for data analysis.

Apache Spark is an open-source, distributed processing system designed for big data workloads.

It functions as a unified analytics engine, meaning it can handle various types of data processing tasks within a single framework.


   +------------------------+
   |      Driver Program    |
   |      (SparkContext)    |
   +-----------+------------+
               |
               v
       +-------------------+
       |  Cluster Manager  |
       +---------+---------+
                 |
        -----------------------
        |                     |
        v                     v
 +---------------+     +---------------+
 |   Worker Node |     |   Worker Node |
 |   +---------+ |     |   +---------+ |
 |   | Executor| |     |   | Executor| |
 |   +---------+ |     |   +---------+ |
 |   |  Cache  | |     |   |  Cache  | |
 |   +---------+ |     |   +---------+ |
 |   |  Tasks  | |     |   |  Tasks  | |
 +---------------+     +---------------+


๐Ÿ”ท Core Components ๐Ÿ”


1๏ธโƒฃ Driver Program ๐Ÿš—

The Driver Program is the main control unit of a Spark application that orchestrates the entire execution process.

     Responsibilities:

  • SparkContext Creation ๐ŸŽฏ :
           Creates and manages the SparkContext which serves as the entry point for all Spark functionality.
           Coordinates with cluster manager to acquire resources and maintains connection to executors throughout application lifecycle.
  • Job Planning ๐Ÿ“‹ :
           Converts user program into jobs by analyzing RDD lineage and creating directed acyclic graphs (DAG).
           Breaks down complex operations into manageable stages based on data dependencies and shuffle boundaries.
  • Task Scheduling โฐ :
           Schedules tasks across executors based on data locality, resource availability, and workload distribution.
           Optimizes task placement to minimize network overhead and maximize cluster utilization.
  • Result Collection ๐Ÿ“ฆ :
           Collects results from executors and aggregates them for final output or further processing.
           Manages data transfer from distributed executors back to the driver for actions like collect() and reduce().
  • Fault Recovery ๐Ÿ›ก๏ธ :
           Handles executor failures by tracking RDD lineage and recomputing lost partitions on healthy nodes.
           Maintains resilience through automatic retry mechanisms and task rescheduling on alternative executors.

2๏ธโƒฃ Cluster Manager ๐Ÿญ

The Cluster Manager is responsible for resource allocation and management across the cluster.

     Overview:

Cluster managers handle resource negotiation, allocation, and monitoring across the distributed cluster.
They provide interfaces for Spark applications to request computing resources and manage the lifecycle of executors on worker nodes.

     Types:

  • Standalone Cluster Manager ๐Ÿ—๏ธ : Spark's built-in cluster manager for simple deployments
  • YARN (Yet Another Resource Negotiator) ๐Ÿ˜ : Hadoop's resource manager for enterprise environments
  • Apache Mesos โš–๏ธ : General-purpose cluster manager for diverse workloads
  • Kubernetes โ˜๏ธ : Container orchestration platform for cloud-native deployments

3๏ธโƒฃ Worker Nodes ๐Ÿ‘ทโ€โ™‚๏ธ

Worker Nodes are the compute nodes in the cluster that host and run Spark executors.

  • Executor Hosting ๐Ÿ  :
           Provides compute resources (CPU cores and memory) to run executor JVM processes.
           Manages multiple executors simultaneously and ensures proper resource isolation between different applications.
  • Resource Management ๐Ÿ’ป :
           Monitors and allocates system resources including CPU, memory, and disk space to executors.
           Reports resource availability and health status back to the cluster manager for scheduling decisions.

4๏ธโƒฃ Executors ๐Ÿƒโ€โ™‚๏ธ

Executors are JVM processes that run on worker nodes and execute individual tasks.

  • Task Execution โšก : Runs individual tasks assigned by the driver program and processes data partitions in parallel.
  • Data Storage ๐Ÿ’พ : Caches RDD partitions in memory or disk based on storage levels and available resources.
  • Shuffle Operations ๐Ÿ”€ : Handles data exchange between stages during operations like joins, groupBy, and aggregations.
  • Result Reporting ๐Ÿ“Š : Reports task completion status, results, and metrics back to the driver program.

5๏ธโƒฃ SparkContext ๐ŸŽฏ

  • RDD Creation ๐Ÿ—‚๏ธ : Creates RDDs from files, databases, and collections.
  • Job Submission ๐Ÿ“ค : Converts RDD operations into executable tasks.
  • Configuration Management โš™๏ธ : Manages Spark configuration properties.
  • Service Coordination ๐Ÿค : Coordinates Spark services like block manager and web UI.

๐Ÿ”„ Execution Flow

     Job Execution Pipeline:

  • Application Submission ๐Ÿ“ค : User submits Spark application via spark-submit.
  • Resource Request ๐Ÿค : Driver requests resources from cluster manager.
  • Executor Launch ๐Ÿš€ : Cluster manager starts executors on worker nodes.
  • Job Processing โšก : Driver converts user code into jobs, stages, and tasks.
  • Task Distribution ๐Ÿ“ก : Tasks distributed based on data locality.
  • Result Collection ๐Ÿ“Š : Executors return results to driver.

High-Level APIs ๐ŸŽจ


1๏ธโƒฃ Spark SQL ๐Ÿ“Š

Provides structured data processing with SQL interface and DataFrame/Dataset APIs.
Includes Catalyst optimizer for query optimization and code generation for high-performance execution.


2๏ธโƒฃ Spark Streaming ๐ŸŒŠ

Enables scalable, fault-tolerant stream processing with micro-batch and continuous processing models.
Integrates with various streaming sources like Kafka, Flume, and TCP sockets.


3๏ธโƒฃ MLlib ๐Ÿค–

Machine learning library providing distributed algorithms for classification, regression, clustering, and collaborative filtering.
Includes ML pipelines for feature engineering and model selection workflows.


4๏ธโƒฃ GraphX ๐Ÿ•ธ๏ธ

Graph processing framework for distributed graph computation and analysis.
Provides graph algorithms like PageRank, connected components, and triangle counting on large-scale graphs.



๐Ÿ”น Transformation ๐Ÿงฉ

A transformation in Spark is a lazy operation that creates a new RDD or DataFrame without executing it immediately.
It only builds the execution plan (lineage) and waits until an action is called.

Examples ๐Ÿ“Œ : map(), filter(), flatMap(), select()


๐Ÿ”ธ Action ๐Ÿš€

An action is an operation that triggers the actual execution of all previously defined transformations
and returns the final result to the driver or writes it to external storage.

Examples ๐Ÿ“Œ : count(), show(), collect(), save()


๐Ÿ” Key Difference

Transformations build the logical execution plan, while actions execute the plan and produce output.

Lazy Evaluation is a powerful optimization feature in Apache Spark where transformations on RDDs or DataFrames are not executed immediately when they are defined. Instead, Spark builds a logical execution plan and only performs the actual computation when an action (like collect(), count(), show()) is triggered.
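A minimal PySpark sketch of this behavior (the file path and column names are illustrative):

df = spark.read.csv("events.csv", header=True)

# Transformations: nothing runs yet, Spark only records the plan
errors = df.filter(df["status"] == "ERROR").select("timestamp", "status")

# Action: triggers the whole plan (read -> filter -> select) as one optimized job
print(errors.count())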


โœ… Benefits ๐ŸŽฏ

  • Performance Optimization โšก :
           Spark optimizes the logical and physical plans before execution to improve runtime efficiency.
  • Avoids Unnecessary Computations โ™ป๏ธ :
           Only the required transformations are executed, saving CPU and memory resources.
  • Pipeline Optimizations ๐Ÿ”ง :
           Enables advanced optimizations like predicate pushdown and projection pruning.

๐Ÿง  Simple Insight

Spark first creates a plan, and only works when you ask for the final result.

RDD vs DataFrame vs Dataset in Apache Spark โšก


๐Ÿ”น 1) RDD (Resilient Distributed Dataset)

     ๐Ÿ”ธ Definition :

An RDD is an immutable, distributed collection of datasets partitioned across the nodes of a cluster. It supports fault tolerance by enabling recomputation of lost partitions. RDDs are Sparkโ€™s fundamental data structure and expose a functional programming API for distributed data processing.

     ๐Ÿ”ธ Core Concepts :

  • Resilient ๐Ÿ” : Can be recomputed using lineage if a node fails.
  • Distributed ๐ŸŒ : Split across a cluster to enable parallelism.
  • In-Memory Computing โšก : Stores intermediate results in memory for faster access.
  • Flexible Creation ๐Ÿ—‚๏ธ : Created from HDFS, S3, or parallelized collections.

     ๐Ÿ”ธ Limitations โŒ :

  • โŒ No Catalyst or Tungsten optimizer
  • โŒ Not memory-efficient (only on-heap memory)
  • โŒ No schema, no native SQL support
  • โŒ High serialization overhead
  • โŒ Affected heavily by Garbage Collection (GC)

๐Ÿ”น 2) DataFrame ๐Ÿ“Š

     ๐Ÿ”ธ Definition :

A DataFrame is a distributed collection of data organized into named columns (similar to a SQL table). It provides high-level APIs and is optimized using the Catalyst Optimizer and Tungsten Engine.

     ๐Ÿ”ธ Highlights โœจ :

  • Table-like structure (similar to SQL, Pandas, R DataFrames)
  • Multi-language support (Scala, Python, Java, R)
  • Scales from GBs to petabytes on clusters
  • Built-in query and memory optimizations

     ๐Ÿ”ธ Limitations โŒ :

  • โŒ Not type-safe (runtime errors)
  • โŒ UDFs reduce optimization benefits
  • โŒ Harder debugging for schema errors

๐Ÿ”น 3) Dataset ๐Ÿงฉ

     ๐Ÿ”ธ Definition :

Dataset is a strongly-typed distributed collection available in Scala/Java. It combines the benefits of RDDs (type safety) and DataFrames (optimizations). Spark uses Encoders for efficient serialization.

     ๐Ÿ”ธ Advantages โœ… :

  • Type-safe with compile-time checks
  • Uses Catalyst & Tungsten optimizations
  • Efficient serialization using Encoders
  • Supports both functional and SQL-style APIs

     ๐Ÿ”ธ Limitations โŒ :

  • โŒ Not available in PySpark
  • โŒ Complex for deeply nested schemas
  • โŒ UDFs act as optimization barriers

๐Ÿ”น 4) Common Features (Across All Three) โœ…

  • โœ… Fault-tolerant
  • โœ… Distributed and parallelized
  • โœ… Immutable (read-only)
  • โœ… Supports lazy evaluation
  • โœ… In-memory processing
  • โœ… Internally processed as RDDs

๐Ÿ”น Key Differences โšก

  • API Style ๐Ÿง  :
           RDDs & Datasets use OOP/functional API, while DataFrames use SQL-style declarative API.
  • How vs What ๐Ÿค– :
           RDDs require specifying how to perform tasks; DataFrames/Datasets declare what to do.
  • Memory Usage ๐Ÿงฎ :
           RDDs use only on-heap memory, while DataFrames/Datasets use both on-heap and off-heap memory.
  • Serialization ๐Ÿ“ฆ :
           RDDs require heavy serialization; DataFrames/Datasets avoid much of it using optimized encoders.
  • Garbage Collection ๐Ÿงน :
           RDDs suffer from GC overhead, while DataFrames and Datasets significantly reduce GC impact.
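A short PySpark sketch of the "How vs What" difference above (Datasets are Scala/Java only, so only RDD and DataFrame are shown):

# RDD: you spell out *how* to transform each record
rdd = spark.sparkContext.parallelize([("A", 1), ("B", 2)])
rdd_result = rdd.map(lambda kv: (kv[0], kv[1] * 10)).collect()

# DataFrame: you declare *what* you want; Catalyst decides how to run it
df = spark.createDataFrame([("A", 1), ("B", 2)], ["key", "value"])
df_result = df.selectExpr("key", "value * 10 AS value").collect()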

โœ… Overview ๐Ÿ“‹

Both repartition() and coalesce() are Apache Spark operations used to change the number of partitions in a DataFrame or RDD, but they work differently and are optimized for different scenarios.


๐Ÿ”ท Key Differences ๐Ÿ”

  • Shuffle : repartition always triggers a full shuffle 🔀; coalesce avoids a shuffle when possible ✅
  • Direction : repartition can increase or decrease partitions 🔼🔽; coalesce can only decrease them 🔽
  • Data Distribution : repartition gives even distribution across partitions ⚖️; coalesce may produce uneven partitions ⚠️
  • Performance : repartition is slower due to shuffle overhead 🐌; coalesce is faster with minimal data movement 🚀
  • Use Case : repartition when you need even distribution or more partitions; coalesce to efficiently reduce partitions
  • Network I/O : repartition has high network overhead 📡; coalesce has minimal network overhead 📉

๐Ÿ”ท When to Use Each ๐Ÿ’ก

     โœ… Use Repartition When :

  • Increasing partition count for better parallelism โšก
  • Need even data distribution across partitions โš–๏ธ
  • Preparing for downstream operations that benefit from uniform partitions
  • Data is heavily skewed and needs redistribution ๐Ÿ“Š

     Example ๐Ÿ’ป :

df = spark.read.parquet("input.parquet")

repartitioned_df = df.repartition(10)        # 10 partitions with even distribution

repartitioned_by_col = df.repartition("customer_id")   # Partition by column

     โœ… Use Coalesce When :

  • Reducing partition count to avoid small files ๐Ÿ“ฆ
  • Minimizing shuffle overhead is a priority โœ…
  • Final stage before writing to reduce output files ๐Ÿ“
  • Data distribution is already acceptable โš–๏ธ

     Example ๐Ÿ’ป :

# Coalesce example

coalesced_df = df.coalesce(5)    # Reduce to 5 partitions, no shuffle

โœ… What is cache()? ๐Ÿš€

A shortcut to store a DataFrame in memory for faster reuse.

  • Uses the default storage level: MEMORY_AND_DISK ๐Ÿ’พ
  • If data fits in memory, it stays there; otherwise Spark spills to disk ๐Ÿง โžก๏ธ๐Ÿ’ฝ
  • Ideal for frequently accessed data in iterative jobs ๐Ÿ”

โœ… What is persist()? ๐Ÿงฉ

Similar to cache(), but gives you full control over where and how the data is stored.

  • You can choose different storage levels (memory, disk, serialized formats) โš™๏ธ
  • Useful when datasets are large or memory is limited ๐Ÿ“Š

๐Ÿ”น Storage Levels in persist() ๐Ÿ’พ

  • MEMORY_ONLY โšก: Fastest, but recomputes if memory is insufficient
  • MEMORY_AND_DISK โœ…: Safe default, spills to disk when memory is full
  • MEMORY_ONLY_SER ๐ŸงŠ: Stores serialized (compressed) data in memory
  • MEMORY_AND_DISK_SER ๐Ÿ’ฝ: Serialized memory + disk backup
  • DISK_ONLY ๐Ÿข: Stores only on disk (slower but safe)
  • OFF_HEAP ๐Ÿง : Uses external memory (advanced use case)

๐ŸŸข When to Use cache()? โœ…

  • Quick caching without tuning โšก
  • Data fits in memory or disk fallback is acceptable ๐Ÿ’ก
  • DataFrame reused multiple times in a pipeline ๐Ÿ”

๐ŸŸก When to Use persist()? ๐ŸŽฏ

  • You need control over storage strategy โš™๏ธ
  • Large datasets that don't fit fully in memory ๐Ÿ“ฆ
  • Memory-constrained environments to avoid OOM errors โš ๏ธ
  • Performance tuning experiments ๐Ÿ”ฌ

๐Ÿ’ป Code Examples ๐Ÿ‘‡

df = spark.read.csv("data.csv", header=True)

# Cache the DataFrame in memory (with disk fallback)
df.cache()

from pyspark import StorageLevel

df = spark.read.csv("data.csv", header=True)

# Persist using memory-only storage (won't use disk)
df.persist(StorageLevel.MEMORY_ONLY)

๐Ÿ”ท Accumulators

โœ… What is an Accumulator?

A write-only shared variable used to perform aggregations or counters across Spark tasks.

Useful for counting events like errors, nulls, or processed records.

Only the driver can read its value; executors can only update it.


๐Ÿšซ Important:

You should not use an accumulator to influence data transformation logic โ€” it's not guaranteed to update exactly once per record (especially with lazy evaluation and task retries).

# Create an accumulator
error_count = sc.accumulator(0)
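Continuing the example above, a small usage sketch (the log file name is illustrative):

def count_errors(line):
    if "ERROR" in line:
        error_count.add(1)   # executors can only add to it

sc.textFile("app.log").foreach(count_errors)   # foreach is an action, so the update actually runs
print(error_count.value)                       # only the driver can read the final value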

---


๐Ÿ”ท Broadcast Variables

โœ… What is a Broadcast Variable?

A read-only shared variable cached on all executors.

Used to efficiently share large lookup data (e.g., dictionaries or reference tables) across all tasks.

Prevents sending a copy with each task, which saves network traffic and memory.

broadcast_var = sc.broadcast(lookup_dict)
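A small usage sketch, assuming lookup_dict is a modest in-memory dictionary:

lookup_dict = {"US": "United States", "IN": "India"}
broadcast_var = sc.broadcast(lookup_dict)

codes = sc.parallelize(["US", "IN", "US"])
names = codes.map(lambda c: broadcast_var.value.get(c, "Unknown")).collect()
# ['United States', 'India', 'United States']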

โœ… What is Lineage?

Lineage is the history or recipe of how a DataFrame or RDD was created.

It's a logical chain of transformations applied to data (like map, filter, join, etc.).

Spark tracks lineage to recompute lost data if a partition fails (fault tolerance).


๐Ÿง  Example:

rdd1 = sc.textFile("file.txt")
rdd2 = rdd1.map(lambda x: x.split())
rdd3 = rdd2.filter(lambda x: len(x) > 3)

The lineage of rdd3 is: file.txt → map() → filter()

๐Ÿ“Œ Use:

Spark uses lineage info to recompute data without needing to persist/copy data. Essential for fault recovery.


---


๐Ÿ”ถ DAG (Directed Acyclic Graph)

โœ… What is a DAG?

A DAG is a graph of stages and tasks that Spark builds to execute your job.

It includes:

  • All stages of transformations.
  • Their execution order.
  • Information about shuffles, dependencies, and task boundaries.

๐Ÿง  Example:

If your job has map โ†’ groupBy โ†’ count, Spark breaks it into stages and tasks with dependencies โ€” this forms a DAG.
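You can inspect the plan Spark builds. For RDDs, toDebugString() prints the lineage with its stage boundaries; for DataFrames, explain() prints the physical plan. A small illustrative sketch:

rdd = sc.parallelize(range(100)) \
        .map(lambda x: (x % 10, x)) \
        .groupByKey() \
        .mapValues(len)

print(rdd.toDebugString().decode("utf-8"))   # lineage with shuffle/stage boundaries

# df.groupBy("key").count().explain()        # physical plan for a DataFrame job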

Deploy modes decide where the Spark Driver runs and how your application is executed.


โœ… 1) Client Mode ๐Ÿง‘โ€๐Ÿ’ป

  • Driver runs on your local machine (where spark-submit is executed) ๐Ÿ’ป
  • Executors run on the cluster (YARN, Kubernetes, Standalone, etc.) ๐Ÿ–ฅ๏ธ
  • Application logs and output are shown directly in your terminal ๐Ÿ“Ÿ
  • Used for: Development, notebooks, testing, and debugging ๐Ÿงช

โœ… 2) Cluster Mode ๐Ÿ—๏ธ

  • Driver runs inside the cluster (e.g., YARN container or Kubernetes Pod) โ˜๏ธ
  • You can disconnect after submission โ€” Spark job continues to run independently ๐Ÿ”„
  • Logs are stored in cluster management tools (YARN logs / Kubernetes pod logs) ๐Ÿ“ฆ
  • Used for: Production workloads, scheduled jobs, and automation ๐Ÿค–

๐Ÿ”‘ Key Difference

In Client Mode, the driver lives on your machine. In Cluster Mode, the driver lives inside the cluster.
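Illustrative spark-submit commands, assuming a YARN cluster and the same your_script.py used elsewhere in these notes:

# Client mode: driver runs where spark-submit is executed
spark-submit --master yarn --deploy-mode client your_script.py

# Cluster mode: driver runs inside the cluster (e.g., a YARN container)
spark-submit --master yarn --deploy-mode cluster your_script.py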

These modes define how and where Spark runs on different cluster managers.


โœ… 1) Standalone Mode ๐Ÿงฑ

     What it is:

  • Sparkโ€™s built-in cluster manager โš™๏ธ
  • Does not require Hadoop or any external system ๐Ÿšซ๐Ÿ˜
  • You manually start a Master and multiple Worker nodes ๐Ÿ–ฅ๏ธ

     When to use โœ…:

  • Small to medium Spark clusters
  • Simple setup without Hadoop or Kubernetes

โœ… 2) YARN Mode ๐Ÿ˜

     What it is:

  • Spark runs on top of Hadoop YARN as the cluster manager ๐Ÿ—๏ธ
  • Very common in enterprise Hadoop environments ๐Ÿข
  • Supports both client mode and cluster mode ๐Ÿ”

     When to use โœ…:

  • When working in a Hadoop ecosystem
  • When Spark needs to share resources with Hive, HDFS, etc.

โœ… 3) Kubernetes Mode โ˜ธ๏ธ

     What it is:

  • Spark runs natively on a Kubernetes cluster โ˜๏ธ
  • Uses Pods for Driver and Executors ๐ŸงŠ
  • Supports autoscaling and containerized execution ๐Ÿ“ฆ

     When to use โœ…:

  • Cloud-native infrastructure (AWS EKS, GCP GKE, AKS)
  • Docker-based deployments with strong resource isolation ๐Ÿณ

๐Ÿ”‘ Quick Summary

Standalone → Simple, Spark-only cluster
YARN → Best inside Hadoop ecosystem
Kubernetes → Best for cloud-native, container-based setups
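Illustrative --master values for each mode (hostnames, ports, and the container image are placeholders):

spark-submit --master spark://master-host:7077 your_script.py    # Standalone

spark-submit --master yarn your_script.py                        # YARN

spark-submit --master k8s://https://k8s-apiserver:6443 \
  --conf spark.kubernetes.container.image=my-spark-image:latest \
  your_script.py                                                 # Kubernetes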

โœ… Quick Difference

๐Ÿ”น groupByKey: Groups all values for a key first, then aggregates (heavy shuffle).
๐Ÿ”น reduceByKey: Aggregates values during shuffle, reducing data early and improving efficiency.


โœ… 1) reduceByKey ๐Ÿ”

     Syntax:

reduceByKey(func, [numTasks])

     How it works:

  • Performs local aggregation within each partition ๐Ÿงฎ
  • Combines values before shuffle using a combiner โš™๏ธ
  • Only reduced results are shuffled across the network ๐Ÿ“ก
  • Much less data transfer compared to groupByKey ๐Ÿš€

     Best use case โœ…:

When you want sum, min, max, avg or other aggregations.


โœ… 2) groupByKey ๐Ÿงบ

     Syntax:

groupByKey([numTasks])

     How it works:

  • No local reduction happens โŒ
  • All raw values are shuffled across the network ๐Ÿ“ฆ
  • Values are grouped only after shuffle on destination partition
  • High memory usage on worker nodes ๐Ÿ”ฅ
  • Can cause OutOfMemory (OOM) errors โš ๏ธ

     Best use case โœ…:

When you want to retain all values as a list for each key.


โœ… Semantic Difference ๐Ÿง 

  • groupByKey: Returns (key, Iterable[values]) โ†’ full list of values ๐Ÿ“‹
  • reduceByKey: Returns (key, reducedValue) โ†’ single aggregated value ๐ŸŽฏ

โœ… Performance โฑ๏ธ

  • groupByKey: More network traffic + more memory usage โŒ
  • reduceByKey: Less shuffle + better performance โœ…

โœ… Example ๐Ÿงช

# Sample key-value pair RDD
data = [("A", 1), ("B", 2), ("A", 3), ("B", 4), ("C", 5)]
rdd = sc.parallelize(data)

# Using groupByKey
grouped = rdd.groupByKey()
# Result: [("A", [1, 3]), ("B", [2, 4]), ("C", [5])]

# Using reduceByKey
reduced = rdd.reduceByKey(lambda x, y: x + y)
# Result: [("A", 4), ("B", 6), ("C", 5)]

โšก Interview Tip:

Use reduceByKey for aggregation and performance.
Use groupByKey only when you truly need the full list of values.

โœ… What is Partitioning? ๐Ÿงฉ

Partitioning divides a table into separate directories based on column values. Each partition represents a subset of data stored in its own directory structure.


     ๐Ÿ“ Partitioning Structure

/warehouse/sales_table/
├── year=2023/month=01/
│   ├── file1.parquet
│   └── file2.parquet
├── year=2023/month=02/
│   ├── file1.parquet
│   └── file2.parquet
└── year=2024/month=01/
    ├── file1.parquet
    └── file2.parquet

     โœ… Advantages of Partitioning

  • Partition Pruning ๐Ÿš€ โ€” Skips irrelevant partitions
  • Parallel Processing โšก โ€” Independent processing per partition
  • Data Locality ๐Ÿ“ฆ โ€” Related data stays together
  • Easy Data Management ๐Ÿ› ๏ธ โ€” Add or drop partitions easily

     โŒ Disadvantages of Partitioning

  • Small File Problem ๐Ÿงจ โ€” High-cardinality columns create many small files
  • Metadata Overhead ๐Ÿงพ โ€” Too many partitions increase metadata load
  • Uneven Distribution โš–๏ธ โ€” Skewed partition sizes
  • Inefficient Queries ๐Ÿ” โ€” Only effective when filtering on partition columns



โœ… What is Bucketing? ๐Ÿชฃ

Bucketing distributes data into a fixed number of buckets using a hash function applied to specified columns.


     ๐Ÿ—ƒ๏ธ Bucketing Structure

/warehouse/sales_table/
├── bucket_00000.parquet
├── bucket_00001.parquet
├── bucket_00002.parquet
├── bucket_00003.parquet
└── ...

     โœ… Advantages of Bucketing

  • Fixed File Count ๐Ÿ“ โ€” Prevents small file problem
  • Even Distribution โš™๏ธ โ€” Hashing spreads data uniformly
  • Join Optimization ๐Ÿ”— โ€” Enables bucket-map joins
  • Efficient Sampling ๐ŸŽฏ โ€” Read specific buckets
  • Reduced Shuffling ๐Ÿ”„ โ€” Minimizes data movement

     โŒ Disadvantages of Bucketing

  • No Partition Pruning โŒ โ€” Cannot skip buckets
  • Hash Collisions โš ๏ธ โ€” Can cause uneven distribution
  • Bucket Count Choice ๐ŸŽฒ โ€” Requires good planning
  • Fixed Structure ๐Ÿ”’ โ€” Bucket count cannot change easily



โœ… Key Differences ๐Ÿ“Š

  • Data Organization : partitioning is directory-based; bucketing uses hash-based buckets
  • File Count : partitioning produces a variable number of files; bucketing produces a fixed number
  • Query Pruning : supported by partitioning ✅; not supported by bucketing ❌
  • Data Distribution : partitioning can be skewed; bucketing is generally uniform
  • Small File Problem : partitioning is prone to it ❌; bucketing prevents it ✅
  • Join Performance : partitioning uses standard joins; bucketing enables optimized bucket joins 🚀
  • Metadata Overhead : high with many partitions; low for bucketing (fixed metadata)

โœ… Bucket Count Guidelines ๐Ÿ“ (Important)

  • Use powers of 2 or multiples of core count ๐Ÿงฎ
  • Target file size of 128MB โ€“ 1GB ๐Ÿ“ฆ
  • Use same bucket count on join tables ๐Ÿ”—
  • Plan for data growth ๐Ÿ“ˆ

โœ… Conclusion ๐Ÿง 

Use Partitioning for time-series and filter-heavy queries.
Use Bucketing for high-cardinality joins and uniform distribution needs.
Use a Hybrid approach when you need both pruning and join optimization.
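A small PySpark write sketch (table path, column names, and bucket count are illustrative):

# Partitioning: one directory per (year, month) value
df.write.partitionBy("year", "month").parquet("/warehouse/sales_table")

# Bucketing: fixed number of files hashed on customer_id
# (bucketBy requires writing to a metastore table via saveAsTable)
df.write.bucketBy(16, "customer_id") \
    .sortBy("customer_id") \
    .saveAsTable("sales_bucketed")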

โœ… What is a Shuffle Hash Join?

A Shuffle Hash Join is a join strategy used in Spark when two large tables are joined and neither can fit entirely in memory (unlike Broadcast Join).

Spark shuffles both datasets across the cluster by the join key, then performs a hash-based join inside each partition.


๐Ÿง  How It Works (Step-by-Step)

  • Shuffle ๐Ÿ” : Both DataFrames are redistributed so that rows with the same join key land in the same partition.
  • Build Phase ๐Ÿงฑ : Spark builds an in-memory hash table using the smaller dataset (if known).
  • Probe Phase ๐Ÿ”Ž : Spark scans the larger dataset and looks up matches inside the hash table.
  • Output โœ… : Matching rows are combined and returned.

๐Ÿ” When is it Used?

  • When Broadcast Join is not possible ๐Ÿ“ก
  • When both tables are too large for memory ๐Ÿ’พ
  • When Spark cannot determine a clearly smaller table ๐Ÿคทโ€โ™‚๏ธ

๐Ÿ“Š Shuffle Hash Join Process

     Step 1: Shuffle Phase ๐Ÿ”€

Node 1 receives:
  Orders:    (101, order_data)
  Customers: (101, customer_data)

Node 2 receives:
  Orders:    (102, order_data)
  Customers: (102, customer_data)

Node 3 receives:
  Orders:    (103, order_data)
  Customers: (103, customer_data)

     Step 2: Hash Join Phase ๐Ÿงฎ

Node 1:
  Hash Table:
    {101: ("Alice", "NYC")}

  Probe:
    Orders: (1, 101, 250) โ†’ Match found โœ…

  Result:
    (1, 101, 250, "Alice", "NYC")

โš ๏ธ Trade-offs

      Pros ✅ :

  • Works well for large tables
  • Scales with cluster size
  • Supports high parallelism

      Cons ❌ :

  • Expensive shuffle operation
  • Requires high memory for the hash table
  • Performance suffers with skewed data

๐Ÿ“ Real-World Notes

  • Spark uses ShuffledHashJoin when one side is smaller, but not small enough to broadcast ๐Ÿ“ฆ
  • Both datasets are shuffled, which makes this join expensive โš ๏ธ

🔶 Broadcast Join 📢

✅ How it works ⚙️

  • Smaller table is broadcast to all executors ๐Ÿ“ฆ
  • No shuffle is required for the larger table ๐Ÿšซ๐Ÿ”€
  • Each executor performs the join locally ๐Ÿง 

โœ… When is it used? โฑ๏ธ

  • One table is smaller than spark.sql.autoBroadcastJoinThreshold (default ~10MB) ๐Ÿ“
  • Spark automatically selects it for small dimension tables ๐Ÿค–

๐Ÿงช Example

-- Users: 5MB, Orders: 10GB
SELECT * 
FROM orders o 
JOIN users u 
  ON o.user_id = u.id;

๐Ÿ” Process Flow

  • 1๏ธโƒฃ Broadcast the Users table to all executors ๐Ÿ“ค
  • 2๏ธโƒฃ Each executor builds a local hash table ๐Ÿงฑ
  • 3๏ธโƒฃ Orders table stays partitioned (no shuffle) ๐Ÿ“ฆ
  • 4๏ธโƒฃ Local join executed inside each executor โœ…

โœ… Pros โญ

  • No shuffle required ๐Ÿš€
  • Very fast for small dimension tables โšก
  • Perfect for star schema joins ๐ŸŒŸ

โŒ Cons โš ๏ธ

  • Limited by broadcast size threshold ๐Ÿ“
  • Consumes driver and executor memory ๐Ÿง 

🔶 Sort-Merge Join 🔁

✅ How it works ⚙️

  • Both tables are shuffled by the join key ๐Ÿ”€
  • Each partition is sorted by the join key ๐Ÿ”ค
  • A merge-sort algorithm is used to join sorted partitions ๐Ÿ”—
  • This is the default join strategy when others donโ€™t apply ๐Ÿค–

โœ… When is it used? โฑ๏ธ

  • Both tables are large ๐Ÿ“Š
  • Equi-joins with sortable keys ๐Ÿ”‘
  • When broadcast and hash joins are not suitable ๐Ÿšซ

๐Ÿงช Example

-- Table A: 5GB, Table B: 8GB
SELECT * 
FROM large_table_a a 
JOIN large_table_b b 
  ON a.category = b.category;

๐Ÿ” Process Flow

  • 1๏ธโƒฃ Shuffle both tables by join key ๐Ÿ”€
  • 2๏ธโƒฃ Sort partitions by join key ๐Ÿ”ค
  • 3๏ธโƒฃ Merge sorted partitions using two-pointer technique ๐Ÿงญ
  • 4๏ธโƒฃ Stream through both datasets simultaneously ๐ŸŒŠ

โœ… Pros โญ

  • Works with very large tables โœ…
  • Memory efficient due to streaming nature ๐Ÿ’พ
  • Handles duplicate keys well ๐Ÿ”

โŒ Cons โš ๏ธ

  • Requires shuffle for both tables ๐Ÿ”€
  • Sorting overhead adds latency ๐Ÿข
  • Slower than hash-based joins โณ

โœ… Join Selection Logic ๐Ÿง 

Is smaller table < 10MB?
├─ ✅ YES → 📢 Broadcast Join
└─ ❌ NO → Is smaller table < 64MB and fits in memory?
    ├─ ✅ YES → 🔀 Shuffle Hash Join
    └─ ❌ NO → 🔁 Sort Merge Join

๐Ÿ“Š Performance Comparison Table

  • 📢 Broadcast Join : no shuffle (only the small side is broadcast); high memory usage (broadcast table); best for small dimension tables
  • 🔀 Shuffle Hash Join : shuffles both sides; medium memory usage (hash table); best for medium tables with a clear size difference
  • 🔁 Sort Merge Join : shuffles both sides; low memory usage (streaming); best for large tables of similar size

๐ŸŽฏ Key Interview Points

  • ๐Ÿ“ข Broadcast Join is the fastest but limited by table size.
  • ๐Ÿ”€ Shuffle Hash Join is the middle-ground and requires enough executor memory.
  • ๐Ÿ” Sort Merge Join works in most cases but has sorting overhead.
  • ๐Ÿค– Spark automatically selects join strategy based on table statistics.
  • ๐Ÿงฉ You can force a strategy using hints: /*+ BROADCAST(table) */
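A small PySpark sketch of applying join hints (orders_df and users_df are placeholder DataFrames):

from pyspark.sql.functions import broadcast

broadcast_join = orders_df.join(broadcast(users_df), "user_id")            # force broadcast join
merge_join     = orders_df.hint("merge").join(users_df, "user_id")         # prefer sort-merge join
hash_join      = orders_df.hint("shuffle_hash").join(users_df, "user_id")  # prefer shuffle hash join (Spark 3+)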

Dynamic Resource Allocation (DRA) allows Spark to automatically adjust the number of executors at runtime based on workload.


โœ… Why Use Dynamic Resource Allocation?

  • ๐Ÿ“ˆ Scales up executors when workload increases
  • ๐Ÿ“‰ Scales down executors when they are idle
  • ๐Ÿ’ฐ Optimizes overall cluster resource usage

๐Ÿง  How It Works

  • Monitors number of pending tasks
  • Tracks executor idle time
  • Adds or removes executors automatically
  • Works with YARN, Kubernetes, or Standalone cluster manager

โš™๏ธ Key Configurations

spark.dynamicAllocation.enabled              = true
spark.dynamicAllocation.minExecutors         = 2
spark.dynamicAllocation.initialExecutors     = 4
spark.dynamicAllocation.maxExecutors         = 10
spark.dynamicAllocation.executorIdleTimeout  = 60s

๐Ÿงช Example spark-submit

spark-submit \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=8 \
  --conf spark.dynamicAllocation.executorIdleTimeout=60s \
  your_script.py

๐Ÿ›  Required Setting

spark.shuffle.service.enabled=true

โœ… Benefits

  • ๐Ÿ’ธ Frees unused resources automatically
  • โšก Adapts to workload changes in real time
  • ๐Ÿค– Removes need for manual executor tuning

โš ๏ธ Why Static Allocation Is Bad

  • Wastes resources when executors stay idle
  • Can cause slow jobs when executors are insufficient
  • Cannot adapt to workload spikes

๐ŸŽฏ Interview Key Points

  • Spark scales executors based on pending tasks
  • Requires external shuffle service to work correctly
  • Most useful for long-running and streaming jobs

โšก Adaptive Query Execution (AQE) in Spark


Adaptive Query Execution (AQE) is a dynamic optimization feature in Spark SQL (Spark 3.x+) that changes the execution plan at runtime based on real data statistics.


๐ŸŽฏ Why AQE Is Needed

  • Traditional Spark plans are built at compile time using limited statistics
  • Real-world data contains skew, unknown sizes, and uneven distributions
  • AQE adapts the plan during execution based on actual runtime metrics

โš™๏ธ Key Features of AQE

  • ๐Ÿ”„ Dynamic Join Strategy Selection (can switch Sort-Merge โ†’ Broadcast Join)
  • โš–๏ธ Skew Join Handling by splitting skewed partitions
  • ๐Ÿ“ฆ Dynamic Shuffle Partition Coalescing to reduce small tasks
  • โ™ป๏ธ Local Shuffle Reader for efficient shuffle reuse

๐ŸŒ Real-World Use Case

In a production workload joining ship activity with weather data, heavy port skew caused some tasks to run 10x slower. After enabling AQE and skew join handling, execution time improved by over 40% and stabilized job performance.


๐Ÿ›  How to Enable AQE

spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.localShuffleReader.enabled=true

๐Ÿ“Œ Configuration Explanation

spark.sql.adaptive.enabled                    → Enables AQE
spark.sql.adaptive.skewJoin.enabled           → Handles skewed joins
spark.sql.adaptive.coalescePartitions.enabled → Merges small shuffle partitions
spark.sql.adaptive.localShuffleReader.enabled → Improves shuffle reuse
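These can also be set at runtime on an existing SparkSession, for example:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")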

โš ๏ธ When AQE May Not Help

  • Cluster is CPU or disk I/O bottlenecked
  • Data is already perfectly partitioned
  • Manual hints may override AQE decisions

โœ… Best Practices

  • Enable AQE by default for batch workloads
  • Monitor decisions in Spark UI under "Adaptive Spark Plan"
  • Combine with partitioning and Delta Lake Z-Ordering

๐Ÿ Summary

AQE helps Spark dynamically optimize execution at runtime, improving performance, cost efficiency, and reliability without heavy manual tuning.

Off-heap memory in Apache Spark refers to memory allocated outside the Java Virtual Machine (JVM) heap. This memory is manually managed by Spark and is not controlled by the Garbage Collector (GC), helping reduce GC overhead and improve performance.


๐Ÿง  1. What is Off-Heap Memory?

  • Allocated outside the JVM heap using native methods
  • Not managed by the Java Garbage Collector
  • Explicitly controlled and released by Spark
  • Ideal for caching, shuffle buffers, and intermediate state management

โœ… 2. Why Use Off-Heap Memory?

  • ๐Ÿ—‘๏ธ Reduced GC overhead for large datasets
  • โšก Faster binary serialization and direct memory access
  • ๐Ÿ“ฆ Compact memory layouts with fixed-length storage
  • ๐Ÿš€ Better memory utilization with low object overhead
  • ๐Ÿ“ฅ Handles large memory buffers efficiently

๐Ÿ› ๏ธ 3. How to Enable Off-Heap Memory

spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=4g

You can configure this via:

  • spark-submit --conf
  • spark-defaults.conf
  • Notebook / Databricks session configs
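For example, a session-level builder sketch (the app name and the 4g size are illustrative):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("OffHeapExample")
         .config("spark.memory.offHeap.enabled", "true")
         .config("spark.memory.offHeap.size", "4g")
         .getOrCreate())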

โš™๏ธ 4. Components That Use Off-Heap Memory

4.1 Tungsten Execution Engine ๐Ÿ”ฅ

  • Uses off-heap for rows, sorts, and aggregations
  • Uses UnsafeRow and UnsafeMemory
  • Reduces object creation and GC load

4.2 Spark SQL Caching ๐Ÿ’พ

  • Serialized caching can leverage off-heap storage

4.3 Broadcast Variables ๐Ÿ“ก

  • Large broadcast tables can live off-heap

4.4 Shuffle & Sort Buffers ๐Ÿ”€

  • Uses off-heap during shuffle, spill, and external sorting

โš–๏ธ 5. Heap vs Off-Heap Memory

Heap Memory       → JVM garbage collected, object-based, higher GC cost
Off-Heap Memory   → Manually managed by Spark, binary format, low GC cost

โš ๏ธ 6. Limitations & Considerations

  • Manual memory release risks native memory leaks
  • Harder to debug than heap-based memory
  • Spark UI does not directly show off-heap usage
  • Off-heap size must be pre-allocated

โœ… 7. Best Practices

  • Tune off-heap size based on executor memory layout
  • Enable compressed in-memory columnar storage
  • Use OS-level tools like top, htop, jcmd for monitoring
  • Combine with dynamic allocation and proper spill configs

๐Ÿ 8. Summary

Off-heap memory helps Spark avoid GC pauses, improve throughput, and handle large datasets efficiently. It is best suited for shuffle-heavy workloads, caching use cases, and large-scale Spark SQL processing when sized and monitored correctly.

๐Ÿ”น 1. What is SparkContext?

SparkContext is the primary entry point to Sparkโ€™s core functionality in Apache Spark 1.x. It allows applications to connect to a cluster and perform distributed processing using RDDs (Resilient Distributed Datasets).

     Key Responsibilities:

  • Cluster Connection ๐ŸŒ : Initializes connection to YARN, Mesos, or Standalone cluster manager
  • Job Scheduling ๐Ÿ—“๏ธ : Manages job scheduling and task distribution across executors
  • RDD Creation ๐Ÿ“ฆ : Creates RDDs from files or in-memory collections
  • Configuration Access โš™๏ธ : Reads Spark configuration from SparkConf
  • Hadoop Integration ๐Ÿ˜ : Works with HDFS and Hadoop-compatible file systems

val conf = new SparkConf().setAppName("MyApp").setMaster("local[*]")
val sc = new SparkContext(conf)

     Limitations:

  • โŒ No built-in SQL, Hive, or DataFrame support
  • โŒ Requires separate SQLContext or HiveContext

๐Ÿ”น 2. What is SparkSession?

SparkSession, introduced in Spark 2.0, is the unified entry point for working with Spark SQL, RDDs, DataFrames, and Datasets. It merged all older contexts into a single, simplified API.

     Key Benefits:

  • Unified API Access ๐Ÿงฉ : Supports SQL, DataFrame, Dataset, and RDD APIs
  • Simplified Initialization ๐Ÿงฑ : Uses builder pattern for cleaner configuration
  • SQL & Hive Support ๐Ÿงฎ : Directly runs SQL and Hive queries
  • Catalog Access ๐Ÿ“š : Query metadata, tables, and databases
  • Internal SparkContext ๐Ÿ”— : Accessible via spark.sparkContext

val spark = SparkSession.builder()
  .appName("MyApp")
  .master("local[*]")
  .enableHiveSupport()
  .getOrCreate()
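The PySpark equivalent looks similar, and the underlying SparkContext remains accessible:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("MyApp")
         .master("local[*]")
         .enableHiveSupport()
         .getOrCreate())

sc = spark.sparkContext   # internal SparkContext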

๐Ÿ”น 3. Why was SparkSession Introduced?

     Reasons:

  • Unified API ๐Ÿง  : Combines SparkContext, SQLContext, and HiveContext
  • Ease of Use โœจ : Reduces setup complexity
  • Better Integration ๐Ÿ”— : Built-in Spark SQL and Hive metastore support
  • Future-Proof ๐Ÿš€ : Aligns with Sparkโ€™s structured processing roadmap
  • Cleaner Syntax ๐Ÿงผ : Fluent, readable configuration style

๐Ÿ“ Quick Summary

  • SparkContext ๐Ÿ—๏ธ : Low-level engine mainly for RDD-based processing
  • SparkSession ๐Ÿงฉ : Modern unified entry point for all Spark APIs

๐Ÿ“˜ Data Skewness in Apache Spark


โ“ What is Data Skew?

Data skew occurs when data is unevenly distributed across partitions โ€” meaning some partitions hold a much larger volume of data than others. This creates slow-running tasks and poor cluster efficiency.


โš ๏ธ Why It Matters

In Spark, wide operations like groupByKey, join, and reduceByKey involve shuffles. When certain keys appear too frequently, one partition becomes overloaded, while other executors stay idle โ€” leading to straggler tasks and stage delays.


๐Ÿ› ๏ธ How to Fix Data Skew

  • Salting 🧂 : Append random suffixes to skewed keys to distribute data across partitions
  • Broadcast Join 📡 : Broadcast small tables to avoid expensive shuffles
  • Custom Partitioning ⚙️ : Implement custom partitioners for better load balancing
  • Skew Join Handling (Spark 3+) 🤖 : Enable spark.sql.adaptive.skewJoin.enabled = true to auto-handle skew

โœ… Best Practices

  • Prefer reduceByKey or aggregateByKey over groupByKey โšก
  • Monitor Spark UI for long-running stages and straggler tasks ๐Ÿ“Š
  • Enable AQE (Adaptive Query Execution) in Spark 3.x for automatic optimizations ๐Ÿš€

โ“ What is Salting?

Salting is a technique used to handle data skew in Apache Spark by adding randomness to highly frequent keys. This spreads records more evenly across partitions during join, groupBy, and reduceByKey operations.


๐Ÿ”ง Salting for Aggregation (groupBy / reduceByKey)

โœ… Goal

Break โ€œhot keysโ€ into multiple pseudo-keys using random salts so workload is distributed across executors.


๐Ÿ”„ Steps

  • Add a random salt column ๐ŸŽฒ
  • Create a salted key by combining original key + salt ๐Ÿ”—
  • Aggregate using salted keys ๐Ÿ“Š
  • Remove salt and re-aggregate to get final results โœ…

โœ… Code Example

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, concat, split, lit

spark = SparkSession.builder.appName("SaltingExample").getOrCreate()

data = [("A", 100)] * 1000000 + [("B", 200)] * 1000
df = spark.createDataFrame(data, ["product", "amount"])

# Add a random salt (0-9) to each row
df_salted = df.withColumn("salt", (rand() * 10).cast("int"))

# Build the salted key with an explicit "_" separator so it can be split back later
df_salted = df_salted.withColumn("product_salted", concat(col("product"), lit("_"), col("salt")))

# First aggregation on the salted keys spreads the hot key across partitions
intermediate = df_salted.groupBy("product_salted").sum("amount")

# Strip the salt and re-aggregate to get the final per-product totals
result = intermediate.withColumn("product", split(col("product_salted"), "_")[0]) \
                     .groupBy("product").sum("sum(amount)")

result.show()

๐Ÿ” Salting for Join Operations

โœ… Goal

Replicate the small table with multiple salts so it matches salted keys of the large skewed table.


โœ… Join Salting Code Example

from pyspark.sql.functions import explode, array, lit

# Salt the large (skewed) table with 5 possible salt values
large = df.withColumn("salt", (rand() * 5).cast("int"))
large = large.withColumn("product_salted", concat(col("product"), col("salt")))

small_data = [("A", "Type1"), ("B", "Type2")]
small = spark.createDataFrame(small_data, ["product", "type"])

# Replicate the small table once per salt value so every salted key finds a match
salt_range = array([lit(i) for i in range(5)])
small_expanded = small.withColumn("salt", explode(salt_range))
small_expanded = small_expanded.withColumn("product_salted", concat(col("product"), col("salt")))

joined = large.join(small_expanded, on="product_salted", how="inner")

๐Ÿšซ When Not to Use Salting

  • If data skew is mild or temporary โš ๏ธ
  • If AQE with spark.sql.adaptive.skewJoin.enabled = true is enabled ๐Ÿค–
  • If a Broadcast Join is possible ๐Ÿ“ก

๐Ÿ“˜ Spark Small File Problem


๐Ÿ” What Are Small Files?

  • Files smaller than the HDFS block size (typically 128MB)
  • Spark processes each file as a separate task, so many small files โ†’ many tasks โ†’ high overhead

๐Ÿ“ What Counts as โ€œSmallโ€?

  • < 1MB : very severe performance issues ❌
  • 1MB – 32MB : noticeable performance hit ⚠️
  • 32MB – 128MB : acceptable ✅
  • 128MB – 1GB : optimal for Spark 🚀

โš ๏ธ Why Are Small Files a Problem?

  • Task Overhead โฑ๏ธ
    One task per file โ†’ more scheduling time โ†’ slower jobs
  • Metadata Overhead ๐Ÿง 
    HDFS/Spark must track each file โ†’ higher memory usage
  • Resource Underutilization โš™๏ธ
    Tiny tasks finish fast and waste CPU/memory

๐Ÿ’ฅ Common Causes

  • Frequent writes with small DataFrames
  • High-cardinality partition columns (e.g., user_id)
  • Micro-batch streaming jobs

๐Ÿงช Example Problem

# Bad practice โ€“ creates small files
df = spark.read.json("tiny_input.json")   # ~5MB
df.write.parquet("output/")

โœ… Solutions to the Small File Problem

1๏ธโƒฃ Coalesce or Repartition

# Combine small partitions into fewer large ones
df.coalesce(1).write.parquet("output/")

# Use repartition() when you want even distribution (causes shuffle)

2๏ธโƒฃ Set Target File Size

# Target ~128MB file size
spark.conf.set("spark.sql.files.maxPartitionBytes", 134217728)

3๏ธโƒฃ Use Efficient File Formats

df.write.format("parquet").save("output/")

  • Avoid JSON/CSV for analytics workloads ❌
  • Prefer Parquet/ORC for performance ✅

4๏ธโƒฃ Enable Compression

spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

5๏ธโƒฃ Smart Partitioning

  • Avoid high-cardinality columns like user_id, transaction_id โŒ
  • Use low-cardinality columns like year, month, region โœ…

๐Ÿ Best Practices Summary

  • ๐ŸŽฏ Target file size: 128MB โ€“ 1GB
  • โœ… Minimum acceptable: 32MB
  • ๐Ÿšซ Avoid very small files (<1MB)
  • ๐Ÿ“‚ Use coalesce(), Parquet, compression, and smart partitioning

Apache Spark is a powerful distributed data processing engine. However, poorly written Spark jobs can lead to inefficient resource utilization and long runtimes. This guide outlines detailed Spark optimization techniques categorized by areas such as memory, shuffles, joins, caching, code patterns, configurations, and storage layout.


1. Data Serialization & Memory Optimization

1.1 Use Efficient Serialization

  • Kryo Serialization is faster and more compact than Java serialization.
  • Set Kryo in configuration:
    spark.serializer=org.apache.spark.serializer.KryoSerializer
    spark.kryo.registrationRequired=true

1.2 Avoid Creating Large Objects in Driver

  • Donโ€™t use collect() or toPandas() on large datasets.
  • Avoid building large maps/lists on the driver.

1.3 Use Broadcast Variables Wisely

  • For small lookup tables, broadcast them to avoid large shuffles.
    broadcastVar = sparkContext.broadcast(lookup_dict)

2. Shuffle Optimization

2.1 Prefer reduceByKey over groupByKey

  • reduceByKey aggregates during the shuffle stage; groupByKey shuffles all data.

2.2 Tune Shuffle Partitions

  • Set an optimal number of partitions:
    spark.sql.shuffle.partitions = 200  # 200 is the default; tune up or down based on data volume

2.3 Use coalesce() to Reduce Partitions Before Output

  • Avoid writing many small files:
    df.coalesce(1).write.parquet(...)

3. Join Optimization

3.1 Use Broadcast Join for Small Tables

  • Broadcast small dimension tables in star schema joins:
    SELECT /*+ BROADCAST(smallTable) */ ...

3.2 Enable Adaptive Query Execution (AQE)

  • Automatically optimizes join strategies and skew handling.
    spark.sql.adaptive.enabled=true
    spark.sql.adaptive.skewJoin.enabled=true

3.3 Avoid Skewed Joins

  • Identify and handle skewed keys using salting or AQE.

4. Caching & Persistence

4.1 Cache Only When Necessary

  • Use .persist(StorageLevel.MEMORY_AND_DISK) if data doesn't fit in memory.
  • Always unpersist when caching is no longer needed.

4.2 Avoid Caching After Narrow Transformations

  • Donโ€™t cache intermediate RDDs unless reused multiple times.

5. Code-Level Optimizations

5.1 Avoid UDFs When Possible

  • Use built-in Spark SQL functions; they are optimized and run natively.

5.2 Use Partition Pruning

  • Filter early using partition columns to avoid unnecessary reads.
    WHERE year = 2024 AND month = 6

5.3 Leverage Columnar Format

  • Store data in Parquet or ORC for better I/O performance.

6. Configuration Tuning

6.1 Memory Settings

    --executor-memory 4G
    --executor-cores 4
    --num-executors 10

6.2 Enable Dynamic Allocation

    spark.dynamicAllocation.enabled=true
    spark.shuffle.service.enabled=true

6.3 GC and JVM Tuning

  • Monitor GC behavior via Spark UI.
  • Use G1GC or CMS for better memory handling.

7. File and Storage Optimizations

7.1 Avoid Small Files Problem

  • Merge files using coalesce() or repartition() before writing.
  • Use automatic optimization in Delta Lake.

7.2 Use Delta Lake (in Databricks)

  • Delta allows efficient updates, merges, and provides ACID compliance.

7.3 Optimize Table Layout

    OPTIMIZE table_name ZORDER BY (column)

7.4 Enable Auto Optimize and Auto Compaction (Databricks)

ALTER TABLE my_table SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true'
);

optimizeWrite ensures efficient file sizes at write time.

autoCompact runs background jobs to merge small files.

7.5 Manual OPTIMIZE and VACUUM

    OPTIMIZE my_table;
    VACUUM my_table RETAIN 168 HOURS;

8. Monitoring and Debugging

8.1 Use Spark UI

  • Analyze DAGs, stages, and tasks to identify bottlenecks.

8.2 Use Logging and Metrics

  • Log job progress.
  • Integrate with tools like Ganglia, Prometheus, or Datadog.

8.3 Profile Jobs Using spark.eventLog.enabled

    spark.eventLog.enabled=true
    spark.eventLog.dir=/path/to/logs

โ“ What is Serialization?

Serialization in Spark is the process of converting objects into a byte-stream format so they can be:
โ€ข Transferred across the network
โ€ข Stored in memory or disk
โ€ข Reconstructed later when needed

Since Spark is distributed, both data and code (closures) must move from the Driver to Executors.


๐Ÿ“ Where Spark Uses Serialization

  • RDD / DataFrame Shuffle : send intermediate data across executors
  • Broadcast Variables : distribute large read-only variables efficiently
  • Accumulators : track global metrics
  • Caching : store intermediate results in serialized form
  • Closures : send function logic from the Driver to Executors

๐Ÿš€ Built-in Spark Serializers

  • Java Serializer : the default; slower and produces larger serialized objects
  • Kryo Serializer : faster and more compact; works best when classes are registered

โš™๏ธ Enable Kryo (Recommended)

spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired = true
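A builder-style sketch of enabling Kryo (the registered class names are illustrative placeholders; registration matters mainly for JVM/Scala objects):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("KryoExample")
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .config("spark.kryo.registrationRequired", "true")
         .config("spark.kryo.classesToRegister", "com.example.MyEvent,com.example.MyKey")
         .getOrCreate())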

๐Ÿ”ฅ Why Serialization Matters for Performance

  • Speed ⚡ : Java serialization is slow; Kryo is often up to 10x faster
  • Size 📦 : serialized objects consume memory; Kryo produces smaller payloads
  • GC Pressure 🧹 : large serialized objects increase garbage collection overhead
  • Shuffle & Disk IO 💾 : compact serialization speeds up shuffle and reduces disk spill

โœ… Interview Tip

Always mention that Kryo is preferred in production due to better performance and lower memory usage.

๐Ÿšจ Apache Spark Out of Memory (OOM) Issues: Causes and Solutions

๐Ÿ“Œ What is an OOM Error in Spark?

OOM (Out Of Memory) errors in Apache Spark happen when a task or executor exceeds the memory limit allocated to it, leading to failure. These issues can severely impact job performance or cause job failure.

๐Ÿ” Root Causes of OOM in Spark

1๏ธโƒฃ Large Data Volume
    Processing more data than available memory.
    Joining large datasets without adequate partitioning.

2๏ธโƒฃ Skewed Data
    Uneven distribution of data causes some partitions to be much larger than others.
    Leads to few tasks requiring more memory than allocated.

3๏ธโƒฃ Improper Partitioning
    Too few partitions โ†’ large partitions โ†’ memory pressure.
    Too many partitions โ†’ excessive task scheduling overhead.

4๏ธโƒฃ Large Shuffles
    Wide transformations (e.g., groupByKey, join, reduceByKey) can cause massive shuffles requiring memory for sorting, buffering, and storing shuffle files.

5๏ธโƒฃ Improper Caching
    Caching large datasets in memory without persist(StorageLevel.MEMORY_AND_DISK) fallback.
    Repeated caching without unpersisting old data.

6๏ธโƒฃ Memory Configuration Mismanagement
    Low executor/driver memory configuration.
    Spark's memory management (storage vs execution memory) poorly tuned.

7๏ธโƒฃ Improper Use of Collect or Broadcast
    collect() brings all data to the driver โ†’ OOM on driver.
    Broadcasting very large variables โ†’ OOM on executors.

๐Ÿ› ๏ธ Solutions and Best Practices

โœ… General Memory Management

    spark.executor.memory → Amount of memory allocated to each executor
    spark.driver.memory → Memory allocated to the driver
    spark.memory.fraction → Fraction of memory for execution and storage (default 0.6)
    spark.memory.storageFraction → Fraction of unified memory reserved for storage

Use Spark UI to monitor memory usage.

Increase executor and driver memory based on workload:

--executor-memory 8g
--driver-memory 4g
  
โœ… Data Skew Mitigation

    Salting technique: Add random prefix to keys before shuffle operations.
    Repartitioning: Use repartition() or coalesce() to balance data distribution.
    Avoid groupByKey; prefer reduceByKey or aggregateByKey.

โœ… Optimize Shuffles

Tune these shuffle configurations:

spark.shuffle.compress=true
spark.shuffle.spill.compress=true
spark.shuffle.file.buffer=64k
spark.reducer.maxSizeInFlight=48m
  
โœ… Cache Carefully

    Use .persist(StorageLevel.MEMORY_AND_DISK) instead of .cache() when unsure if data fits in memory.
    Explicitly call .unpersist() when cached data is no longer needed.

โœ… Partitioning Strategy

    Aim for ~100โ€“200 MB per partition for optimal performance.
    Tune number of partitions:

spark.sql.shuffle.partitions=200
  
โœ… Avoid Large Collect/Broadcast

    Avoid using .collect() unless data is small enough to fit in driver memory.
    Prefer take(n) for sampling.
    Avoid broadcasting large datasets.

๐Ÿง  1. Executor Memory OOM

Scenario: Spark throws java.lang.OutOfMemoryError: Java heap space
Where it happens: Inside the executors when tasks run out of heap memory.

Example:
val data = sc.textFile("large_file.csv")
val grouped = data.map(_.split(",")(0)).groupByKey().collect()
  
Fixes:
    Avoid groupByKey, use reduceByKey or aggregateByKey.
    Increase executor memory: --executor-memory 8G
    Repartition with smaller partitions: df.repartition(200)
    Enable spill to disk (default is enabled for shuffles).

๐Ÿง  2. Driver Memory OOM

Example:
val bigDF = spark.read.parquet("huge_table")
val localList = bigDF.collect()
  
Fixes:
    Avoid .collect(), prefer .show() or .take(10).
    Increase driver memory: --driver-memory 4G
    Use .foreach() instead of pulling data to driver.

๐Ÿง  3. Shuffle Memory OOM

Example:
val df = bigDF1.join(bigDF2, "key")
  
Fixes:
    Enable adaptive query execution (AQE): spark.sql.adaptive.enabled=true
    Use skew join handling: spark.sql.adaptive.skewJoin.enabled=true
    Increase executor memory or tune spark.memory.fraction (the old spark.shuffle.memoryFraction is deprecated under unified memory management)
    Use salting or bucketing for skewed keys.

๐Ÿง  4. Broadcast Join OOM

Example:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)
val result = bigDF.join(smallDF, Seq("id"))
  
Fixes:
    Reduce spark.sql.autoBroadcastJoinThreshold (e.g. 10MB).
    Use hint("MERGE") to force shuffle join.
    Partition both tables before join to avoid broadcasting.

๐Ÿง  5. Caching OOM

Example:
val df = spark.read.parquet("huge_file")
df.cache()
df.count()
  
Fixes:
    Use persist(StorageLevel.MEMORY_AND_DISK)
    Unpersist unused cached data: df.unpersist()
    Compress in-memory storage: spark.rdd.compress=true

๐Ÿง  6. Too Many Partitions (Task OOM)

Example:
val df = spark.read.json("s3://bucket/small_files/*")
df.repartition(10000)
  
Fixes:
    Use coalesce() or sensible repartition() count.
    Use file compaction (autocompaction or OPTIMIZE in Delta).
    Avoid explode on huge arrays without filtering.

๐Ÿ”น Spark Unified Memory


๐Ÿ‘‰ Short understanding:

Spark Unified Memory is the pool used together by Storage & Execution memory. Earlier Spark had separate pools, but now they dynamically borrow from each other. This allows Spark to utilize memory efficiently and avoid OOM issues.


๐Ÿ“Š Spark Memory Architecture Diagram


[ Executor ]
     |
     ├── Reserved (300MB)  ← fixed
     |
     └── Spark Memory
            |
            ├── Storage (50%)
            |
            └── Execution (50%)


 ┌───────────────────────────────┐
 │           Executor            │
 │                               │
 │  ┌─────────────────────────┐  │
 │  │     JVM Heap Memory     │  │
 │  └─────────────────────────┘  │
 │                               │
 │  ┌─────────────────────────┐  │
 │  │     Overhead Memory     │  │
 │  └─────────────────────────┘  │
 │                               │
 │ spark.executor.memory = 8GB   │
 │ spark.executor.memoryOverhead = 800MB │
 └───────────────────────────────┘
                  │
                  │  (~300MB reserved)
                  ▼

        ┌────────────────────────┐
        │     Reserved Memory    │
        ├────────────────────────┤
        │      Spark Memory      │  <--- spark.memory.fraction = 0.6
        ├────────────────────────┤
        │      User Memory       │
        └────────────────────────┘
                  │
                  │    (dynamic)
                  ▼

        ┌────────────────────────┐
        │     Reserved Memory    │
        ├────────────────────────┤
        │  Storage Memory Pool   │  <--- spark.memory.storageFraction = 0.5
        ├────────────────────────┤
        │  Execution Memory Pool │
        ├────────────────────────┤
        │       User Memory      │
        └────────────────────────┘

Size: (Executor Memory - Reserved Memory) × spark.memory.fraction

Default: ~60% of usable memory (spark.memory.fraction = 0.6)

Components:


๐ŸŸฆ Storage Memory

3a. Storage Memory

๐Ÿ‘‰ Short understanding:

This memory is mainly used for caching datasets or keeping broadcast variables. It grows whenever there is free execution memory, and shrinks when execution needs space. If memory is needed urgently, storage blocks get evicted first.

Initial Size: Spark Memory × spark.memory.storageFraction

Default: 50% of Spark Memory (spark.memory.storageFraction = 0.5)

Purpose:

Caching RDDs with .cache() or .persist()

Caching DataFrames and Datasets

Storing broadcast variables

Intermediate results from .checkpoint()

Eviction: Can be evicted when Execution Memory needs space

Spill Behavior: Cached data is either recomputed or read from disk based on storage level


๐ŸŸฅ Execution Memory

3b. Execution Memory

๐Ÿ‘‰ Short understanding:

This is the โ€œwork memoryโ€ Spark uses during transformations like joins, aggregates and shuffles. Execution memory has higher priority, so it cannot be evicted. When full, Spark spills data to disk instead of crashing with OOM.

Initial Size: Spark Memory × (1 - spark.memory.storageFraction)

Default: 50% of Spark Memory

Purpose:

Shuffles: Hash tables and sort buffers during data shuffling

Joins: Hash tables for broadcast joins and sort-merge joins

Aggregations: Hash maps for groupBy operations and window functions

Sorting: Temporary space for orderBy operations

UDF Execution: Memory for complex user-defined functions

Eviction: Cannot be evicted (has higher priority than Storage)

Spill Behavior: When full, data spills to disk (controlled spilling)

(Controlled spilling is Spark's intelligent mechanism to handle situations when Execution Memory is full. Instead of crashing with an OutOfMemoryError, Spark gracefully moves some data from memory to disk, continues the operation, and reads the data back when needed.)


โญ Practical Example: 1GB Executor Memory Breakdown

๐Ÿ‘‰ Short understanding:

This example helps visualize real numbers. Reserved Memory is fixed, and the rest is split between User Memory and Spark Memory; Spark Memory is then split again into Storage and Execution pools based on the configured fractions.

Let's walk through a complete example with 1GB executor memory:


    Initial Memory Allocation

    Total Executor Memory: 1GB (1024MB)

    Step 1: Subtract Reserved Memory

โ”œโ”€โ”€ Reserved Memory: 300MB (fixed)

โ””โ”€โ”€ Usable Memory: 724MB (1024MB - 300MB)

    Step 2: Split Usable Memory (this example uses spark.memory.fraction = 0.8; the current default is 0.6)

├── User Memory: 145MB (724MB × 0.2)

└── Spark Memory: 579MB (724MB × 0.8)

    Step 3: Split Spark Memory

โ”œโ”€โ”€ Storage Memory: 290MB (579MB ร— 0.5) - initially

โ””โ”€โ”€ Execution Memory: 289MB (579MB ร— 0.5) - initially


โญ Important Dynamic Memory Allocation

๐Ÿ‘‰ Short understanding:

Memory is not rigidly partitioned; Spark keeps it flexible. Execution memory has higher priority and may evict cached blocks. This dynamic behavior gives Spark better execution reliability under memory pressure.

Storage and Execution memory can borrow from each other

Storage can borrow from Execution: When execution memory is unused

Execution can borrow from Storage: When storage memory is unused

Eviction Policy: Storage memory can be evicted when execution needs space (only down to the spark.memory.storageFraction threshold); execution memory is never evicted for storage


    spark.memory.fraction = 0.6 (default) // Fraction of (executor memory - 300MB) shared by execution and storage
    spark.memory.storageFraction = 0.5 (default) // Portion of that region set aside for storage and protected from eviction
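
A minimal sketch of how these knobs are typically supplied (values are illustrative; in practice executor sizing is usually passed via spark-submit rather than in code):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("memory-config-demo")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "800m")
    .config("spark.memory.fraction", "0.6")
    .config("spark.memory.storageFraction", "0.5")
    .getOrCreate()
)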

⚡ Tungsten Execution Engine

Overview

Tungsten is Apache Sparkโ€™s physical execution engine, introduced in Spark 1.4 as part of Project Tungsten. It focuses on optimizing CPU and memory efficiency, enabling Spark to run faster and with reduced GC overhead. Tungsten operates on the physical plan generated by the Catalyst optimizer and is the core engine that executes the plan using low-level system optimizations.


๐Ÿ” Quick Clarification:

โœ… Code generation (aka whole-stage codegen) is part of the Tungsten physical execution engineโ€”not Catalyst.

โŒ Catalyst does not do code generationโ€”it stops at generating the physical plan.


Key Capabilities of Tungsten


โœ… Whole-Stage Code Generation

  Dynamically generates Java bytecode at runtime to process multiple operations in a single function.

  Eliminates function call overhead and intermediate row objects.

  Improves CPU cache locality.
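
A quick way to see whole-stage codegen in action (a sketch; the pipeline below is arbitrary):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.range(1_000_000).filter(col("id") % 2 == 0).selectExpr("id * 2 AS doubled")

# Operators marked with * in the physical plan run inside a fused codegen stage;
# mode="codegen" (Spark 3.0+) also prints the generated Java code.
df.explain(mode="codegen")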


โœ… Binary Memory Layout (UnsafeRow)

  Uses a compact binary format to represent rows and columns.

  Reduces memory overhead and speeds up access.

  Enables efficient serialization/deserialization and sorting.


โœ… Cache-Aware Computation

  Algorithms are optimized to operate efficiently within CPU cache sizes.

  Reduces memory shuffling and unnecessary access to slower memory.


โœ… Manual Memory Management

  Spark gains control over how memory is allocated and released.

  Operates on both on-heap and off-heap memory depending on configuration.


โœ… Tungsten operates with or without off-heap memory, but its performance is enhanced when off-heap is enabled.

  By default, Tungsten uses on-heap memory.
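
A minimal sketch of turning on off-heap memory for Tungsten (the size shown is illustrative):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("offheap-demo")
    .config("spark.memory.offHeap.enabled", "true")   # off by default
    .config("spark.memory.offHeap.size", "2g")        # required when off-heap is enabled
    .getOrCreate()
)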

Spill buffers refer to temporary in-memory data buffers that are used during expensive operations like:


  Sorts

  Aggregations

  Shuffles

  Joins


When the executor's memory runs low, Spark spills these buffers to disk to avoid out-of-memory errors.

When you submit a SQL query or use the DataFrame API, Spark doesn't just run it blindly. Instead, it goes through several intelligent steps under the Catalyst Optimizer to analyze, optimize, and execute your logic efficiently.

Let's walk through each stage, combining functionality and real behavior:


๐ŸŸซ 1. SQL Query / DataFrame

You write: SELECT name FROM employees WHERE age > 30 or a similar DataFrame query.

This is the starting point โ€” Spark simply takes your instructions as input.
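
For reference, a sketch of the two equivalent entry points (it assumes an employees table or view with name and age columns is already registered):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

sql_df = spark.sql("SELECT name FROM employees WHERE age > 30")
api_df = spark.table("employees").where(col("age") > 30).select("name")
# Both forms produce the same unresolved logical plan and flow through the same Catalyst pipeline.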


๐ŸŸฉ 2. Unresolved Logical Plan

What happens:

Spark creates an initial "blueprint" of your query โ€” it knows the operations (select, filter), but hasn't confirmed if columns or tables exist.

'Project ['name]
+- 'Filter ('age > 30)
   +- 'UnresolvedRelation [employees]

It's called unresolved because:

  It hasn't looked up table schemas.

  It doesn't know if employees or name actually exist.

Purpose: Act as an abstract representation of what you want, without verification.


๐ŸŸจ 3. Logical Plan (After Analysis using Catalog)

What happens:

Spark checks the metadata catalog to resolve all columns, tables, and types.

It ensures everything exists and that operations like filtering and joining make sense.

Project [name#1]
+- Filter (age#2 > 30)
   +- Relation employees[id#0, name#1, age#2]

Purpose:

  Validates the query (semantic checks).

  Resolves all references.

  Applies type-checking.

This is where Spark throws errors if column/table names are wrong.


๐Ÿ”ท 4. Optimized Logical Plan

What happens: Spark applies rule-based transformations to improve performance through a Rule-Based Optimizer (RBO):

Key Rules Applied:

  Filter pushdown - move WHERE clause as early as possible

  Constant folding - compute static expressions in advance

  Column pruning - drop unused columns

  Join reordering and elimination - optimize join order and remove unnecessary joins

  Boolean expression simplification - simplify AND/OR conditions

  Null propagation - handle null values efficiently

  Common subexpression elimination - avoid redundant calculations

Rule Engine:

  Rules are applied iteratively until no further improvements are possible

  Each rule is a pattern-matching transformation

  Rules can be custom-defined and plugged into Catalyst

Purpose: Improve efficiency without altering the final result. Still doesn't specify how to execute the plan.


๐ŸŸซ 5. Physical Plans

What happens:

Spark now generates multiple execution strategies.

It considers how best to perform joins, aggregations, scans, etc. depending on data size and layout.

Examples:

  Should it use a broadcast join or a sort-merge join?

  Should it cache intermediate data?

Purpose: Translate logical operations into concrete steps that can run on a cluster.


๐ŸŸฆ 6. Cost Model

What happens:

Spark estimates the cost of each physical plan:

  Based on I/O, CPU, memory usage, shuffles, and partition sizes.

  Uses table statistics and column histograms for better estimates.

It compares all options and chooses the lowest-cost plan.

Purpose: Select the most efficient execution path.


๐ŸŸช 7. Selected Physical Plan

What happens: Spark now has a final physical plan ready to run.

Includes exact execution instructions:

  How to read data.

  When to shuffle.

  Partitioning schemes.

  Specific join strategies.

Purpose: Provide a complete, optimized execution roadmap for the engine.


๐ŸŸฅ 8. Code Generation โ†’ RDDs

What happens:

Spark uses the Tungsten engine to generate low-level JVM bytecode (Whole-Stage Code Generation).

The physical plan is turned into actual operations on RDDs.

These RDDs are then executed across the distributed cluster.

Purpose: Achieve high performance execution.
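
All of these stages can be printed for any query; a small sketch (the employees view here is created on the fly just for the demo):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.range(100) \
    .selectExpr("id AS age", "concat('user_', CAST(id AS STRING)) AS name") \
    .createOrReplaceTempView("employees")        # hypothetical table for the demo

spark.sql("SELECT name FROM employees WHERE age > 30").explain(extended=True)
# Prints, in order: Parsed (unresolved) Logical Plan, Analyzed Logical Plan,
# Optimized Logical Plan, and the selected Physical Plan.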


Error Handling

How Catalyst handles optimization failures and fallbacks:

Analysis Phase Errors:

  Unresolved references โ†’ Clear error messages with suggestions

  Type mismatches โ†’ Detailed type conflict information

  Schema validation failures โ†’ Points to exact column/table issues

Optimization Fallbacks:

  If code generation fails โ†’ Falls back to interpreted execution

  Complex expressions that can't be optimized โ†’ Uses original form

  Statistics missing โ†’ Uses default cost estimates

  Rule application failures โ†’ Skips problematic rules, continues with others

Runtime Adaptability:

  AQE (Adaptive Query Execution) monitors execution and re-optimizes if the initial plan performs poorly

  Graceful degradation when optimizations don't work as expected


Partition Pruning

Skips reading unnecessary partitions based on filter conditions.

Example:

-- Table partitioned by year and month
SELECT * FROM sales WHERE year = 2024 AND month = 'January'

Only reads partitions matching year=2024/month=January

Avoids scanning other partition directories

Dramatically reduces I/O for large datasets
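
A hedged sketch of producing and querying such a partitioned layout (paths are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sales_raw = spark.read.parquet("/data/sales_raw")          # hypothetical unpartitioned input

(sales_raw.write
    .mode("overwrite")
    .partitionBy("year", "month")                          # creates year=.../month=... directories
    .parquet("/data/sales"))

# Only the year=2024/month=January directory is scanned:
spark.read.parquet("/data/sales") \
    .where("year = 2024 AND month = 'January'") \
    .count()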


Constant Folding

Evaluates constant expressions at compile time instead of runtime.

Example:

SELECT price * (1 + 0.08), 
       substr('Hello World', 1, 5),
       date_add('2024-01-01', 30)
FROM products

Optimized to:

SELECT price * 1.08,
       'Hello',
       '2024-01-31'
FROM products

1 + 0.08 becomes 1.08

String and date functions with constants get pre-computed

Eliminates redundant calculations per row


Predicate Pushdown

Moves filter conditions closer to the data source to reduce data movement.

Example:

SELECT name FROM (SELECT * FROM users JOIN orders ON users.id = orders.user_id) 
WHERE age > 25

Optimized to:

SELECT name FROM (SELECT * FROM users WHERE age > 25) u 
JOIN orders ON u.id = orders.user_id

Filter age > 25 applied before the expensive JOIN

Reduces rows processed in subsequent operations

Works with file formats (Parquet) and databases
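
A sketch of how pushdown shows up for a Parquet source (the path is hypothetical); the scan node of the physical plan lists the pushed-down condition:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
users = spark.read.parquet("/data/users")                  # hypothetical Parquet dataset

users.where(col("age") > 25).select("name").explain()
# The FileScan node of the physical plan lists the condition under "PushedFilters",
# e.g. PushedFilters: [IsNotNull(age), GreaterThan(age,25)]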

Physical Plan Phase

What Happens: The physical planner takes the optimized logical plan and generates one or more physical execution strategies, then selects the best one based on cost estimates.


Key Transformations:

Operator Mapping:

Logical Join becomes SortMergeJoin, BroadcastHashJoin, or ShuffledHashJoin

Logical Aggregate becomes HashAggregate or SortAggregate

Logical Project becomes Project with codegen-ready expressions


Physical Strategies:

Cost-based selection: Chooses join algorithms based on table sizes and statistics

Partitioning decisions: Determines shuffle requirements and partition counts

Memory management: Allocates memory for operations like hash tables and sort buffers


Execution Planning:

Determines which operations can be pipelined together

Identifies shuffle boundaries (stage breaks)

Plans resource allocation and task distribution


Code Generation Prep:

Expressions get prepared for whole-stage code generation

Complex expression trees become Java code snippets

Operators get grouped into stages for codegen


Output: The result is an executable SparkPlan tree with concrete operators, memory allocations, and codegen-ready components that can be submitted to the cluster for execution.
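
As one concrete illustration of operator selection (dataset paths are hypothetical), a broadcast hint steers the planner toward a BroadcastHashJoin:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()
orders = spark.read.parquet("/data/orders")                # hypothetical large fact table
countries = spark.read.parquet("/data/countries")          # hypothetical small dimension table

joined = orders.join(broadcast(countries), "country_code")
joined.explain()   # the physical plan should show BroadcastHashJoin rather than SortMergeJoin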

Catalyst enables Tungsten optimizations during the physical planning phase.


How Catalyst Enables Tungsten

Code Generation Phase:

Catalyst's physical planner prepares expressions for Tungsten's code generation     

Converts expression trees into codegen-ready formats     

Enables whole-stage code generation by identifying fusible operators     

Code generation (codegen) in Spark is a performance optimization that generates optimized Java bytecode at runtime instead of using interpreted execution.     


Tungsten's Code Generation

What Tungsten Generates:

Java source code at runtime that gets compiled to bytecode     

Whole-stage codegen - fuses multiple operators into single functions     

Expression codegen - optimized evaluation of SQL expressions     


Specific Integration Points:

Expression Evaluation:

Catalyst optimizes expressions logically     

Passes optimized expressions to Tungsten for code generation     

Tungsten generates efficient bytecode for those expressions     


Operator Selection:

Catalyst chooses logical operators (Join, Aggregate)     

Physical planner selects Tungsten-optimized implementations     

Examples: UnsafeShuffleWriter, TungstenAggregate     


Memory Management:

Catalyst plans memory usage patterns     

Tungsten provides the actual off-heap memory management     

Uses binary formats determined during physical planning     

๐Ÿ“Œ File Formats in Spark โ€“ Complete Notes

(Parquet โ€ข ORC โ€ข Avro โ€ข JSON โ€ข CSV โ€ข Delta โ€ข Text)


๐Ÿ”ถ 1. CSV (Comma Separated Values)

๐Ÿ“ What it is

Plain-text tabular format where values are separated by commas.     


โœ” Advantages

Human readable    

Universally supported    

Easy ingestion    


โœ– Disadvantages

No schema    

No compression by default    

Slow scanning    

Heavy disk I/O    


Spark impact

Slow query performance    

High shuffle    

Needs .option("header") frequently    

Schema inference required    


Use when

Interchange format    

Small static files    

System integration    
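
A hedged sketch of a typical CSV read, supplying the schema up front to avoid the inference pass mentioned above (the path and columns are hypothetical):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()
schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType()),
])

people = (spark.read
    .option("header", "true")        # first line holds column names
    .schema(schema)                  # avoids the extra pass that inferSchema would need
    .csv("/data/people.csv"))        # hypothetical path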



๐Ÿ”ถ 2. JSON (JavaScript Object Notation)

๐Ÿ“ What it is

Semi-structured format storing data as key-value pairs and nested objects.    


โœ” Advantages

Schemaless    

Supports nested structures    

Human readable    


โœ– Disadvantages

Larger size    

Slow parsing    

No indexing    


Spark impact

slower scan    

schema inference needed    

expensive processing for nested fields    


Use when

APIs    

Logs    

Streaming    



๐Ÿ”ถ 3. Parquet (Columnar)

๐Ÿ“ What it is

Columnar storage format highly optimized for analytics.    


โœ” Advantages

Compression    

Predicate pushdown    

Column pruning    

Fast read    

Optimized for Spark    


โœ– Disadvantages

Insert slow    

Update slow    

Overhead for small rows    


Spark benefits

fast scan    

low shuffle    

optimized for big data    

best for transformation    


Use when

Data lake    

analytical workloads    
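
A hedged round-trip sketch (paths are hypothetical); the columnar layout is what enables the column pruning and predicate pushdown described earlier:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
people = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/data/people.csv"))                              # hypothetical CSV source

people.write.mode("overwrite").parquet("/data/people_parquet")

# Reads back only the needed column and pushes the filter into the scan
spark.read.parquet("/data/people_parquet").where("age > 30").select("name").show()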



๐Ÿ”ถ 4. ORC (Optimized Row Columnar)

๐Ÿ“ What

Similar to Parquet but optimized originally for Hive.    


โœ” Advantages

Best compression    

Good query optimization in Hive    

Indexing + dictionary encoding    


โœ– Disadvantages

Heavy metadata    

Slightly slower in Spark than Parquet (Spark is most heavily optimized for Parquet)    


Spark impact

very good compression    

better for Hive workloads    


Use when

Hive heavy systems    

Hadoop environment    



๐Ÿ”ถ 5. Avro (Row Oriented)

๐Ÿ“ What

Row-based binary format with schema stored along with data.    


โœ” Advantages

Schema evolution    

Row based writing    

Compact    

Good for messaging    


โœ– Disadvantages

Not optimized for analytics    

Slow column scanning    


Spark use

Kafka + Streaming    

CDC data    

schema evolution use cases    


Use when

streaming pipelines    

row-based messaging    



๐Ÿ”ถ 6. Delta Lake

๐Ÿ“ What it is

Storage + transaction layer on top of Parquet.    


โœ” Advantages

ACID transactions    

Time travel    

MERGE / UPDATE / DELETE    

Streaming + batch unified    

Schema evolution    


โœ– Disadvantages

Open source, but Databricks-led; not a plain file format (requires the Delta Lake libraries)    

Slight storage overhead    


Spark impact

Fast    

Highly reliable    

Supports ACID    

Lakehouse recommended format    


Use when

Data lakehouse    

UPSERTS    

Slowly changing dimensions    

Streaming + batch    
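
A minimal sketch, assuming the Delta Lake (delta-spark) package is on the classpath and the session is configured for Delta; paths and data are hypothetical:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
orders = spark.range(100).withColumnRenamed("id", "order_id")   # hypothetical data

orders.write.format("delta").mode("overwrite").save("/data/orders_delta")

# Time travel: read an earlier snapshot of the same table
v0 = spark.read.format("delta").option("versionAsOf", 0).load("/data/orders_delta")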



๐Ÿ”ถ 7. TEXT

โœ” Advantages

simplest format    

widely supported    


โœ– Disadvantages

no schema    

no compression    

poor performance    


Spark use

quick testing    

minimal ingestion    

logs    



โšก Spark Performance Comparison

Format    Storage    Read           Compression    Analytics    Schema Evolution
CSV       poor       slow           none           bad          none
JSON      medium     slow           medium         bad          medium
Avro      medium     fast (row)     medium         medium       best
Parquet   best       fast (column)  best           best         good
ORC       best       fast           best           best         good
Delta     best       fast (column)  best           best         good