Lazy Evaluation in Spark
Lazy evaluation is Spark's fundamental design choice: transformations are recorded but not executed until an action forces execution. This lets Spark optimize the query globally and skip unnecessary computation. Understanding lazy evaluation is crucial for writing efficient Spark code and for avoiding common pitfalls such as redundantly recomputing the same chain. Without it, Spark could not optimize across your entire program and would often be dramatically slower.
What is Lazy Evaluation?
The Basic Concept
Lazy evaluation means: Spark records what you want to do, but doesn't do it until you ask.
Eager evaluation (contrast) means: Do it immediately.
Example:
# LAZY – These lines don't execute anything
df = spark.read.parquet("data.parquet")
df = df.filter(df.age > 25)
df = df.select("name", "age", "city")
df = df.groupBy("city").agg({"age": "avg"})
# EAGER – This line forces execution
df.show()
Only the .show() action forces execution. All the transformations before it are just recorded.
Why Lazy Evaluation?
Reason 1: Global Optimization
With lazy evaluation, Spark can see the entire operation chain and optimize it as a whole.
Example: Push-down filtering
Without lazy evaluation (eager):
Read 10GB parquet file → 10GB in memory
Filter age > 25 → 5GB remain
Select columns → 2GB (smaller columns)
GroupBy city → 1GB (aggregated)
You read the full file into memory and materialize intermediate results that later steps throw away.
With lazy evaluation (Spark's approach):
Spark sees: Read → Filter → Select → GroupBy
Catalyst optimizes: Push filter to read phase
Result: Read only filtered data
Read 5GB (filtered) parquet file → 5GB in memory (50% savings!)
Select columns → 2GB
GroupBy city → 1GB
Same result, but 50% less data movement.
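You can check that the pushdown actually happened without running the job. A minimal sketch, using the same columns as above: explain() prints the physical plan the eventual action would execute (exact output varies by Spark version).
df = spark.read.parquet("data.parquet")
pruned = df.filter(df.age > 25).select("name", "age")
pruned.explain()
# In the FileScan node, look for PushedFilters containing GreaterThan(age,25)
# and a ReadSchema limited to the columns the query needs.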
Reason 2: Preventing Redundant Computation
Without lazy evaluation:
df = spark.read.parquet("data.parquet")    # Read happens immediately
df2 = df.filter(…)                         # Filter happens immediately
df3 = df2.groupBy(…).count()               # GroupBy happens immediately
df4 = df.filter(…).select(…)               # Filter + select run immediately, even if df4 is never used
With lazy evaluation:
df = spark.read.parquet("data.parquet")    # Recorded
df2 = df.filter(…)                         # Recorded
df3 = df2.groupBy(…).count()               # Recorded
df4 = df.filter(…).select(…)               # Recorded
result1 = df3.show()                       # Execution 1: Read → Filter → GroupBy
result2 = df4.show()                       # Execution 2: Read → Filter → Select
# Each chain executes separately, only when its action needs it
Reason 3: Adaptive Query Execution
Spark can gather statistics during execution and adapt the remaining plan.
Initial Plan: Join A with B using Hash Join
Execution starts…
After reading A: "Hmm, A is only 10MB"
Adaptive decision: "Use Broadcast Join instead (faster for small A)"
Continue with optimized strategy
This would be impossible with eager evaluation because the first operation already executed.
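Adaptive Query Execution is controlled by a handful of settings; a minimal sketch (AQE is enabled by default since Spark 3.2, so this only makes the behavior explicit):
spark.conf.set("spark.sql.adaptive.enabled", "true")                     # re-plan using runtime statistics
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # merge small shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # split skewed join partitions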
Transformations vs Actions
Transformations (Lazy)
Operations that return a DataFrame/RDD:
df.filter()      # Returns DataFrame
df.select()      # Returns DataFrame
df.groupBy()     # Returns GroupedData
df.rdd.map()     # Returns RDD (RDD API)
df.join()        # Returns DataFrame
df.union()       # Returns DataFrame
df.distinct()    # Returns DataFrame
Key characteristic: Nothing executes when you call these.
Actions (Eager)
Operations that return a result or write to storage:
df.show()                  # Returns None (but prints to console)
df.count()                 # Returns integer
df.collect()               # Returns list of Rows
df.take(n)                 # Returns n rows
df.first()                 # Returns first row
df.write.parquet(…)        # Returns None (but writes files)
df.rdd.collect()           # Returns list
df.foreach()               # Returns None (but runs a side effect per row)
df.write.saveAsTable(…)    # Returns None (but saves a table)
Key characteristic: Execution happens immediately when you call these.
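A minimal sketch that makes the difference visible (assuming data.parquet exists): the transformation lines return almost instantly because they only extend the plan, while the action pays the real cost.
import time

df = spark.read.parquet("data.parquet")

start = time.time()
transformed = df.filter(df.age > 25).select("name", "age")
print(f"Defining transformations: {time.time() - start:.3f}s")   # near-instant: only the plan is built

start = time.time()
transformed.count()                                              # action: triggers the actual read + filter
print(f"Running the action: {time.time() - start:.3f}s")         # the real work happens here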
How Lazy Evaluation Works Internally
When you write lazy code:
df = spark.read.parquet("data.parquet")
df = df.filter(df.age > 25)
df = df.groupBy("city").count()
Here's what happens internally:
Step 1: df = spark.read.parquet(…)
└─ Creates a LogicalPlan object
└─ LogicalPlan.read("data.parquet")
└─ No execution (returns immediately)
Step 2: df = df.filter(df.age > 25)
└─ Extends the LogicalPlan
└─ LogicalPlan.read("data.parquet").filter(age > 25)
└─ No execution (returns immediately)
Step 3: df = df.groupBy("city").count()
└─ Extends the LogicalPlan
└─ LogicalPlan.read("data.parquet").filter(age > 25).groupBy("city").count()
└─ No execution (returns immediately)
Step 4: df.show()  [ACTION!]
└─ show() is an action
└─ Triggers execution!
└─ Spark runs the entire LogicalPlan:
   Read → Filter → GroupBy → Count → Display
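You can inspect the recorded plan at any point without triggering execution. A minimal sketch: explain(True) prints the parsed, analyzed, and optimized logical plans plus the physical plan Spark would run.
df = spark.read.parquet("data.parquet")
df = df.filter(df.age > 25)
df = df.groupBy("city").count()
df.explain(True)    # prints the plans; still no job runs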
The Danger of Lazy Evaluation: Common Mistakes
Mistake 1: Silent Errors
Problem: Errors that depend on the data only surface when an action executes, not when the transformation is defined. (Column-name mistakes are usually caught earlier, because the DataFrame API analyzes the plan as you build it.) A UDF that fails on certain values is a typical deferred error:
from pyspark.sql.functions import udf
risky = udf(lambda age: 100 // (age - 30), "int")    # Fails whenever age == 30
df = spark.read.parquet("data.parquet")
df = df.withColumn("score", risky(df.age))           # No error yet!
df.show()                                            # ERROR happens HERE!
The division-by-zero error doesn't surface until .show() executes, because the UDF only runs as part of the triggered job.
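During development, one way to surface such errors closer to where they are introduced (a habit, not a Spark requirement) is to run a cheap action right after the suspect transformation; note that it only exercises the rows it happens to touch.
df.limit(5).collect()    # small action: runs the pipeline, including the UDF, on a few rows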
Mistake 2: Redundant Transformations
Problem: Forgetting that nothing has executed yet, so every action re-runs the whole chain of transformations.
df = spark.read.parquet("data.parquet")
df = df.filter(df.age > 25)
# Later in code:
result1 = df.select("name").show()
result2 = df.select("age").show()    # Reads and filters TWICE!
Both .show() calls execute the entire chain (read + filter + select).
Fix: Cache the intermediate result you reuse
df = spark.read.parquet("data.parquet")
df = df.filter(df.age > 25).cache()    # mark the reusable, filtered result for caching
result1 = df.select("name").show()     # Executes read + filter and fills the cache
result2 = df.select("age").show()      # Reuses the cached, filtered data
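You can confirm that a DataFrame is marked for caching; a small sketch (cache() itself is lazy and only fills memory once an action runs):
print(df.is_cached)       # True once cache() or persist() has been called
print(df.storageLevel)    # MEMORY_AND_DISK is the default for DataFrame.cache()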
Mistake 3: Debugging is Confusing
Problem: print() statements don't work as expected with lazy evaluation.
df = spark.read.parquet("data.parquet")
print(df)    # Prints: DataFrame[name: string, age: int]
             # NOT the actual data!
df = df.filter(df.age > 25)
print(df)    # Still prints: DataFrame[name: string, age: int]
             # No filtering information!
# Correct way to inspect data:
df.show()           # Actually shows data
df.printSchema()    # Shows column structure
Forcing Execution: When to Use Actions
.show() – View first rows
df.show()       # Shows first 20 rows
df.show(100)    # Shows first 100 rows
Good for: Quick inspection, debugging
.count() – Get total row count
count = df.count()    # Executes the entire chain, returns the row count
Good for: Finding data volume, validation
Warning: Expensive for large datasets (must read all the data)
.collect() – Bring all data to the Driver
results = df.collect()    # Returns a list of all rows
for row in results:
    print(row)
Good for: Small result sets only
Danger: If the result exceeds Driver memory, the application crashes
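If the result is too large to collect but you still need rows on the Driver, two safer alternatives (process_row below is a hypothetical handler, not a Spark API):
for row in df.toLocalIterator():    # streams one partition at a time to the Driver
    process_row(row)                # hypothetical per-row handler

preview = df.limit(1000).collect()  # cap the result size before collecting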
.write.parquet() – Write to storage
df.write.parquet("output/path")
Good for: Saving results
Best practice: For large results, write to storage instead of collecting to the Driver
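A small companion sketch: the default save mode fails if the output path already exists, so re-runnable jobs usually set a mode explicitly.
df.write.mode("overwrite").parquet("output/path")    # replace any existing output
df.write.mode("append").parquet("output/path")       # or add to the existing output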
.take(n) – Get first n rows
first_10 = df.take(10)    # Returns only the first 10 rows
Good for: Sampling, quick checks
Better than .collect() for large datasets
Performance Implications
Lazy evaluation enables these optimizations:
Your Code:
(df.filter(df.age > 25)
   .filter(df.city == "NYC")
   .select("name", "age")
   .show())
Catalyst Optimizes:
1. Combine filters: AND them together (one pass instead of two)
2. Prune columns: read only the columns the query actually needs from the source
3. Push filtering: filter at read time (less data in memory)
Result: Much faster execution!
Caching and Lazy Evaluation
Without caching:
df = spark.read.parquet("data.parquet")
df_filtered = df.filter(df.age > 25)
result1 = df_filtered.groupBy("city").count().show()    # Reads + filters
result2 = df_filtered.select("name").show()             # Reads + filters AGAIN!
With caching:
df = spark.read.parquet("data.parquet")
df_filtered = df.filter(df.age > 25).cache()
result1 = df_filtered.groupBy("city").count().show()    # Reads + filters + caches
result2 = df_filtered.select("name").show()             # Uses cached data (fast!)
Rule: Cache if you'll use the same DataFrame multiple times.
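And release the memory once the cached data is no longer needed; a minimal sketch:
df_filtered.unpersist()    # drop the cached partitions when you're done with them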