Lazy Evaluation in Spark

 

Lazy evaluation is Spark's fundamental design choice: transformations are recorded but not executed until an action forces execution. This lets Spark optimize the query globally and avoid unnecessary computation. Understanding lazy evaluation is crucial for writing efficient Spark code and for avoiding common pitfalls such as redundant re-computation. Without it, Spark would be dramatically slower and unable to optimize across your entire chain of operations.

 

What is Lazy Evaluation?

The Basic Concept

Lazy evaluation means: Spark records what you want to do, but doesn't do it until you ask.

Eager evaluation, by contrast, means: do it immediately.

Example:

# LAZY – these lines don't execute anything
df = spark.read.parquet("data.parquet")
df = df.filter(df.age > 25)
df = df.select("city", "age")
df = df.groupBy("city").agg({"age": "avg"})

# EAGER – this action forces execution
df.show()

Only the .show() action forces execution. All the transformations before it are just recorded.
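
To see the difference for yourself, you can time the two phases. A minimal sketch (it assumes a SparkSession named spark and a data.parquet file with age and city columns, as in the example above):

import time

df = spark.read.parquet("data.parquet")

start = time.time()
plan = df.filter(df.age > 25).groupBy("city").agg({"age": "avg"})
print(f"Defining transformations took {time.time() - start:.4f}s")   # typically milliseconds

start = time.time()
plan.show()   # the action: reading, filtering, and aggregating all happen here
print(f"Running the action took {time.time() - start:.4f}s")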

 

Why Lazy Evaluation?

Reason 1: Global Optimization

With lazy evaluation, Spark can see the entire operation chain and optimize it as a whole.

Example: Push-down filtering

Without lazy evaluation (eager):

Read 10GB parquet file → 10GB in memory

Filter age > 25 → 5GB remain

Select columns → 2GB (smaller columns)

GroupBy city → 1GB (aggregated)

You read the full 10GB up front, even though most of it is discarded by the very next step.

With lazy evaluation (Spark's approach):

Spark sees: Read → Filter → Select → GroupBy

Catalyst optimizes: Push filter to read phase

Result: Read only filtered data

 

Read 5GB (filtered) parquet file → 5GB in memory (50% savings!)

Select columns → 2GB

GroupBy city → 1GB

Same result, but 50% less data movement.
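
You can check whether the push-down actually happened by printing the physical plan instead of running an action. A hedged sketch (the exact plan wording varies between Spark versions):

from pyspark.sql import functions as F

df = (spark.read.parquet("data.parquet")
        .filter(F.col("age") > 25)
        .select("city", "age"))

df.explain()   # prints the physical plan; no data is processed
# In the FileScan parquet node, the filter typically appears under
# PushedFilters (e.g. GreaterThan(age,25)), and ReadSchema lists only
# the columns the query needs.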

 

Reason 2: Preventing Redundant Computation

Without lazy evaluation:

df = spark.read.parquet("data.parquet")   # Read happens immediately
df2 = df.filter(…)                        # Filter happens immediately
df3 = df2.groupBy(…).count()              # GroupBy happens immediately
df4 = df.filter(…).select(…)              # Read happens AGAIN! ← Redundant

With lazy evaluation:

df = spark.read.parquet("data.parquet")   # Recorded
df2 = df.filter(…)                        # Recorded
df3 = df2.groupBy(…).count()              # Recorded
df4 = df.filter(…).select(…)              # Recorded

result1 = df3.show()   # Execution 1: Read → Filter → GroupBy
result2 = df4.show()   # Execution 2: Read → Filter → Select
# Each chain executes separately, only when it is needed

Reason 3: Adaptive Query Execution

Spark can gather statistics during execution and adapt the remaining plan.

Initial Plan: Join A with B using Hash Join

Execution starts…

After reading A: "Hmm, A is only 10MB"

Adaptive decision: "Use Broadcast Join instead (faster for small A)"

Continue with optimized strategy

This would be impossible with eager evaluation, because each step would already have executed before Spark learned anything about the data.
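
Adaptive Query Execution is controlled by configuration and is on by default in recent Spark releases. A minimal sketch, where large_df and small_df are illustrative DataFrames:

# Enable AQE explicitly (already the default since Spark 3.2)
spark.conf.set("spark.sql.adaptive.enabled", "true")

joined = large_df.join(small_df, "id")   # initially planned as a regular join
joined.show()                            # at runtime, Spark may switch to a broadcast
                                         # join if small_df turns out to be small enough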

 

Transformations vs Actions

Transformations (Lazy)

Operations that return a new DataFrame/RDD:

df.filter()      # Returns DataFrame
df.select()      # Returns DataFrame
df.groupBy()     # Returns GroupedData
df.rdd.map()     # Returns RDD (map lives on the underlying RDD)
df.join()        # Returns DataFrame
df.union()       # Returns DataFrame
df.distinct()    # Returns DataFrame

Key characteristic: Nothing executes when you call these.
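
Because transformations only describe work, their cost does not depend on the size of the data. A quick sketch using a synthetic range (no files needed):

huge = spark.range(10**12)                      # a trillion rows, described but never materialized
evens = huge.filter("id % 2 = 0")               # returns instantly: only the plan grows
doubled = evens.selectExpr("id * 2 AS twice")   # still instant, still no job launched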

 

Actions (Eager)

Operations that return a result or write to storage:

df.show()                  # Returns None (but prints to the console)
df.count()                 # Returns an integer
df.collect()               # Returns a list of Rows
df.take(n)                 # Returns the first n rows
df.first()                 # Returns the first Row
df.write.parquet()         # Returns None (but writes files)
df.rdd.collect()           # Returns a list
df.foreach()               # Returns None (but runs a side effect per row)
df.write.saveAsTable()     # Returns None (but saves a table)

Key characteristic: Execution happens immediately when you call these.

 

How Lazy Evaluation Works Internally

When you write lazy code:

df = spark.read.parquet("data.parquet")
df = df.filter(df.age > 25)
df = df.groupBy("city").count()

Here's what happens internally:

Step 1: df = spark.read.parquet(…)
  └─ Creates a LogicalPlan object
  └─ LogicalPlan: read("data.parquet")
  └─ No execution (returns immediately)

Step 2: df = df.filter(df.age > 25)
  └─ Extends the LogicalPlan
  └─ LogicalPlan: read("data.parquet").filter(age > 25)
  └─ No execution (returns immediately)

Step 3: df = df.groupBy("city").count()
  └─ Extends the LogicalPlan
  └─ LogicalPlan: read("data.parquet").filter(age > 25).groupBy("city").count()
  └─ No execution (returns immediately)

Step 4: df.show()   [ACTION!]
  └─ show() is an action
  └─ Triggers execution!
  └─ Spark runs the entire LogicalPlan:
       Read → Filter → GroupBy → Count → Display
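
You can inspect this recorded plan directly: explain(True) prints the parsed, analyzed, and optimized logical plans plus the physical plan, all without running the job. A small sketch using the same chain:

df = (spark.read.parquet("data.parquet")
        .filter("age > 25")
        .groupBy("city")
        .count())

df.explain(True)   # prints the plan stages; nothing is read or aggregated yet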

 

The Danger of Lazy Evaluation: Common Mistakes

Mistake 1: Silent Errors

Problem: Many errors only surface when an action executes, not when the line that causes them runs. A misspelled column name is usually caught as soon as Spark analyzes the transformation, but errors that depend on the data itself are not.

from pyspark.sql.functions import udf

risky = udf(lambda age: 100.0 / age, "double")   # fails on any row where age is 0
df = spark.read.parquet("data.parquet")
df = df.withColumn("ratio", risky(df.age))       # No error yet!
df.show()                                        # ERROR happens HERE, on the first bad row

The failure doesn't occur until .show() executes, which can be far from the line that actually introduced the problem.
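
Schema-level mistakes, at least, can be caught cheaply before you build a long chain, because column names and types are metadata that Spark knows without running a job. A minimal sketch (the expected column names are illustrative):

df = spark.read.parquet("data.parquet")

expected = {"name", "age", "city"}
missing = expected - set(df.columns)   # df.columns comes from the schema; no job runs
if missing:
    raise ValueError(f"Missing columns: {sorted(missing)}")

df.printSchema()   # also metadata-only: prints column names and types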

 

Mistake 2: Redundant Transformations

Problem: Not realizing that each action re-runs the whole transformation chain from scratch.

df = spark.read.parquet("data.parquet")
df = df.filter(df.age > 25)

# Later in code:
result1 = df.select("name").show()
result2 = df.select("age").show()   # Reads and filters TWICE!

Both .show() calls execute the entire chain (read + filter + select).

Fix: Cache the intermediate result you plan to reuse

df = spark.read.parquet("data.parquet")
df = df.filter(df.age > 25).cache()

result1 = df.select("name").show()   # First action: reads, filters, and populates the cache
result2 = df.select("age").show()    # Reuses the cached, filtered data

 

Mistake 3: Debugging is Confusing

Problem: print() statements don't work as expected with lazy evaluation.

df = spark.read.parquet("data.parquet")
print(df)   # Prints: DataFrame[name: string, age: int]
            # NOT the actual data!

df = df.filter(df.age > 25)
print(df)   # Still prints: DataFrame[name: string, age: int]
            # No sign that a filter was applied!

# Correct way to inspect data:
df.show()          # Actually shows (the first rows of) the data
df.printSchema()   # Shows the column structure

 

Forcing Execution: When to Use Actions

.show() – View the first rows

df.show()      # Shows the first 20 rows
df.show(100)   # Shows the first 100 rows

Good for: Quick inspection, debugging

 

.count() – Get the total row count

Good for: Finding data volume, validation

count = df.count()   # Executes the entire chain and returns the row count

Warning: Expensive for large datasets (must read all data)

 

.collect() – Bring all data to the Driver

results = df.collect()   # Returns a list of all Rows
for row in results:
    print(row)

Good for: Small result sets only

Danger: If the result is larger than the Driver's memory, the application crashes
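
If you only need a bounded sample, or you must iterate over a large result on the Driver, there are safer patterns than a bare collect(). A hedged sketch (process is a placeholder for your own per-row function):

sample = df.limit(1000).collect()    # explicit cap on how much reaches the Driver

for row in df.toLocalIterator():     # streams results roughly one partition at a time
    process(row)                     # placeholder: handle each Row without holding them all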

 

.write.parquet() – Write to storage

df.write.parquet("output/path")

Good for: Saving results

Best practice: For large results, write to storage instead of collecting to the Driver

 

.take(n) – Get the first n rows

first_10 = df.take(10)   # Returns the first 10 rows only

Good for: Sampling, quick checks

Better than .collect() for large datasets

 

Performance Implications

Lazy evaluation enables these optimizations:

Your Code:

(df.filter(df.age > 25)
   .filter(df.city == "NYC")
   .select("name", "age")
   .show())

 

Catalyst Optimizes:

1. Combine filters: AND them together (one pass instead of two)

2. Push selection: Read only the columns the query actually needs from the source

3. Push filtering: Filter at read time (less data in memory)

 

Result: Much faster execution!

 

Caching and Lazy Evaluation

Without caching:

df = spark.read.parquet("data.parquet")
df_filtered = df.filter(df.age > 25)

result1 = df_filtered.groupBy("city").count().show()   # Reads + filters
result2 = df_filtered.select("name").show()            # Reads + filters AGAIN!

With caching:

df = spark.read.parquet("data.parquet")
df_filtered = df.filter(df.age > 25).cache()

result1 = df_filtered.groupBy("city").count().show()   # Reads + filters + populates the cache
result2 = df_filtered.select("name").show()            # Uses the cached data (fast!)

Rule: Cache if you'll use the same DataFrame multiple times.
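
A minimal end-to-end sketch of that rule, including releasing the memory when you are done (cache() itself is lazy; the first action populates it):

df_filtered = spark.read.parquet("data.parquet").filter("age > 25").cache()

df_filtered.groupBy("city").count().show()   # first action: reads, filters, fills the cache
df_filtered.select("name").show()            # served from the cache

df_filtered.unpersist()                      # free executor memory once it is no longer needed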

💡 Did You Know?

  • No computation until an action: You can build arbitrarily complex chains without triggering any execution
  • LogicalPlans are lightweight: Recording 1000 transformations takes negligible time
  • Catalyst is smart: Rewrites entire plans based on data characteristics
  • Errors can surface late: Exercise your code with .show() or .count() during development
  • Spark is truly lazy about data: File contents are read on demand, although Spark does look at file metadata (such as Parquet footers) up front to learn the schema

    📚 Study Notes

  • Lazy evaluation: Record operations, execute when action called
  • Transformations: Lazy (filter, select, groupBy, map, join, etc.)
  • Actions: Eager (show, count, collect, take, write, etc.)
  • LogicalPlan: Internal representation of the transformation chain
  • Catalyst: Optimizer that rewrites logical plans efficiently
  • Global optimization: See entire chain, optimize across operations
  • Push-down filtering: Filters pushed to read phase (huge speedup)
  • Adaptive execution: Adapt remaining plan based on statistics
  • Caching: Save intermediate results if reusing
  • Danger: Many errors only surface when an action executes
  • Efficiency: Prevents redundant computation, enables massive optimization
  • Debugging: Use .show() and .printSchema() to inspect data
