DAG (Directed Acyclic Graph)

 

The DAG (Directed Acyclic Graph) is Spark's internal representation of your computation: a graph of all transformations and their dependencies, where nodes represent RDD/DataFrame operations and edges represent data flow. Understanding DAGs is essential for optimizing performance, debugging slow jobs, and grasping how Spark translates your code into parallel execution. The DAG is the bridge between your code and the actual work done on the cluster.

 

What is a DAG?

Definition

A DAG (Directed Acyclic Graph) is a mathematical graph structure where:

  • Nodes represent data (RDDs, DataFrames) or operations
  • Edges represent dependencies and data flow
  • Directed means the arrows show the direction of computation
  • Acyclic means no loops (data only flows forward, never back to an earlier step)

    Your code:

    df1 = spark.read.parquet("data.parquet")
    df2 = df1.filter(df1.age > 25)
    df3 = df2.groupBy("city").count()
    df3.show()

     

    DAG representation:

    ┌─────────────┐
    │   PARQUET   │
    │  (Source)   │
    └──────┬──────┘
           │
           ↓
    ┌─────────────────┐
    │  FILTER         │
    │  (age > 25)     │
    └──────┬──────────┘
           │
           ↓
    ┌──────────────────┐
    │ GROUPBY + COUNT  │
    │ (by city)        │
    └──────┬───────────┘
           │
           ↓
    ┌──────────────────┐
    │  SHOW (Action)   │
    └──────────────────┘

     

    RDD Lineage vs DAG

    RDD Lineage

    Each RDD (Resilient Distributed Dataset) remembers how it was created:

    rdd1 = sc.textFile("data.txt")              # RDD from file
    rdd2 = rdd1.map(lambda x: x.upper())        # RDD from transformation
    rdd3 = rdd2.filter(lambda x: len(x) > 10)   # RDD from filter

    rdd3.collect()  # Triggers execution based on lineage

    Lineage chain:

    textFile(data.txt) → map(upper) → filter(len > 10) → collect()

    If an RDD is lost, Spark recomputes it from the original source using lineage.
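
    You can inspect this lineage directly from the RDD itself. A minimal sketch using
    toDebugString() (in PySpark it returns bytes, hence the decode; the exact output
    format varies by Spark version):

    rdd1 = sc.textFile("data.txt")
    rdd3 = rdd1.map(lambda x: x.upper()).filter(lambda x: len(x) > 10)
    print(rdd3.toDebugString().decode())  # prints the chain: filter <- map <- textFile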

     

    DAG vs RDD Lineage

    RDD Lineage: Tracks individual RDDs and how each was created

    DAG: Complete graph showing all operations and dependencies for the entire job

    DAG includes:

  • All RDDs in the job
  • All transformations
  • Shuffle boundaries (where stages divide)
  • Complete dependency structure

    DAG Creation Process

    When you call an action:

    1. PARSE CODE

       └─ Convert Python/Scala/SQL to internal operations

     

    2. CREATE LOGICAL PLAN

       └─ List of operations in order

       └─ Read → Filter → GroupBy → Count

     

    3. CREATE RDD LINEAGE

       └─ Each operation creates an RDD

       └─ Track parent-child relationships

     

    4. OPTIMIZE (Catalyst)

       └─ Reorder operations

       └─ Combine compatible operations

       └─ Push down filters

     

    5. CREATE DAG

       └─ Visualize entire computation graph

       └─ Show all dependencies

       └─ Identify stage boundaries

     

    6. BREAK INTO STAGES

       └─ Group operations before shuffle

       └─ Separate stages after shuffle

     

    7. CREATE TASKS

       └─ One task per partition per stage

       └─ Assign to executors

     

    8. EXECUTE

       └─ Run tasks in dependency order
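
    You can watch steps 2-5 happen by asking Spark for its query plans. A small sketch
    using the DataFrame example from the start of this page; explain(True) prints the
    parsed, analyzed, and optimized logical plans plus the physical plan (output text
    varies by Spark version):

    df = spark.read.parquet("data.parquet")
    result = df.filter(df.age > 25).groupBy("city").count()
    result.explain(True)  # parsed -> analyzed -> optimized logical plan -> physical plan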

     

    DAG Example: Real Application

    Your code:

    df = spark.read.parquet("s3://bucket/events.parquet")

    # Branch 1
    branch1 = df.filter(df.type == "click")
    count1 = branch1.count()
    print(f"Clicks: {count1}")

    # Branch 2
    branch2 = df.filter(df.type == "impression")
    count2 = branch2.count()
    print(f"Impressions: {count2}")

     

    # Both branches derived from same source

    DAG representation:

             ┌─────────────────────┐
             │   PARQUET           │
             │   (events.parquet)  │
             └──────────┬──────────┘
                        │
                ┌───────┴───────┐
                ↓               ↓
       ┌──────────────┐  ┌──────────────┐
       │ FILTER       │  │ FILTER       │
       │ type = click │  │ type = imp   │
       └──────┬───────┘  └──────┬───────┘
              │                 │
              ↓                 ↓
       ┌──────────────┐  ┌──────────────┐
       │ COUNT        │  │ COUNT        │
       └──────┬───────┘  └──────┬───────┘
              │                 │
              └────────┬────────┘
                       ↓
         ┌──────────────────────────┐
         │  Results collected to    │
         │  Driver and printed      │
         └──────────────────────────┘

    Key insight: both branches are built from the same source DataFrame, so they share the same lineage up to the read. Each count() is a separate action (a separate job), though, so without caching the Parquet data is scanned once per branch; it is the lineage that is shared, not the computed data (see the caching anti-pattern below).

     

    Narrow vs Wide Transformations in DAGs

    Narrow Transformations (Same Stage)

    Operations where each output partition depends on only one input partition:

    df.filter()      # Each output partition depends on one input partition
    df.map()         # Each output partition depends on one input partition
    df.select()      # Each output partition depends on one input partition
    df.flatMap()     # Each output partition depends on one input partition

     

     

    In DAG: All in same stage, no shuffle needed.

    Input Partition 1 → Filter → Output Partition 1

    Input Partition 2 → Filter → Output Partition 2

    Input Partition 3 → Filter → Output Partition 3

    (no data movement between partitions)
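
    A quick way to see that narrow transformations leave partitioning untouched is to
    compare partition counts before and after (a small sketch using the RDD API, where
    the partition count is explicit):

    rdd = sc.parallelize(range(1000), 8)         # 8 input partitions
    filtered = rdd.filter(lambda x: x % 2 == 0)  # narrow transformation
    print(rdd.getNumPartitions())                # 8
    print(filtered.getNumPartitions())           # still 8 -- no shuffle, same stage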

     

     

     

    Wide Transformations (Multiple Stages)

    Operations where output partitions depend on multiple input partitions:

    df.groupBy()      # Must shuffle all matching keys together
    df.join()         # Must shuffle to co-locate matching keys
    df.distinct()     # Must shuffle to find duplicates
    df.repartition()  # Shuffles data to a new partition count
    df.orderBy()      # Shuffles to sort across partitions

     

     

    In DAG: Creates stage boundary, shuffle required.

    [STAGE 1: Shuffle Write]
    Input partitions → shuffle write to files

    [Network: move data]

    [STAGE 2: Shuffle Read + Operation]
    Shuffle files → read + groupBy / join / etc.
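
    The stage boundary shows up as an Exchange operator in the physical plan. A minimal
    sketch (plan text varies by Spark version, and adaptive execution wraps it in an
    AdaptiveSparkPlan node, but the Exchange is still there):

    df = spark.read.parquet("data.parquet")
    df.filter(df.age > 25).explain()      # narrow only: no Exchange in the plan
    df.groupBy("city").count().explain()  # contains Exchange hashpartitioning(city, ...)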

     

     

     

    Stage Boundaries in DAG

    df = spark.read.parquet("data.parquet")
    df = df.filter(df.age > 25)              # Narrow
    df = df.groupBy("city").count()          # Wide (shuffle!)
    df = df.filter(df["count"] > 100)        # Narrow
    df.write.parquet("output")

     

     

    DAG with stages:

    STAGE 0 (Read + narrow transforms)
    ├─ Read parquet
    ├─ Filter age > 25
    └─ [Output: 100 partitions]

    [SHUFFLE BOUNDARY]

    STAGE 1 (Shuffle read + aggregation + everything narrow after it)
    ├─ GroupBy city (shuffle read)
    ├─ Count
    ├─ Filter count > 100   (narrow, so no new stage)
    ├─ Write parquet to output
    └─ [Output: 200 partitions]

    Because stages are delimited only by shuffles, the post-aggregation filter and the
    write are pipelined into the same stage as the aggregation; only the groupBy
    created a stage boundary.
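
    The 200 partitions after the shuffle come from spark.sql.shuffle.partitions, which
    defaults to 200. A small sketch of tuning it (the right value depends on data size;
    adaptive execution may coalesce small shuffle partitions further):

    spark.conf.set("spark.sql.shuffle.partitions", "64")  # default is 200
    df = spark.read.parquet("data.parquet")
    result = df.filter(df.age > 25).groupBy("city").count()
    result.write.parquet("output")  # the post-shuffle stage now uses 64 partitions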

     

     

     

    Viewing DAGs: Spark UI

    The Spark Web UI visualizes DAGs:

    http://<driver-hostname>:4040

     

    Click on a Job → See DAG visualization

    The UI shows:

  • Rectangular boxes: RDD/DataFrame operations
  • Boxes inside boxes: Stage boundaries
  • Blue boxes: Shuffle write stages
  • Orange boxes: Shuffle read stages
  • Arrows: Data dependencies

    (Example DAG visualization in the UI: the job graph begins with a [Read Parquet] box.)
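
    If you are not sure where the UI is (the port moves to 4041, 4042, ... when 4040 is
    already taken), the SparkContext can tell you. A small sketch; setJobDescription()
    simply labels the next job so it is easier to find in the UI:

    print(spark.sparkContext.uiWebUrl)                     # e.g. http://<driver-hostname>:4040
    spark.sparkContext.setJobDescription("clicks report")  # label shown on the Jobs page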

    DAG Optimization by Catalyst

    Catalyst optimizer rewrites DAGs:

    Original DAG (your code):

    Read → Filter A → Select → Filter B → GroupBy

     

     

    Optimized DAG (what Catalyst creates):

    Read → [Filter A AND Filter B combined] → Select → GroupBy
           (one pass instead of two)

     

     

    Or even better:

    Read (with Filter A + B pushed down into the read) → Select → GroupBy
    (reads only filtered data from the source)
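
    For columnar sources like Parquet you can check whether pushdown actually happened:
    the FileScan node in the physical plan lists the pushed filters. A sketch reusing
    the age and city columns from earlier (exact plan text varies by version and source):

    df = spark.read.parquet("data.parquet")
    df.filter((df.age > 25) & (df.city == "NYC")).explain()
    # Look for something like: PushedFilters: [GreaterThan(age,25), EqualTo(city,NYC)]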

     

     

     

    DAG Execution: How Stages Execute

    Once DAG is created and split into stages:

    DAG with 3 stages:

     

    STAGE 0         STAGE 1         STAGE 2
    (Read)          (GroupBy)       (Write)
      │               │               │
      ├─ Task 0       ├─ Task 4       ├─ Task 7
      ├─ Task 1       ├─ Task 5       ├─ Task 8
      ├─ Task 2       ├─ Task 6       └─ Task 9
      └─ Task 3

     

    Execution order:

    1. Execute all Stage 0 tasks in parallel (read + filter)
    2. WAIT for all Stage 0 tasks to complete
    3. Shuffle occurs (data movement)
    4. Execute all Stage 1 tasks in parallel (groupBy + count)
    5. WAIT for all Stage 1 tasks to complete
    6. Execute all Stage 2 tasks in parallel (write)

    Key: All tasks in a stage must complete before next stage begins.
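
    Task counts follow directly from partition counts: the first stage gets one task per
    input partition, and each post-shuffle stage gets one task per shuffle partition. A
    small sketch with the RDD API, where both counts are explicit:

    rdd = sc.parallelize(range(100), 4)     # 4 partitions -> 4 tasks in stage 0
    pairs = rdd.map(lambda x: (x % 10, 1))  # narrow: pipelined into stage 0
    counts = pairs.reduceByKey(lambda a, b: a + b, numPartitions=3)  # wide: stage 1, 3 tasks
    counts.collect()  # runs stage 0 (4 tasks), shuffles, then stage 1 (3 tasks)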

     

    Understanding DAG Complexity

    Simple DAG (Linear)

    Read → Filter → Select → GroupBy → Write

    └─ One path, no branching
    └─ Quick optimization, fast execution

     

     

     

    Complex DAG (Multiple Paths)

             ┌─────────────────────┐

             │   Read File         │

             └────────┬────────────┘

                      │

            ┌─────────┴──────────┐

            ↓                    ↓

        ┌────────┐           ┌────────┐

        │ Filter │           │ Select │

        │ A > 25 │           │ Cols   │

        └───┬────┘           └───┬────┘

            │                    │

            ├─ Execution Path 1  │

            │  GroupBy → Join    │

            │  (for report A)    │

            │                    │

            └─ Execution Path 2  │

               Union ← Join

               (for report B)

     

     

    More complex DAGs take longer to optimize but offer more optimization opportunities.

     

    Common DAG Anti Patterns

    Pattern 1: Multiple Expensive Operations on the Same DataFrame

    Bad:

    df = spark.read.parquet("100GB.parquet")   # Expensive read
    result1 = df.groupBy("col1").count()       # Read + groupBy
    result2 = df.filter(df.col > 10).count()   # Read + filter AGAIN!
    result3 = df.join(other_df).count()        # Read + join AGAIN!

    Each action re-reads the file!

     

     

    Fix: Cache the expensive DataFrame

    df = spark.read.parquet("100GB.parquet").cache()
    result1 = df.groupBy("col1").count()
    result2 = df.filter(df.col > 10).count()
    result3 = df.join(other_df).count()

    The file is read once (by the first action); the cached data is reused afterwards.
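
    Cached data occupies executor memory, so release it once all the branches are done
    (note that cache() is lazy: the data is materialized by the first action):

    df.unpersist()  # free the cached blocks when no more actions need them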

     

     

     

    Pattern 2: Unnecessary Shuffles

    Bad:

    (df.groupBy("col1").agg(...)    # Shuffle 1
       .repartition(100)            # Shuffle 2 (unnecessary!)
       .write.parquet(...))

    Fix: Repartition by the grouping key once, before the aggregation

    (df.repartition(100, "col1")    # Shuffle once, already partitioned by col1
       .groupBy("col1").agg(...)    # No additional shuffle needed
       .write.parquet(...))
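
    To confirm the extra shuffle is gone, compare the physical plans and count the
    Exchange operators. A sketch assuming an illustrative sum over a hypothetical col2
    column (plan text varies by version):

    from pyspark.sql import functions as F

    bad  = df.groupBy("col1").agg(F.sum("col2")).repartition(100)
    good = df.repartition(100, "col1").groupBy("col1").agg(F.sum("col2"))
    bad.explain()   # two Exchange nodes: one for the groupBy, one for repartition
    good.explain()  # one Exchange node: the groupBy reuses the existing partitioning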

     

     

     

    📚 Study Notes

    •   DAG: Directed Acyclic Graph showing operations and dependencies

    •   Nodes: RDDs, DataFrames, or operations

    •   Edges: Data dependencies and flow

    •   RDD Lineage: How each RDD was created (parent → child)

    •   Narrow transformations: One input partition → one output partition (same stage)

    •   Wide transformations: Multiple input → multiple output (shuffle boundary)

    •   Stage boundaries: Created by shuffle operations (groupBy, join, distinct, etc.)

    •   Catalyst optimization: Rewrites DAG to minimize data movement

    •   Push-down filtering: Filters moved to read phase

    •   Spark UI: Visualizes DAG for debugging and analysis

    •   Task execution: Tasks in the same stage run in parallel; stages run sequentially

    •   Caching: Prevents redundant DAG re-execution

    •   Complexity: More complex DAGs have more optimization opportunities

     

     
