Data Ingestion Basics
Data ingestion is the critical first step of any data pipeline: it moves raw data from diverse sources into a form that is ready for processing and analysis. Understanding ingestion methods, sources, patterns, and best practices is essential for building systems that handle continuous data flows reliably. This fundamental topic covers everything from ingestion architecture to real-world patterns.
What is Data Ingestion?
Definition
Data ingestion is the process of importing, transferring, and loading data from one or more sources into a target system, where it can be processed and analyzed further.
Data Ingestion Pipeline
Data Source → Ingestion Layer → Processing Layer → Storage Layer → Consumption Layer

Data Source:       Database, File, API, Stream
Ingestion Layer:   Connector / API, Protocol, Buffer, Deduplicate
Processing Layer:  Transform, Clean, Validate, Enrich
Storage Layer:     Data Lake, Data Warehouse, Database, Cache
Consumption Layer: BI Tools, Reports, Dashboards, Models
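To make the flow concrete, here is a minimal single-pass sketch through these stages; the bucket paths and column names are illustrative assumptions, not a production pipeline:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("MiniPipeline").getOrCreate()

# Ingestion layer: read raw files from the source drop zone
raw = spark.read.csv("s3://bucket/raw/orders.csv", header=True, inferSchema=True)

# Processing layer: clean and validate
clean = (raw.dropna(subset=["order_id"])   # drop rows missing the key
            .filter(col("amount") >= 0))   # basic range check

# Storage layer: land in the data lake as Parquet
clean.write.mode("overwrite").parquet("s3://datalake/orders/")

# Consumption layer: expose to SQL/BI tools
clean.createOrReplaceTempView("orders")
spark.sql("SELECT count(*) AS n FROM orders").show()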
Data Ingestion Sources
Databases:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataIngest").getOrCreate()

# MySQL
df_mysql = (spark.read.format("jdbc")
            .option("url", "jdbc:mysql://host:3306/db")
            .option("dbtable", "customers")
            .option("user", "username")
            .option("password", "password")
            .load())

# PostgreSQL
df_pg = (spark.read.format("jdbc")
         .option("url", "jdbc:postgresql://host:5432/db")
         .option("dbtable", "orders")
         .load())

# Oracle
df_oracle = (spark.read.format("jdbc")
             .option("url", "jdbc:oracle:thin:@host:1521:db")
             .option("dbtable", "products")
             .load())
File Systems:
# CSV files
df_csv = spark.read.csv("s3://bucket/data.csv", header=True, inferSchema=True)
# Parquet (columnar format)
df_parquet = spark.read.parquet("hdfs://namenode/data.parquet")
# JSON
df_json = spark.read.json("gs://bucket/data.json")
# Avro
df_avro = spark.read.format("avro").load("s3://bucket/data.avro")
# ORC
df_orc = spark.read.orc("hdfs://data/table.orc")
Message Queues and Streaming:
# Note: each of these sources requires its connector package on the classpath;
# the Kinesis and Pub/Sub formats come from vendor connectors, not core Spark.

# Kafka: real-time stream
df_kafka = (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "events")
            .load())

# Kinesis: AWS streaming
df_kinesis = (spark.readStream
              .format("kinesis")
              .option("streamName", "my-stream")
              .option("initialPosition", "TRIM_HORIZON")
              .load())

# Pub/Sub: Google Cloud
df_pubsub = (spark.readStream
             .format("pubsub")
             .option("projectIdOverride", "project-id")
             .option("pubsubGcpCredential", "/path/to/credentials")
             .load())
APIs:
import requests

# REST API ingestion
def ingest_from_api(url, params):
    response = requests.get(url, params=params)
    data = response.json()
    # Convert to DataFrame
    df = spark.createDataFrame(data)
    return df

# Example: GitHub API
github_url = "https://api.github.com/repos/apache/spark/issues"
params = {"state": "open", "per_page": 100}
df_issues = ingest_from_api(github_url, params)
df_issues.write.parquet("s3://bucket/github_issues/")
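The GitHub issues endpoint is paginated (at most 100 items per page), so a full ingest usually has to walk pages. A minimal sketch reusing the names above, assuming the standard page query parameter and stopping at the first empty page; the helper name is illustrative:

def ingest_paginated(url, params, max_pages=10):
    # Fetch successive pages until an empty page or max_pages is reached
    all_rows = []
    for page in range(1, max_pages + 1):
        resp = requests.get(url, params={**params, "page": page})
        resp.raise_for_status()
        rows = resp.json()
        if not rows:          # empty page: nothing left to fetch
            break
        all_rows.extend(rows)
    return spark.createDataFrame(all_rows)

df_all_issues = ingest_paginated(github_url, params)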
Ingestion Patterns
Batch Ingestion:
# Time-based batch
from datetime import datetime, timedelta

def batch_ingestion_daily():
    spark = SparkSession.builder.appName("DailyBatch").getOrCreate()
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    # Read yesterday's data
    df = (spark.read.format("jdbc")
          .option("url", "jdbc:mysql://host:3306/db")
          .option("query", f"SELECT * FROM transactions WHERE date = '{yesterday}'")
          .load())
    # Write to the data lake
    df.write.parquet(f"s3://datalake/transactions/{yesterday}/")

# Schedule with Airflow
schedule_interval = "0 2 * * *"  # Daily at 2 AM
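The cron string only takes effect inside a scheduler. A minimal Airflow DAG sketch that runs the daily batch at 2 AM, assuming Airflow 2.x and using illustrative DAG/task ids:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    dag_id="daily_batch_ingestion",    # illustrative id
    start_date=datetime(2024, 1, 1),
    schedule_interval="0 2 * * *",     # daily at 2 AM
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="batch_ingestion_daily",
        python_callable=batch_ingestion_daily,  # the function defined above
    )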
Incremental Ingestion:
# Fetch only new/changed records
def incremental_ingestion():
    spark = SparkSession.builder.appName("Incremental").getOrCreate()
    # Read the last ingestion timestamp (fall back to epoch on first run)
    try:
        last_timestamp = spark.read.text("s3://metadata/last_ingestion.txt").collect()[0][0]
    except Exception:
        last_timestamp = "1970-01-01"
    # Read only new records
    df_new = (spark.read.format("jdbc")
              .option("url", "jdbc:mysql://host:3306/db")
              .option("query", f"SELECT * FROM events WHERE updated_at > '{last_timestamp}'")
              .load())
    # Append to existing data
    df_new.write.mode("append").parquet("s3://datalake/events/")
    # Update the high-water mark
    current_timestamp = datetime.now().isoformat()
    spark.createDataFrame([(current_timestamp,)], ["ts"]) \
        .coalesce(1).write.mode("overwrite").text("s3://metadata/last_ingestion.txt")
Streaming Ingestion:
# Continuous ingestion from Kafka
from pyspark.sql.functions import from_json, col

spark = SparkSession.builder.appName("StreamingIngest").getOrCreate()
df_stream = (spark.readStream
             .format("kafka")
             .option("kafka.bootstrap.servers", "kafka:9092")
             .option("subscribe", "events")
             .load())
# Parse the Kafka value and write continuously
df_parsed = df_stream.select(
    from_json(col("value").cast("string"), "event_id STRING, amount DOUBLE").alias("data")
).select("data.*")
query = (df_parsed.writeStream
         .format("parquet")
         .option("path", "s3://datalake/events/")
         .option("checkpointLocation", "s3://checkpoint/events/")
         .start())
Data Ingestion Best Practices
1. Idempotent Ingestion
# Ensure the same result if run multiple times
def idempotent_ingest(date):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(f"hdfs://source/{date}/")
    # Overwrite mode: re-running replaces that date's output entirely
    df.write.mode("overwrite") \
        .partitionBy("date") \
        .parquet(f"s3://datalake/{date}/")
    # Safe to re-run: produces the same result
2. Data Validation
from pyspark.sql.functions import col, when, count

def validate_ingestion(df):
    # Check schema
    assert "id" in df.columns, "Missing 'id' column"
    assert "amount" in df.columns, "Missing 'amount' column"
    # Check for NULLs in critical columns
    null_counts = df.select(
        [count(when(col(c).isNull(), c)).alias(c) for c in ["id", "amount"]]
    )
    null_counts.show()
    # Check value ranges
    invalid_amounts = df.filter(col("amount") < 0)
    assert invalid_amounts.count() == 0, "Negative amounts found"
    return True
3. Error Handling
# Assumes send_alert(), retry_with_backoff(), and a configured logger exist elsewhere
def robust_ingestion(source_url):
    try:
        # Attempt ingestion
        df = spark.read.jdbc(source_url, "table")
        # Validate
        if df.count() == 0:
            raise ValueError("No data returned from source")
        # Process
        df_clean = df.dropna(subset=["id"])
        # Write
        df_clean.write.mode("overwrite").parquet("s3://datalake/data/")
        print("Ingestion successful")
    except ConnectionError as e:
        print(f"Connection failed: {e}")
        # Retry with exponential backoff
        retry_with_backoff()
    except ValueError as e:
        print(f"Data validation failed: {e}")
        # Alert operations team
        send_alert(f"Ingestion validation failed: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
        # Log and re-raise
        logger.error(f"Ingestion failed: {e}", exc_info=True)
        raise
4. Monitoring and Alerting
def monitored_ingestion():
    start_time = datetime.now()
    try:
        # Ingestion
        df = spark.read.parquet("s3://source/")
        rows_ingested = df.count()
        # Calculate metrics
        duration = (datetime.now() - start_time).total_seconds()
        throughput = rows_ingested / duration
        # Log metrics
        print(f"Rows ingested: {rows_ingested}")
        print(f"Duration: {duration}s")
        print(f"Throughput: {throughput:.0f} rows/sec")
        # Alert if metrics are out of range
        if rows_ingested == 0:
            send_alert("CRITICAL: No data ingested")
        if throughput < 1000:
            send_alert(f"WARNING: Low throughput: {throughput:.0f} rows/sec")
        return df
    except Exception as e:
        send_alert(f"CRITICAL: Ingestion failed: {e}")
        raise
Real-World Data Ingestion Architecture
Multiple Sources
├─ Databases (MySQL, PostgreSQL)
├─ APIs (REST, GraphQL)
├─ Files (CSV, Parquet, JSON)
└─ Streams (Kafka, Kinesis)
↓
Ingestion Layer
├─ Connectors (JDBC, REST client)
├─ Message queue (Kafka, RabbitMQ)
├─ Data validation
└─ Error handling
↓
Raw Data Lake / Bronze Zone
├─ S3, HDFS, or cloud storage
├─ Minimal transformations
└─ Full audit trail
↓
Metadata Management
├─ Schema registry
├─ Data lineage
└─ Quality metrics
↓
Processing Layer
└─ Transform to processed data
↓
Data Warehouse
└─ Ready for analytics
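As a concrete example of the Bronze-zone landing step, here is a minimal sketch that stores records as-is while adding audit columns for lineage; the paths and the helper name are illustrative assumptions:

from pyspark.sql.functions import current_timestamp, input_file_name, lit

def land_to_bronze(source_path, bronze_path, source_name):
    # Land raw data with minimal transformation, plus audit columns for lineage
    raw = spark.read.json(source_path)
    audited = (raw
               .withColumn("_ingested_at", current_timestamp())
               .withColumn("_source_file", input_file_name())
               .withColumn("_source_system", lit(source_name)))
    audited.write.mode("append").parquet(bronze_path)

land_to_bronze("s3://landing/events/", "s3://datalake/bronze/events/", "events_api")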
Ingestion Performance Optimization
Partitioning
# Ingest data partitioned by date
df.write \
    .partitionBy("date") \
    .parquet("s3://datalake/data/")
# Partition pruning in queries (assumes the Parquet path is registered as table `data`)
spark.sql("SELECT * FROM data WHERE date='2024-01-01'")
# Only reads the 2024-01-01 partition
Parallelization
# Increase parallelism during ingestion
df.repartition(100).write.parquet("s3://datalake/data/")
# Or set config
spark.conf.set("spark.sql.shuffle.partitions", "200")
Compression
# Use efficient compression for Parquet output (Snappy is the default codec)
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
df.write.parquet("s3://datalake/data/")
# Typically severalfold compression with minimal CPU overhead
Common Ingestion Challenges
Challenge 1: Schema Evolution
# Handle new columns appearing in the source
from pyspark.sql.functions import lit

# Option 1: Merge schemas when reading Parquet
spark.conf.set("spark.sql.parquet.mergeSchema", "true")

# Option 2: Add missing columns explicitly
def add_missing_columns(df, required_columns):
    for col_name in required_columns:
        if col_name not in df.columns:
            df = df.withColumn(col_name, lit(None))
    return df
Challenge 2: Duplicate Handling
# Remove duplicates during ingestion
df_deduplicated = df.dropDuplicates()              # Remove fully identical rows
df_deduplicated = df.dropDuplicates(["order_id"])  # Keep one row per order_id
# Keep the latest version per id
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col
window = Window.partitionBy("id").orderBy(col("updated_at").desc())
df_latest = (df.withColumn("row_num", row_number().over(window))
             .filter(col("row_num") == 1)
             .drop("row_num"))
💡 Key Insights
• Ingestion is foundational: Quality ingestion enables quality analytics
• Multiple sources: Different source types (databases, files, APIs, streams) need different connectors and techniques
• Idempotency critical: Safe to retry without side effects
• Validation essential: Catch issues early
• Monitoring important: Know when things go wrong
📚 Study Notes
• Data ingestion: Import data from sources to systems
• Batch: Scheduled ingestion of fixed datasets
• Streaming: Continuous real-time ingestion
• Incremental: Only new/changed records
• Sources: Databases, files, APIs, streams
• Validation: Quality gates on ingestion
• Monitoring: Track success and failures