Data Ingestion Basics

 

Data ingestion is the critical first step of any data pipeline: it moves raw data from diverse sources into a target system in a form ready for processing and analysis. Understanding ingestion methods, sources, patterns, and best practices is essential for building data systems that handle continuous data flows reliably. This topic covers everything from ingestion architecture to real-world patterns.

 

What is Data Ingestion?

Definition

Data ingestion is the process of importing, transferring, and loading data from one or more sources into a target system for further processing and analysis.

 

Data Ingestion Pipeline

Data Source  →  Ingestion Layer  →  Processing Layer  →  Storage Layer    →  Consumption Layer
     ↓               ↓                    ↓                   ↓                     ↓
  Database      Connector / API       Transform          Data Lake             BI Tools
  File          Protocol              Clean              Data Warehouse        Reports
  API           Buffer                Validate           Database              Dashboards
  Stream        Deduplicate           Enrich             Cache                 Models
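
As a concrete illustration of these stages, here is a minimal sketch that ingests from a source database, applies a light cleaning step, and lands the result in a data lake. Host, table, and bucket names are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MiniPipeline").getOrCreate()

# Ingestion layer: pull from a source database (placeholder connection details)
raw = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://host:3306/db") \
    .option("dbtable", "customers") \
    .load()

# Processing layer: minimal clean/validate step
clean = raw.dropna(subset=["id"]).dropDuplicates(["id"])

# Storage layer: land in the data lake for downstream consumption
clean.write.mode("overwrite").parquet("s3://datalake/customers/")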

 

 

 

Data Ingestion Sources

Databases:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataIngest").getOrCreate()

# MySQL
df_mysql = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://host:3306/db") \
    .option("dbtable", "customers") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

# PostgreSQL
df_pg = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "orders") \
    .load()

# Oracle
df_oracle = spark.read.format("jdbc") \
    .option("url", "jdbc:oracle:thin:@host:1521:db") \
    .option("dbtable", "products") \
    .load()

 

 

File Systems:

# CSV files
df_csv = spark.read.csv("s3://bucket/data.csv", header=True, inferSchema=True)

# Parquet (columnar format)
df_parquet = spark.read.parquet("hdfs://namenode/data.parquet")

# JSON
df_json = spark.read.json("gs://bucket/data.json")

# Avro
df_avro = spark.read.format("avro").load("s3://bucket/data.avro")

# ORC
df_orc = spark.read.orc("hdfs://data/table.orc")

 

 

Message Queues and Streaming:

# Kafka – real-time stream (requires the spark-sql-kafka package)
df_kafka = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .load()

# Kinesis – AWS streaming (requires an external Kinesis source connector)
df_kinesis = spark.readStream \
    .format("kinesis") \
    .option("streamName", "my-stream") \
    .option("initialPosition", "TRIM_HORIZON") \
    .load()

# Pub/Sub – Google Cloud (requires an external Pub/Sub source connector)
df_pubsub = spark.readStream \
    .format("pubsub") \
    .option("projectIdOverride", "project-id") \
    .option("pubsubGcpCredential", "/path/to/credentials") \
    .load()

 

 

APIs:

import requests

# REST API ingestion
def ingest_from_api(url, params):
    response = requests.get(url, params=params)
    response.raise_for_status()
    data = response.json()

    # Convert to DataFrame
    df = spark.createDataFrame(data)
    return df

# Example: GitHub API
github_url = "https://api.github.com/repos/apache/spark/issues"
params = {"state": "open", "per_page": 100}
df_issues = ingest_from_api(github_url, params)

df_issues.write.parquet("s3://bucket/github_issues/")
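
The GitHub issues endpoint returns at most 100 items per request, so larger pulls need pagination. A minimal sketch, assuming the standard page/per_page query parameters; the hypothetical max_pages cap and the empty-page stopping condition are illustrative choices:

def ingest_paginated(url, base_params, max_pages=50):
    """Fetch successive pages until an empty page is returned."""
    all_rows = []
    for page in range(1, max_pages + 1):
        page_params = dict(base_params, page=page)
        response = requests.get(url, params=page_params)
        response.raise_for_status()
        rows = response.json()
        if not rows:          # empty page -> no more data
            break
        all_rows.extend(rows)
    return spark.createDataFrame(all_rows)

# Usage
# df_all_issues = ingest_paginated(github_url, params)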

 

 

 

Ingestion Patterns

Batch Ingestion:

# Time-based batch
from datetime import datetime, timedelta

def batch_ingestion_daily():
    spark = SparkSession.builder.appName("DailyBatch").getOrCreate()

    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

    # Read yesterday's data
    df = spark.read.format("jdbc") \
        .option("url", "jdbc:mysql://host:3306/db") \
        .option("query", f"SELECT * FROM transactions WHERE date = '{yesterday}'") \
        .option("user", "username") \
        .option("password", "password") \
        .load()

    # Write to data lake
    df.write.parquet(f"s3://datalake/transactions/{yesterday}/")

# Schedule with Airflow
schedule_interval = "0 2 * * *"  # Daily at 2 AM
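
The cron expression above assumes an external scheduler. A minimal Airflow 2.x sketch wiring batch_ingestion_daily to that schedule; the DAG id, start date, and catchup setting are illustrative assumptions:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="daily_transactions_ingest",   # hypothetical DAG name
    schedule_interval="0 2 * * *",        # daily at 2 AM, as above
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="batch_ingestion_daily",
        python_callable=batch_ingestion_daily,   # the function defined above
    )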

 

 

Incremental Ingestion:

# Fetch only new/changed records
def incremental_ingestion():
    spark = SparkSession.builder.appName("Incremental").getOrCreate()

    # Read the last ingestion timestamp (fall back to epoch on the first run)
    try:
        last_timestamp = spark.read.text("s3://metadata/last_ingestion.txt") \
            .first()[0]
    except Exception:
        last_timestamp = "1970-01-01"

    # Read only new records
    df_new = spark.read.format("jdbc") \
        .option("url", "jdbc:mysql://host:3306/db") \
        .option("query", f"SELECT * FROM events WHERE updated_at > '{last_timestamp}'") \
        .load()

    # Append to existing data
    df_new.write.mode("append").parquet("s3://datalake/events/")

    # Update metadata with the new high-water mark
    current_timestamp = datetime.now().isoformat()
    spark.createDataFrame([(current_timestamp,)], ["value"]) \
        .write.mode("overwrite").text("s3://metadata/last_ingestion.txt")

 

 

Streaming Ingestion:

# Continuous ingestion from Kafka
from pyspark.sql.functions import from_json, col

spark = SparkSession.builder.appName("StreamingIngest").getOrCreate()

df_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .load()

# Parse the JSON payload and write to the data lake
df_parsed = df_stream.select(
    from_json(col("value").cast("string"), "event_id STRING, amount DOUBLE")
        .alias("data")
).select("data.*")

df_parsed.writeStream \
    .format("parquet") \
    .option("path", "s3://datalake/events/") \
    .option("checkpointLocation", "s3://checkpoint/events/") \
    .start()

 

 

 

Data Ingestion Best Practices

1. Idempotent Ingestion

# Ensure the same result if run multiple times
def idempotent_ingest(date):
    spark = SparkSession.builder.getOrCreate()

    df = spark.read.parquet(f"hdfs://source/{date}/")

    # Use overwrite mode (replaces the entire partition)
    df.write.mode("overwrite") \
        .partitionBy("date") \
        .parquet(f"s3://datalake/{date}/")

    # Safe to re-run: produces the same result

 

 

2. Data Validation

from pyspark.sql.functions import col, when, count

def validate_ingestion(df):
    # Check schema
    assert "id" in df.columns, "Missing 'id' column"
    assert "amount" in df.columns, "Missing 'amount' column"

    # Check for NULLs in critical columns
    null_counts = df.select(
        [count(when(col(c).isNull(), c)).alias(c) for c in ["id", "amount"]]
    )
    null_counts.show()

    # Check value ranges
    invalid_amounts = df.filter(col("amount") < 0)
    assert invalid_amounts.count() == 0, "Negative amounts found"

    return True

 

 

3. Error Handling

def robust_ingestion(source_url):
    try:
        # Attempt ingestion
        df = spark.read.jdbc(source_url, "table")

        # Validate
        if df.count() == 0:
            raise ValueError("No data returned from source")

        # Process
        df_clean = df.dropna(subset=["id"])

        # Write
        df_clean.write.mode("overwrite").parquet("s3://datalake/data/")

        print("Ingestion successful")

    except ConnectionError as e:
        print(f"Connection failed: {e}")
        # Retry with exponential backoff
        retry_with_backoff()

    except ValueError as e:
        print(f"Data validation failed: {e}")
        # Alert the operations team
        send_alert(f"Ingestion validation failed: {e}")

    except Exception as e:
        print(f"Unexpected error: {e}")
        # Log and re-raise
        logger.error(f"Ingestion failed: {e}", exc_info=True)
        raise
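
The retry_with_backoff() helper above is left undefined. A minimal sketch of one possible implementation that retries a callable with exponentially growing delays; the attempt count and base delay are arbitrary assumptions:

import time

def retry_with_backoff(operation, max_attempts=5, base_delay=2):
    """Retry a callable, doubling the wait after each failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except ConnectionError as e:
            if attempt == max_attempts:
                raise  # give up after the final attempt
            wait = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({e}); retrying in {wait}s")
            time.sleep(wait)

# Usage: wrap the flaky read in a lambda
# df = retry_with_backoff(lambda: spark.read.jdbc(source_url, "table"))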

 

 

4. Monitoring and Alerting

def monitored_ingestion():
    start_time = datetime.now()

    try:
        # Ingestion
        df = spark.read.parquet("s3://source/")
        rows_ingested = df.count()

        # Calculate metrics
        duration = (datetime.now() - start_time).total_seconds()
        throughput = rows_ingested / duration

        # Log metrics
        print(f"Rows ingested: {rows_ingested}")
        print(f"Duration: {duration}s")
        print(f"Throughput: {throughput:.0f} rows/sec")

        # Alert if metrics are out of range
        if rows_ingested == 0:
            send_alert("CRITICAL: No data ingested")

        if throughput < 1000:
            send_alert(f"WARNING: Low throughput: {throughput:.0f} rows/sec")

        return df

    except Exception as e:
        send_alert(f"CRITICAL: Ingestion failed: {e}")
        raise
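
send_alert() is also left undefined. A minimal sketch, assuming alerts are delivered by POSTing to a webhook; ALERT_WEBHOOK_URL is a placeholder for something like a Slack or PagerDuty integration endpoint:

import requests

ALERT_WEBHOOK_URL = "https://hooks.example.com/ingestion-alerts"  # placeholder

def send_alert(message):
    """Send an alert message to the operations webhook."""
    try:
        requests.post(ALERT_WEBHOOK_URL, json={"text": message}, timeout=10)
    except requests.RequestException as e:
        # Alerting must never crash the pipeline itself
        print(f"Failed to deliver alert '{message}': {e}")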

 

 

 

Real-World Data Ingestion Architecture

Multiple Sources
├─ Databases (MySQL, PostgreSQL)
├─ APIs (REST, GraphQL)
├─ Files (CSV, Parquet, JSON)
└─ Streams (Kafka, Kinesis)
    ↓
Ingestion Layer
├─ Connectors (JDBC, REST client)
├─ Message queue (Kafka, RabbitMQ)
├─ Data validation
└─ Error handling
    ↓
Raw Data Lake / Bronze Zone
├─ S3, HDFS, or cloud storage
├─ Minimal transformations
└─ Full audit trail
    ↓
Metadata Management
├─ Schema registry
├─ Data lineage
└─ Quality metrics
    ↓
Processing Layer
└─ Transform to processed data
    ↓
Data Warehouse
└─ Ready for analytics
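
The bronze zone keeps data close to its raw form but still benefits from a few audit columns added at write time. A minimal sketch of such a landing step, assuming hypothetical source names and lake paths:

from pyspark.sql.functions import current_timestamp, lit

def land_in_bronze(df, source_name):
    """Write raw data to the bronze zone with a minimal audit trail."""
    audited = (
        df.withColumn("_ingested_at", current_timestamp())  # when it arrived
          .withColumn("_source", lit(source_name))          # where it came from
    )
    audited.write.mode("append").parquet(f"s3://datalake/bronze/{source_name}/")

# Usage
# land_in_bronze(df_mysql, "mysql_customers")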

 

 

 

Ingestion Performance Optimization

Partitioning

# Ingest data partitioned by date
df.write \
    .partitionBy("date") \
    .parquet("s3://datalake/data/")

# Enable partition pruning in queries
spark.sql("SELECT * FROM data WHERE date = '2024-01-01'")
# Only reads the 2024-01-01 partition

 

 

Parallelization

# Increase parallelism during ingestion
df.repartition(100).write.parquet("s3://datalake/data/")

# Or set config
spark.conf.set("spark.sql.shuffle.partitions", "200")

 

 

Compression

# Use efficient compression (Parquet writes use Snappy by default)
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

df.write.parquet("s3://datalake/data/")
# ~4x compression with minimal CPU overhead

 

 

 

Common Ingestion Challenges

Challenge 1: Schema Evolution

# Handle new columns in the source
from pyspark.sql.functions import lit

def handle_schema_evolution():
    # Option 1: Merge schemas when reading Parquet
    spark.conf.set("spark.sql.parquet.mergeSchema", "true")

    # Option 2: Add missing columns explicitly
    def add_missing_columns(df, required_columns):
        for column_name in required_columns:
            if column_name not in df.columns:
                df = df.withColumn(column_name, lit(None))
        return df

 

 

Challenge 2: Duplicate Handling

# Remove duplicates during ingestion
df_deduplicated = df.dropDuplicates()              # Remove fully duplicated rows
df_deduplicated = df.dropDuplicates(["order_id"])  # Keep first occurrence per order_id

# Keep the latest version of each record
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

window = Window.partitionBy("id").orderBy(col("updated_at").desc())
df_latest = df.withColumn("row_num", row_number().over(window)) \
    .filter(col("row_num") == 1) \
    .drop("row_num")

 

 

💡 Key Insights

•   Ingestion is foundational: Quality ingestion enables quality analytics

•   Multiple sources: Different techniques for different sources

•   Idempotency critical: Safe to retry without side effects

•   Validation essential: Catch issues early

•   Monitoring important: Know when things go wrong

 

📚 Study Notes

•   Data ingestion: Import data from sources to target systems

•   Batch: Scheduled ingestion of fixed datasets

•   Streaming: Continuous real-time ingestion

•   Incremental: Only new/changed records

•   Sources: Databases, files, APIs, streams

•   Validation: Quality gates on ingestion

•   Monitoring: Track successes and failures

     
