Stream Processing

 

Stream processing enables real-time analysis of continuous data flows, making it essential for applications that need immediate insights, alerts, and reactions. Unlike batch processing's scheduled approach, stream processing continuously ingests and processes data as it arrives, transforming how organizations respond to events, detect anomalies, and serve personalized experiences. This topic explores stream processing paradigms, architectures, and practical implementations.

 

What is Stream Processing?

Definition and Core Concept

Stream processing is a computing model where:

  • Data arrives continuously in unbounded streams
  • Processing happens in real-time or near-real-time
  • Data is processed as it arrives, not collected first
  • Results are generated immediately or within seconds
  • No defined end point (infinite streams)
Key Characteristics

| Characteristic | Description                        |
|----------------|------------------------------------|
| Timing         | Real-time or near-real-time        |
| Data Flow      | Continuous, unbounded streams      |
| Latency        | Sub-second to seconds              |
| Complexity     | Moderate to complex                |
| Cost           | Higher (always-on infrastructure)  |
| Technology     | Kafka, Flink, Spark Streaming      |

     

    Stream Processing Architectures

    Kappa Architecture (Stream-Only)

    Data Source (Kafka Topic)

        ↓

    Stream Processing Engine (Flink)

        ↓

    Stream Processing Logic

        ↓

    Output/State Store

        ↓

    Serving Layer (API/Dashboard)

    Advantages:

  • Simpler than Lambda
  • Single codebase
  • Easier to maintain

    Lambda Architecture (Batch + Stream)

Data Source
    ├→ Batch Layer (Spark)
    │       ↓
    │   Batch View ──────────────────┐
    │                                ├→ Serving Layer
    └→ Speed Layer (Kafka + Flink)   │
            ↓                        │
        Real-time View ──────────────┘

Use case: Combine batch accuracy with stream speed (see the sketch below)
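A minimal PySpark sketch of the idea, assuming JSON events with name, amount, and timestamp fields and hypothetical input/output paths: the same aggregation logic runs once as a batch job over full history and once as a streaming job over new events, and the serving layer merges the two views.

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, sum as spark_sum, window

spark = SparkSession.builder.appName("LambdaSketch").getOrCreate()
schema = "name STRING, amount DOUBLE, timestamp TIMESTAMP"

def aggregate(df: DataFrame) -> DataFrame:
    # Shared logic used by both layers: total amount per name per minute
    return (df.groupBy(window(col("timestamp"), "1 minute"), col("name"))
              .agg(spark_sum("amount").alias("total_amount")))

# Batch layer: accurate, periodic recomputation over the full history
batch_view = aggregate(spark.read.schema(schema).json("/data/events/"))   # hypothetical path
batch_view.write.mode("overwrite").parquet("/serving/batch_view")         # hypothetical path

# Speed layer: continuous, low-latency updates as new events arrive
speed_view = aggregate(spark.readStream.schema(schema).json("/data/events/"))
(speed_view.writeStream
    .outputMode("update")
    .format("console")  # stand-in for the real serving sink
    .option("checkpointLocation", "/tmp/lambda_checkpoint")
    .start())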

     

    Stream Processing Paradigms

    1. Processing Windows

    Stream data is divided into windows for processing:

    Tumbling Window (Fixed, Non-overlapping)

Stream:  1, 2, 3, 4, 5, 6, 7, 8, 9, 10
Windows: [1, 2, 3] [4, 5, 6] [7, 8, 9] [10, ...]

Result: sum of each window = 6, 15, 24, 10 (the last window is still filling)

Use case: Calculate hourly metrics (see the sketch below)
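A minimal PySpark sketch of a tumbling window, assuming a streaming DataFrame df_stream with timestamp and amount columns as in the examples later in this topic:

from pyspark.sql.functions import col, sum as spark_sum, window

# Each event falls into exactly one non-overlapping 1-hour window
hourly_totals = (df_stream
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window(col("timestamp"), "1 hour"))
    .agg(spark_sum("amount").alias("hourly_amount")))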

     

    Sliding Window (Overlapping)

Stream: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
Window size: 3, slide: 1

Windows: [1, 2, 3] [2, 3, 4] [3, 4, 5] [4, 5, 6] ...
Sums:        6         9        12        15     ...

Use case: Moving average, trend detection (see the sketch below)
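A minimal PySpark sketch of a sliding window over the same assumed df_stream: a 10-minute window that advances every minute, so consecutive windows overlap and each event contributes to several results (a moving average).

from pyspark.sql.functions import avg, col, window

# 10-minute windows that slide forward every 1 minute (overlapping)
moving_avg = (df_stream
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window(col("timestamp"), "10 minutes", "1 minute"))
    .agg(avg("amount").alias("moving_avg_amount")))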

     

    Session Window (Activity-based)

User activity stream: Click, Click, (gap), Click, Click, Click, (gap)

Result:
Session 1: [Click, Click]
Session 2: [Click, Click, Click]

Use case: User session analysis (see the sketch below)
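A minimal PySpark sketch of a session window, assuming df_stream carries user_id and timestamp columns; session_window (available since Spark 3.2) closes a session after the given gap of inactivity.

from pyspark.sql.functions import col, count, lit, session_window

# A user's session ends after 5 minutes with no events
sessions = (df_stream
    .withWatermark("timestamp", "10 minutes")
    .groupBy(session_window(col("timestamp"), "5 minutes"), col("user_id"))
    .agg(count(lit(1)).alias("events_in_session")))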

     

Stream Processing with Spark Structured Streaming

    Simple Stream Job

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window, sum as spark_sum

spark = (SparkSession.builder
    .appName("StreamProcessing")
    .getOrCreate())

# Read from the Kafka stream
df_stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .load())

# Parse the JSON payload into typed columns
schema = "name STRING, amount DOUBLE, timestamp TIMESTAMP"
df_parsed = df_stream.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Aggregate over 1-minute windows, tolerating events up to 10 seconds late
df_windowed = (df_parsed
    .withWatermark("timestamp", "10 seconds")
    .groupBy(
        window(col("timestamp"), "1 minute"),
        col("name")
    ).agg(
        spark_sum("amount").alias("total_amount")
    ))

# Write results to the console
query = (df_windowed.writeStream
    .outputMode("update")
    .format("console")
    .start())

query.awaitTermination()

     

    Stream Processing Use Cases

1. Real-time Monitoring and Alerting

# Alert if server response time > 1 second (response_time in milliseconds)
from pyspark.sql.functions import col, window

alerts = (df_stream
    .filter(col("response_time") > 1000)
    .groupBy(window(col("timestamp"), "1 minute"), col("server_id"))
    .count())

# Send an alert when a server has more than 5 slow responses in a 1-minute window
(alerts.filter(col("count") > 5)
    .select("server_id")
    .writeStream
    .outputMode("complete")
    .format("console")
    .start())

     

     

    Real-world example: AWS health checks

    2. Fraud Detection

# Detect unusual transaction patterns
from pyspark.sql.functions import col, count, sum as spark_sum

fraud_detection = (df_stream
    .withWatermark("timestamp", "1 minute")
    .groupBy("card_id")
    .agg(
        count("transaction_id").alias("transaction_count"),
        spark_sum("amount").alias("total_amount")
    )
    .filter(
        (col("transaction_count") > 5) &
        (col("total_amount") > 10000)
    ))

# Flag suspicious cards on the console
(fraud_detection.select("card_id", "transaction_count", "total_amount")
    .writeStream
    .outputMode("complete")
    .format("console")
    .start())

     

     

    3. Real-time Analytics Dashboard

    Kafka Topic (click events)

        ↓

    Spark Streaming

        ↓

    Aggregate clicks per page per 10 seconds

        ↓

    Update Dashboard in Real-time

        ↓

Show: 1,234 clicks/second on homepage
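A minimal sketch of the aggregation step from this diagram, assuming a parsed streaming DataFrame clicks_parsed with page and timestamp columns (the full pipeline appears in the click stream example at the end of this topic):

from pyspark.sql.functions import col, window

# Clicks per page per 10-second window, feeding the dashboard
clicks_per_page = (clicks_parsed
    .withWatermark("timestamp", "10 seconds")
    .groupBy(window(col("timestamp"), "10 seconds"), col("page"))
    .count())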

     

    4. IoT Sensor Data Processing

# Process sensor readings
from pyspark.sql.functions import avg, col, max as spark_max, window

sensor_stream = (df_stream
    .filter(col("sensor_id").isin("TEMP_01", "TEMP_02", "TEMP_03"))
    .withWatermark("timestamp", "5 seconds")
    .groupBy(
        window(col("timestamp"), "10 seconds"),
        "sensor_id"
    ).agg(
        avg("temperature").alias("avg_temp"),
        spark_max("temperature").alias("max_temp")
    ))

# Alert if temperature exceeds the threshold
(sensor_stream.filter(col("max_temp") > 40)
    .writeStream
    .outputMode("update")
    .format("console")
    .start())

     

     

     

    Stream Processing States and Checkpointing

    Stateful Processing

# Maintain a running count of events per user
from pyspark.sql.functions import col

state_df = (df_stream
    .groupBy(col("user_id"))
    .count())  # Spark maintains this aggregation state across micro-batches

(state_df.writeStream
    .format("console")
    .outputMode("complete")
    .start())

     

     

    Checkpointing for Fault Tolerance

query = (df_stream.writeStream
    .option("checkpointLocation", "/tmp/checkpoint")
    .format("parquet")
    .option("path", "hdfs://path/to/output")
    .start())

# If the job fails, it restarts from the checkpoint with no data loss

     

     

    Stream vs Batch: Detailed Comparison

| Aspect      | Stream                 | Batch                 |
|-------------|------------------------|-----------------------|
| Data Model  | Unbounded              | Bounded               |
| Processing  | Continuous             | Scheduled             |
| Latency     | <1 second              | Hours                 |
| State       | Stateful               | Stateless             |
| Ordering    | Time-based windows     | Any order             |
| Complexity  | Higher                 | Lower                 |
| Cost        | Higher                 | Lower                 |
| Correctness | Exactly-once possible  | Easier                |
| Use Cases   | Real-time alerts       | Reporting, analytics  |

     

    Stream Processing Guarantees

    Exactly-Once Semantics

    Ensures each event is processed exactly once, never lost or duplicated.

# Spark Structured Streaming with checkpointing
query = (df_stream.writeStream
    .option("checkpointLocation", "/tmp/checkpoint")
    .outputMode("append")            # file sinks support append mode
    .format("parquet")
    .option("path", "/tmp/output")   # placeholder output directory
    .start())

# With a replayable source (e.g., Kafka) and checkpointing, the
# transactional file sink provides end-to-end exactly-once guarantees

     

     

    Challenges in Stream Processing

  • Late-arriving data: events arrive out of order. Solution: use watermarks.
  • State management: maintaining aggregations over time. Solution: use a RocksDB state backend.
  • Scaling: handling increasing data volume. Solution: partition streams by key.
  • Exactly-once semantics: ensuring no duplicates. Solution: idempotent operations plus checkpointing (see the sketch below).
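A minimal sketch of the deduplication idea in Spark Structured Streaming, assuming each event carries a unique event_id and a timestamp column (hypothetical names): the watermark bounds how long duplicate-tracking state is kept, and checkpointing lets the query recover that state after a failure.

from pyspark.sql.functions import col

deduped = (df_stream
    .withWatermark("timestamp", "10 minutes")    # tolerate events up to 10 minutes late
    .dropDuplicates(["event_id", "timestamp"]))  # idempotent: replayed events are ignored

query = (deduped.writeStream
    .option("checkpointLocation", "/tmp/dedup_checkpoint")  # state survives restarts
    .format("parquet")
    .option("path", "/tmp/deduped_events")                  # placeholder output directory
    .start())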

    Real-World Example: Click Stream Analysis

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, count, from_json, window

spark = (SparkSession.builder
    .appName("ClickStreamAnalysis")
    .getOrCreate())

# Read click events from Kafka
clicks = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "click-events")
    .load())

# Parse click events
schema = "user_id STRING, page STRING, timestamp TIMESTAMP"
clicks_parsed = clicks.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Window analysis: pages per user per minute
pages_per_user = (clicks_parsed
    .withWatermark("timestamp", "10 seconds")
    .groupBy(
        window("timestamp", "1 minute"),
        "user_id"
    ).agg(
        count("page").alias("page_count"),
        collect_list("page").alias("pages_visited")
    ))

# Write to a database for a real-time dashboard.
# Structured Streaming has no built-in JDBC sink, so write each
# micro-batch with the batch JDBC writer via foreachBatch.
def write_to_mysql(batch_df, batch_id):
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:mysql://localhost:3306/analytics")
        .option("dbtable", "user_clicks")
        .mode("append")
        .save())

(pages_per_user.writeStream
    .foreachBatch(write_to_mysql)
    .outputMode("update")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start())

# Keep running indefinitely
spark.streams.awaitAnyTermination()

     

     

    Stream Processing Best Practices

  • Set appropriate watermarks: Balance correctness vs latency
  • Handle late data gracefully: Define max lateness
  • Monitor lag: Track processing vs arrival time (see the sketch after this list)
  • Use exactly-once processing: Enable checkpointing
  • Partition by key: Distribute load evenly
  • Test with production data: Simulate real volumes
  • Have alerting: Monitor stream health
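A minimal sketch of lag monitoring via Spark's streaming query progress API, assuming query is a running StreamingQuery as in the earlier examples; the polling loop, threshold, and alert action are illustrative.

import time

def monitor(query, max_batch_seconds=30):
    """Poll the query's progress and warn when micro-batches fall behind."""
    while query.isActive:
        progress = query.lastProgress  # metrics for the most recent micro-batch
        if progress:
            batch_ms = progress["durationMs"].get("triggerExecution", 0)
            rows_per_sec = progress.get("processedRowsPerSecond", 0.0)
            if batch_ms > max_batch_seconds * 1000:
                print(f"ALERT: batch took {batch_ms} ms "
                      f"(processing {rows_per_sec:.0f} rows/sec)")
        time.sleep(10)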
💡 Key Insights

    •   Always-on requirement: Stream jobs run continuously

    •   Event time vs processing time: Critical distinction

    •   Watermarking essential: Handles late arrivals

    •   State explosion: Unbounded state can consume memory

    •   Exactly-once complexity: Requires careful design

     

📚 Study Notes

    •   Stream processing: Real-time data processing

    •   Unbounded data: Continuous, no end

    •   Windows: Tumbling, sliding, session

    •   Watermarks: Handle late-arriving data

    •   Stateful: Maintain state across batches

    •   Checkpointing: Fault tolerance and recovery

    •   Tools: Spark, Flink, Kafka

     
