The Evolution of ETL: From Batch Jobs to Real-Time Streaming

2024-03-28 · By Shrikant Paliwal · 12 min read

The landscape of data processing has undergone a dramatic transformation over the past decade. What started as simple nightly batch jobs has evolved into sophisticated real-time streaming pipelines. Let's explore this evolution and understand how modern ETL architectures are reshaping data engineering.

The Journey from Batch to Streaming

Traditional Batch ETL

Traditional ETL processes typically looked like this:

# Traditional batch ETL example
import pandas as pd
import schedule

def process_daily_sales():
    # Extract: Read yesterday's sales data
    sales_data = pd.read_csv('sales_20240327.csv')
    
    # Transform: Calculate daily metrics
    daily_metrics = sales_data.groupby('product_id').agg({
        'quantity': 'sum',
        'revenue': 'sum'
    })
    
    # Load: Write to the data warehouse (warehouse_engine: a pre-configured SQLAlchemy engine)
    daily_metrics.to_sql(
        'daily_sales_metrics',
        warehouse_engine,
        if_exists='append'
    )

# Run every night at 2 AM
schedule.every().day.at("02:00").do(process_daily_sales)

Modern Streaming ETL

Today's real-time ETL looks quite different:

import redis
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, DoubleType, TimestampType)

# Schema of the incoming sale events (assumed to match the producer's JSON payload)
sales_schema = StructType([
    StructField("product_id", StringType()),
    StructField("quantity", IntegerType()),
    StructField("revenue", DoubleType()),
    StructField("timestamp", TimestampType()),
])

# Redis client used by the foreachBatch sink (connection details are assumed)
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)

def process_sales_stream():
    # Initialize Spark Streaming session
    spark = SparkSession.builder \
        .appName("RealTimeSalesMetrics") \
        .getOrCreate()
    
    # Create streaming DataFrame from Kafka
    sales_stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "sales_events") \
        .load()
    
    # Parse JSON and calculate metrics in real-time; the watermark bounds late
    # data and is required for append-mode output on a streaming aggregation
    sales_metrics = sales_stream \
        .select(from_json(col("value").cast("string"), sales_schema).alias("data")) \
        .select("data.*") \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "5 minutes"),
            col("product_id")
        ) \
        .agg(
            sum("quantity").alias("total_quantity"),
            sum("revenue").alias("total_revenue")
        )
    
    # Write aggregated results to a Delta Lake table for downstream consumers
    query = sales_metrics \
        .writeStream \
        .outputMode("append") \
        .format("delta") \
        .option("checkpointLocation", "/tmp/checkpoint/sales") \
        .start("/data/sales_metrics")
    
    # Also maintain a real-time cache
    redis_query = sales_metrics \
        .writeStream \
        .foreachBatch(update_redis_cache) \
        .start()
    
    spark.streams.awaitAnyTermination()

def update_redis_cache(batch_df, batch_id):
    # Update Redis cache with latest metrics
    for row in batch_df.collect():
        redis_client.hset(
            f"product:{row.product_id}",
            mapping={
                "quantity": row.total_quantity,
                "revenue": row.total_revenue,
                "last_updated": row.window.end.isoformat()
            }
        )

Key Components of Modern ETL

1. Stream Processing with Apache Kafka

Kafka serves as the backbone of modern streaming ETL:

// Kafka Producer for real-time event ingestion
import { Kafka, Producer } from 'kafkajs';

// Shape of a sale event (assumed; adjust to your domain model)
interface Sale {
    id: string;
    productId: string;
    quantity: number;
    revenue: number;
}

class SalesEventProducer {
    private producer: Producer;

    constructor() {
        const kafka = new Kafka({
            clientId: 'sales-producer',
            brokers: ['localhost:9092']
        });
        this.producer = kafka.producer();
    }

    async publishSaleEvent(sale: Sale) {
        await this.producer.connect();
        
        try {
            await this.producer.send({
                topic: 'sales_events',
                messages: [{
                    key: sale.id,
                    value: JSON.stringify({
                        product_id: sale.productId,
                        quantity: sale.quantity,
                        revenue: sale.revenue,
                        timestamp: new Date().toISOString()
                    }),
                    headers: {
                        'source': 'pos-system',
                        'version': '1.0'
                    }
                }]
            });
        } finally {
            await this.producer.disconnect();
        }
    }
}

2. Real-Time Processing with Spark Structured Streaming

Spark Structured Streaming enables complex analytics on streaming data, including windowed aggregations with watermarks to handle late-arriving events:

// Spark Structured Streaming example
// (assumes an existing SparkSession `spark` and a StructType `salesSchema`
// describing the JSON sale events)
import org.apache.spark.sql.functions._
import spark.implicits._  // enables the $"column" syntax

val salesStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sales_events")
  .load()

// Calculate real-time metrics with watermarking
val realtimeMetrics = salesStream
  .select(from_json($"value".cast("string"), salesSchema).as("data"))
  .select("data.*")
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "5 minutes"),
    $"product_id"
  )
  .agg(
    sum("quantity").as("total_quantity"),
    sum("revenue").as("total_revenue"),
    approx_count_distinct("customer_id").as("unique_customers")
  )

// Output to multiple sinks
val query = realtimeMetrics.writeStream
  .outputMode("append")
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoint/sales")
  .start()

3. Hybrid Approaches: Combining Batch and Streaming

Modern systems often combine both approaches:

from datetime import datetime, timedelta
from pyspark.sql import SparkSession

class HybridETLPipeline:
    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("HybridETL") \
            .config("spark.sql.streaming.schemaInference", "true") \
            .getOrCreate()
    
    def process_streaming_data(self):
        """Handle real-time updates"""
        return self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "sales_events") \
            .load()
    
    def process_batch_data(self):
        """Handle historical data processing"""
        return self.spark.read \
            .format("delta") \
            .load("/data/historical_sales")
    
    def run_hybrid_pipeline(self):
        # Streaming path: continuously land events in a Delta staging table
        # (in practice the raw Kafka payload would first be parsed into the
        # same schema as the historical table)
        streaming_df = self.process_streaming_data()
        streaming_query = streaming_df.writeStream \
            .format("delta") \
            .option("checkpointLocation", "/tmp/checkpoint/hybrid") \
            .start("/data/streaming_sales")
        
        # Batch path: a streaming DataFrame cannot be unioned with a batch
        # DataFrame directly, so read the staged Delta table back as a batch
        # source and combine it with the historical data
        batch_df = self.process_batch_data()
        streamed_df = self.spark.read.format("delta").load("/data/streaming_sales")
        combined_results = batch_df.unionByName(streamed_df)
        
        # Write to Delta Lake for ACID compliance
        combined_results.write \
            .format("delta") \
            .mode("overwrite") \
            .save("/data/combined_metrics")

Architectural Patterns

Lambda Architecture

The Lambda architecture runs two parallel paths over the same data: a batch layer that periodically recomputes complete, accurate views, and a speed layer that produces low-latency incremental views from the stream. A serving layer merges the two so queries see both historical and fresh results, as sketched below.
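A minimal sketch of the serving-layer merge, assuming the batch and speed layers have already written their views to Delta tables (the paths and column names here are illustrative):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("LambdaServingLayer").getOrCreate()

# Batch layer output: complete metrics, recomputed on a schedule (illustrative path)
batch_view = spark.read.format("delta").load("/data/batch_view")

# Speed layer output: incremental metrics covering events since the last batch run
speed_view = spark.read.format("delta").load("/data/speed_view")

# Serving layer: the two views cover disjoint time ranges, so summing their
# contributions per product yields a complete, up-to-date picture
merged_view = batch_view.unionByName(speed_view) \
    .groupBy("product_id") \
    .agg(
        F.sum("total_quantity").alias("total_quantity"),
        F.sum("total_revenue").alias("total_revenue")
    )

merged_view.write.format("delta").mode("overwrite").save("/data/serving_view")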

Kappa Architecture

The Kappa architecture removes the separate batch layer and treats everything as a stream: a single streaming pipeline serves both real-time and historical processing, and reprocessing is done by replaying the event log (for example, re-reading a Kafka topic from its earliest offset) through the same code.
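A minimal sketch of the replay idea using Spark's Kafka source, assuming the same sales_events topic as above; the only difference between live processing and a full reprocess is the starting offset (plus a fresh checkpoint location):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("KappaPipeline").getOrCreate()

def sales_stream(starting_offsets):
    """Single pipeline definition used for both live and replayed processing."""
    return spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "sales_events") \
        .option("startingOffsets", starting_offsets) \
        .load()

# Live processing: pick up new events (or resume from an existing checkpoint)
live_df = sales_stream("latest")

# Reprocessing: point the same pipeline at the beginning of the topic and use a
# new checkpoint location to rebuild derived tables from scratch
replay_df = sales_stream("earliest")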

Best Practices for Modern ETL

  1. Design for Scalability

    • Use partitioning effectively
    • Implement backpressure handling
    • Plan for data growth
  2. Ensure Data Quality

    • Implement schema validation
    • Monitor data freshness
    • Handle late-arriving data
  3. Maintain Observability

    • Track processing latency
    • Monitor throughput
    • Set up alerting
  4. Handle Failure Scenarios (see the retry and dead-letter-queue sketch after this list)

    • Implement retry mechanisms
    • Maintain dead letter queues
    • Plan for disaster recovery
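As a concrete illustration of the failure-handling points above, here is a minimal retry-with-dead-letter-queue sketch using confluent_kafka; the topic names, retry budget, and process_event placeholder are illustrative, not part of the pipeline shown earlier:

import json
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

MAX_RETRIES = 3  # illustrative retry budget; a real pipeline would also add backoff

def process_event(event):
    """Placeholder for the real transformation; raises on bad records."""
    if event.get("quantity", 0) < 0:
        raise ValueError("negative quantity")

def handle_event(event):
    # Retry transient failures a few times before giving up
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            process_event(event)
            return
        except Exception as exc:
            last_error = str(exc)
    # After exhausting retries, park the record on a dead letter topic so the
    # main pipeline keeps flowing and the bad record can be inspected later
    producer.produce(
        "sales_events_dlq",
        key=str(event.get("product_id", "")),
        value=json.dumps({"event": event, "error": last_error, "retries": MAX_RETRIES})
    )
    producer.flush()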

Performance Optimization Tips

# Example: Optimized configuration for a Structured Streaming job
from pyspark.sql import SparkSession
from pyspark.sql.functions import window

spark = SparkSession.builder \
    .appName("OptimizedETL") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.default.parallelism", "100") \
    .getOrCreate()

# Use appropriate watermarking, and cap how much the Kafka source reads per
# trigger; maxOffsetsPerTrigger is the Structured Streaming counterpart of the
# legacy DStream backpressure / maxRatePerPartition settings (value illustrative)
streamingDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .option("maxOffsetsPerTrigger", "10000") \
    .load() \
    .withWatermark("timestamp", "2 hours") \
    .groupBy(window("timestamp", "1 hour")) \
    .count()

Conclusion

The evolution from batch to streaming ETL represents a fundamental shift in how we process data. While batch processing still has its place, real-time streaming is becoming increasingly important for modern data-driven applications.

Key takeaways:

  • Choose the right architecture based on your needs
  • Consider hybrid approaches when appropriate
  • Focus on scalability and reliability
  • Maintain proper monitoring and observability
  • Plan for failure scenarios

The future of ETL is moving towards more real-time, more automated, and more intelligent processing. Stay ahead by embracing modern streaming architectures while maintaining the reliability of traditional batch processing where it makes sense.

About the Author

Shrikant Paliwal

Full-Stack Software Engineer specializing in cloud-native technologies and distributed systems.