Wicked Smart Data

What is a Data Pipeline? Architecture and Core Concepts for Data Engineers

Data Engineering · ⚡ Practitioner · 19 min read · Mar 27, 2026 · Updated Mar 27, 2026
Table of Contents
  • Prerequisites
  • Understanding Data Pipeline Fundamentals
  • Batch vs Stream Processing Architectures
  • Batch Processing Architecture
  • Stream Processing Architecture
  • Lambda and Kappa Architectures
  • Pipeline Orchestration and Scheduling
  • Data Quality and Monitoring
  • Schema Validation and Evolution
  • Data Profiling and Anomaly Detection
  • Lineage Tracking and Impact Analysis
  • Error Handling and Recovery Patterns
  • Circuit Breaker Pattern
  • Dead Letter Queues and Poison Pills
  • Checkpoint and Recovery Mechanisms
  • Hands-On Exercise: Building a Customer Analytics Pipeline
  • Common Mistakes & Troubleshooting

You're staring at a dashboard showing customer churn predictions that are three days out of date. Sales teams are making decisions with stale data, and your machine learning models are training on incomplete datasets. Sound familiar? This is what happens when data moves through your organization like a game of telephone — unreliable, slow, and prone to errors.

Data pipelines solve this problem by creating automated, reliable pathways for data to flow from sources to destinations. But understanding what a pipeline is isn't enough. As a data professional, you need to understand how to architect them properly, recognize the core patterns, and know when different approaches make sense for your specific use cases.

By the end of this lesson, you'll have a comprehensive understanding of data pipeline architecture and be able to design robust pipelines for your own projects.

What you'll learn:

  • The fundamental components that make up any data pipeline architecture
  • How to distinguish between batch, stream, and hybrid processing patterns and when to use each
  • The key architectural decisions that affect pipeline reliability, scalability, and maintainability
  • How to implement monitoring and error handling strategies that prevent data quality issues
  • Real-world patterns for handling common pipeline challenges like schema evolution and backfill operations

Prerequisites

You should be comfortable with basic SQL queries and have some experience with at least one programming language (Python, Scala, or Java). Familiarity with cloud platforms and basic distributed systems concepts will help, but we'll explain the key points as we go.

Understanding Data Pipeline Fundamentals

A data pipeline is an automated sequence of processes that moves data from one or more sources to a destination, transforming it along the way. Think of it as an assembly line for data — raw materials (source data) enter one end, undergo various processing steps, and emerge as finished products (analytics-ready data) at the other end.

But this simple definition masks significant complexity. Real-world pipelines must handle varying data volumes, different formats, network failures, schema changes, and evolving business requirements. The architecture choices you make determine whether your pipeline becomes a reliable workhorse or a maintenance nightmare.

Let's examine the core components that appear in virtually every data pipeline:

Data Sources are where your pipeline begins. These might be transactional databases, APIs, log files, message queues, or external data feeds. Each source type presents unique challenges. Database sources might provide change data capture (CDC) streams, while API sources require rate limiting and authentication handling.

Ingestion Layer handles the initial data collection. This is where you decide between pull-based mechanisms (your pipeline actively retrieves data) and push-based mechanisms (sources send data to your pipeline). The ingestion layer must handle source unavailability, data format variations, and authentication.

Processing Layer transforms the raw data into something useful. This includes cleaning, enriching, aggregating, and restructuring data. Processing can happen in real-time (stream processing) or in batches, and modern pipelines often combine both approaches.

Storage Layer provides intermediate and final data storage. You might use object storage for raw data, data warehouses for analytics, and specialized systems for machine learning features. Storage choices affect query performance, cost, and data governance capabilities.

Orchestration Layer manages the execution of pipeline components, handling scheduling, dependency management, and error recovery. This is often where pipeline complexity becomes most apparent — simple linear workflows are rare in production environments.
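The five layers above can be composed as plain functions. This is a deliberately minimal, illustrative sketch — none of these names come from a real framework — but it shows how each layer owns one responsibility and how orchestration is just the wiring between them:

```python
from typing import Iterable

def ingest(source: Iterable[dict]) -> list[dict]:
    # Ingestion layer: pull records from a source (here, any iterable)
    return list(source)

def transform(records: list[dict]) -> list[dict]:
    # Processing layer: drop incomplete records, normalize values
    return [{**r, "amount": round(r["amount"], 2)}
            for r in records if r.get("amount") is not None]

def load(records: list[dict], sink: list) -> None:
    # Storage layer: write to a destination (here, an in-memory list)
    sink.extend(records)

def run_pipeline(source, sink):
    # Orchestration layer in miniature: fixed ordering, fail fast
    load(transform(ingest(source)), sink)
```

In production each function would be a separate task with its own retries and monitoring, but the shape — source in, transformed records out, destination last — stays the same.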

Batch vs Stream Processing Architectures

The most fundamental architectural decision in pipeline design is choosing between batch and stream processing patterns. This choice affects every other aspect of your pipeline design.

Batch Processing Architecture

Batch processing moves data in discrete chunks at scheduled intervals. A typical batch pipeline might run every hour, processing all data that arrived since the previous run.

# Typical batch processing logic
def process_hourly_batch(start_time, end_time):
    # Extract data from source
    raw_data = extract_from_source(start_time, end_time)
    
    # Transform the data
    cleaned_data = clean_and_validate(raw_data)
    enriched_data = enrich_with_reference_data(cleaned_data)
    aggregated_data = calculate_metrics(enriched_data)
    
    # Load to destination
    load_to_warehouse(aggregated_data, partition_key=start_time.date())
    
    # Update processing metadata
    update_watermark(end_time)

Batch architectures excel when you can accept latency measured in hours or minutes, need to process large volumes efficiently, or require complex transformations that benefit from seeing complete datasets. They're also simpler to implement and debug because you can easily inspect intermediate results.

Consider a financial reporting pipeline that processes daily transaction data. The business requirement for end-of-day reports makes hourly or daily batch processing perfectly acceptable, and the complex regulatory calculations benefit from having access to complete daily datasets.
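The `update_watermark` call in the batch example above depends on persisted processing metadata: each run records how far it got so the next run extracts only new data. A minimal sketch, assuming a simple file-backed store (`WATERMARK_PATH`, `get_watermark`, and `update_watermark` are illustrative names, not a library API):

```python
import json
from datetime import datetime
from pathlib import Path

# Hypothetical location for the watermark file
WATERMARK_PATH = Path("watermark.json")

def get_watermark(default: datetime) -> datetime:
    # Read the last processed timestamp; fall back to a default on first run
    if WATERMARK_PATH.exists():
        stored = json.loads(WATERMARK_PATH.read_text())
        return datetime.fromisoformat(stored["last_processed"])
    return default

def update_watermark(end_time: datetime) -> None:
    # Advance the watermark only after the batch has fully committed
    WATERMARK_PATH.write_text(json.dumps({"last_processed": end_time.isoformat()}))
```

Real pipelines usually keep the watermark in a database or the orchestrator's metadata store rather than a local file, but the contract is the same: advance it only after a successful load, so a failed run is simply re-extracted.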

Stream Processing Architecture

Stream processing handles data as it arrives, typically with latency measured in seconds or milliseconds. Instead of discrete batches, stream processors work with continuous flows of events.

# Conceptual stream processing logic
class StreamProcessor:
    def __init__(self):
        self.state_store = {}  # Maintains processing state
    
    def process_event(self, event):
        # Validate incoming event
        if not self.is_valid(event):
            self.send_to_dead_letter_queue(event)
            return
        
        # Enrich with current state
        enriched_event = self.enrich_event(event)
        
        # Update state for future events
        self.update_state(event)
        
        # Emit processed event
        self.emit_to_downstream(enriched_event)

Stream processing shines for real-time analytics, fraud detection, or operational monitoring where immediate action is required. However, stream architectures introduce complexity around state management, exactly-once processing guarantees, and handling late-arriving data.

A fraud detection system exemplifies stream processing requirements. Credit card transactions must be evaluated within milliseconds to block suspicious activity, making batch processing unsuitable despite its simplicity.
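One of the complexities mentioned above — late-arriving data — can be illustrated with a toy event-time tumbling window. This is a hand-rolled sketch for intuition, not how Flink or Kafka Streams implement it: a window only finalizes once the watermark (the max event time seen) has passed the window end plus an allowed-lateness margin, and events older than that are rejected.

```python
from collections import defaultdict

class TumblingWindowCounter:
    def __init__(self, window_seconds=60, allowed_lateness=10):
        self.window_seconds = window_seconds
        self.allowed_lateness = allowed_lateness
        self.counts = defaultdict(int)   # window_start -> event count
        self.watermark = 0               # max event time seen so far

    def add_event(self, event_time: int):
        # Reject events that arrive after their window's grace period
        if event_time < self.watermark - self.allowed_lateness:
            return None  # in practice, route to a dead letter queue
        window_start = (event_time // self.window_seconds) * self.window_seconds
        self.counts[window_start] += 1
        self.watermark = max(self.watermark, event_time)
        return window_start

    def closed_windows(self):
        # A window is final once its end plus lateness is behind the watermark
        return {w: c for w, c in self.counts.items()
                if w + self.window_seconds + self.allowed_lateness <= self.watermark}
```

Batch systems sidestep this entire problem by waiting until all data for a period has landed — which is exactly the latency trade-off being made.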

Lambda and Kappa Architectures

Real-world systems often combine batch and stream processing in hybrid architectures. The Lambda architecture runs both batch and stream processing paths in parallel, merging results at query time. The Kappa architecture attempts to unify both approaches using stream processing for everything.

# Lambda Architecture Components
batch_layer:
  purpose: "Comprehensive, accurate processing of all historical data"
  latency: "Hours to days"
  technology: "Spark, Hadoop MapReduce"
  
speed_layer:
  purpose: "Real-time processing for immediate insights"
  latency: "Seconds to minutes"  
  technology: "Kafka Streams, Apache Flink"
  
serving_layer:
  purpose: "Merge and serve results from both layers"
  technology: "Cassandra, HBase, Elasticsearch"

The Lambda architecture addresses the reality that batch and stream systems have different strengths. Batch systems provide accurate, comprehensive processing but with high latency. Stream systems provide low latency but may sacrifice accuracy during system failures or complex processing requirements.
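The query-time merge that the serving layer performs can be sketched in a few lines: the batch view is authoritative up to its last completed run, and the speed layer fills in everything after that cutoff. All names here are hypothetical, and real serving layers index both views for fast lookup rather than scanning events:

```python
def query_metric(customer_id, batch_view, speed_view, batch_cutoff):
    # batch_view: {customer_id: total} computed up to batch_cutoff
    # speed_view: list of (customer_id, event_time, amount) recent events
    batch_total = batch_view.get(customer_id, 0)
    # Only count speed-layer events the batch layer hasn't processed yet,
    # so nothing is double-counted at the boundary
    realtime_total = sum(amount for cid, ts, amount in speed_view
                         if cid == customer_id and ts > batch_cutoff)
    return batch_total + realtime_total
```

When the next batch run completes, the cutoff advances and the speed-layer events it covered can be discarded — which is why the speed layer only ever needs to hold a small, recent slice of data.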

Pipeline Orchestration and Scheduling

Modern data pipelines involve dozens or hundreds of interdependent tasks. A customer analytics pipeline might depend on data from the CRM system, payment processor, and marketing automation platform, with each source having different update schedules and reliability characteristics.

Orchestration systems manage these complex dependencies while providing observability and error recovery capabilities. Let's examine how this works in practice:

# Apache Airflow DAG example
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime, timedelta

def extract_customer_data(execution_date, **context):
    # Extract customer data for the given date
    date_str = execution_date.strftime('%Y-%m-%d')
    # Implementation details...
    
def enrich_with_demographic_data(**context):
    # Join with demographic information
    # Implementation details...

dag = DAG(
    'customer_analytics_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    max_active_runs=1,
    catchup=True  # Process historical data
)

# Wait for upstream data availability
wait_for_crm_data = S3KeySensor(
    task_id='wait_for_crm_export',
    bucket_name='customer-data-lake',
    bucket_key='crm/{{ ds }}/export_complete.flag',
    dag=dag
)

# Extract and transform data
extract_customers = PythonOperator(
    task_id='extract_customer_data',
    python_callable=extract_customer_data,
    dag=dag
)

enrich_customers = PythonOperator(
    task_id='enrich_demographic_data', 
    python_callable=enrich_with_demographic_data,
    dag=dag
)

# Define dependencies
wait_for_crm_data >> extract_customers >> enrich_customers

This orchestration approach provides several critical capabilities:

Dependency Management: Tasks only execute when their prerequisites are met. If the CRM export fails, downstream tasks won't process incomplete data.

Retry Logic: Failed tasks can be automatically retried with exponential backoff, handling transient failures without human intervention.

Monitoring and Alerting: The orchestrator tracks task duration, success rates, and data volumes, alerting operators to anomalies.

Backfill Capabilities: When you need to reprocess historical data due to bug fixes or schema changes, the orchestrator can systematically work through date ranges.
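The backfill capability can be sketched as a loop over a date range. Here `process_day` stands in for whatever idempotent daily task the pipeline defines — because the task can be safely re-run, failed dates are simply collected and retried later instead of aborting the whole backfill:

```python
from datetime import date, timedelta

def backfill(process_day, start: date, end: date):
    # Re-run an idempotent daily task over [start, end], one partition per day
    failures = []
    current = start
    while current <= end:
        try:
            process_day(current)  # safe to re-run: partition is overwritten
        except Exception as exc:
            failures.append((current, exc))  # keep going; retry these later
        current += timedelta(days=1)
    return failures
```

Orchestrators like Airflow provide this behavior natively (the `catchup=True` setting in the DAG above), but the underlying loop is the same.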

Pro Tip: Always design your pipeline tasks to be idempotent — running the same task multiple times with the same inputs should produce identical outputs. This makes error recovery much simpler and enables safe task retries.

Data Quality and Monitoring

Data quality issues are the silent killers of analytics projects. A pipeline that successfully moves garbage data from source to destination is worse than no pipeline at all, because it creates false confidence in corrupted insights.

Effective data quality monitoring happens at multiple levels throughout your pipeline architecture:

Schema Validation and Evolution

Modern pipelines must handle schema changes gracefully. Your customer table might add new columns, change data types, or remove deprecated fields. A robust pipeline anticipates these changes:

class SchemaValidator:
    def __init__(self, expected_schema):
        self.expected_schema = expected_schema
        self.compatibility_rules = {
            'add_column': 'allowed',
            'remove_column': 'requires_approval', 
            'change_type': 'blocked'
        }
    
    def validate_batch(self, data_batch):
        current_schema = self.infer_schema(data_batch)
        schema_diff = self.compare_schemas(self.expected_schema, current_schema)
        
        for change in schema_diff:
            if self.compatibility_rules[change.type] == 'blocked':
                raise SchemaValidationError(f"Incompatible change: {change}")
            elif self.compatibility_rules[change.type] == 'requires_approval':
                self.notify_data_team(change)
                
        return self.apply_schema_migration(data_batch, schema_diff)

Data Profiling and Anomaly Detection

Statistical monitoring catches data quality issues that schema validation misses. You might receive customer records with valid schemas but impossible birth dates or revenue figures:

def monitor_data_quality(dataframe, execution_date):
    quality_metrics = {}
    
    # Volume checks
    record_count = len(dataframe)
    quality_metrics['record_count'] = record_count
    expected_range = get_expected_volume_range(execution_date)
    if not (expected_range[0] <= record_count <= expected_range[1]):
        alert_volume_anomaly(record_count, expected_range)
    
    # Null value monitoring
    null_percentages = dataframe.isnull().mean()
    quality_metrics['null_percentages'] = null_percentages.to_dict()
    for column, null_pct in null_percentages.items():
        threshold = get_null_threshold(column)
        if null_pct > threshold:
            alert_data_quality_issue(column, null_pct)
    
    # Domain-specific validations
    if 'customer_age' in dataframe.columns:
        invalid_ages = dataframe[
            (dataframe['customer_age'] < 0) | 
            (dataframe['customer_age'] > 150)
        ]
        if len(invalid_ages) > 0:
            quarantine_invalid_records(invalid_ages)
    
    # Store metrics for trend analysis
    store_quality_metrics(quality_metrics, execution_date)

Lineage Tracking and Impact Analysis

When data quality issues occur, you need to understand their downstream impact quickly. Data lineage tracking maps how data flows through your pipeline ecosystem:

class DataLineageTracker:
    def __init__(self):
        self.lineage_graph = {}
    
    def track_transformation(self, input_datasets, output_dataset, transformation_logic):
        self.lineage_graph[output_dataset] = {
            'inputs': input_datasets,
            'transformation': transformation_logic,
            'timestamp': datetime.now(),
            'job_id': get_current_job_id()
        }
    
    def find_downstream_impact(self, corrupted_dataset):
        """Find all datasets that could be affected by data corruption"""
        affected_datasets = set([corrupted_dataset])
        queue = [corrupted_dataset]
        
        while queue:
            current = queue.pop(0)
            # Find datasets that depend on current dataset
            for dataset, metadata in self.lineage_graph.items():
                if current in metadata['inputs'] and dataset not in affected_datasets:
                    affected_datasets.add(dataset)
                    queue.append(dataset)
        
        return affected_datasets

Error Handling and Recovery Patterns

Production data pipelines fail. Networks become unavailable, source systems go offline, and data formats change unexpectedly. Your pipeline architecture must anticipate these failures and handle them gracefully.

Circuit Breaker Pattern

When upstream systems become unreliable, circuit breakers prevent your pipeline from overwhelming them with requests while they recover:

import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    
    def call_service(self, service_function, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = 'HALF_OPEN'
            else:
                raise CircuitBreakerOpenError("Service unavailable")
        
        try:
            result = service_function(*args, **kwargs)
            self.on_success()
            return result
        except Exception as e:
            self.on_failure()
            raise e
    
    def on_success(self):
        self.failure_count = 0
        self.state = 'CLOSED'
    
    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'

Dead Letter Queues and Poison Pills

Some data records will always be problematic — malformed JSON, invalid encodings, or business rule violations. Dead letter queues isolate these records for manual review without blocking the processing of valid data:

class PipelineProcessor:
    def __init__(self, main_queue, dead_letter_queue):
        self.main_queue = main_queue
        self.dead_letter_queue = dead_letter_queue
        self.max_retries = 3
    
    def process_message(self, message):
        retry_count = 0
        
        while retry_count < self.max_retries:
            try:
                # Attempt to process the message
                validated_data = self.validate_message(message)
                transformed_data = self.transform_message(validated_data)
                self.write_to_destination(transformed_data)
                return
                
            except RetryableError as e:
                retry_count += 1
                time.sleep(2 ** retry_count)  # Exponential backoff
                
            except NonRetryableError as e:
                # Send to dead letter queue for manual investigation
                self.dead_letter_queue.send({
                    'original_message': message,
                    'error': str(e),
                    'timestamp': datetime.now(),
                    'processing_attempts': retry_count + 1
                })
                return
        
        # Exceeded retry limit
        self.dead_letter_queue.send({
            'original_message': message,
            'error': 'Exceeded maximum retry attempts',
            'timestamp': datetime.now(),
            'processing_attempts': retry_count
        })

Checkpoint and Recovery Mechanisms

Stream processing systems need checkpoints to recover from failures without losing data or reprocessing everything:

import time
from datetime import datetime

class StreamProcessor:
    def __init__(self, checkpoint_interval=60):
        self.checkpoint_interval = checkpoint_interval
        self.last_checkpoint = time.time()
        self.processed_offsets = {}
        
    def process_stream(self, stream_source):
        for message in stream_source:
            try:
                # Process the message
                result = self.transform_message(message)
                self.emit_result(result)
                
                # Track progress
                self.processed_offsets[message.partition] = message.offset
                
                # Periodic checkpointing
                if time.time() - self.last_checkpoint > self.checkpoint_interval:
                    self.create_checkpoint()
                    
            except Exception as e:
                self.handle_processing_error(message, e)
    
    def create_checkpoint(self):
        # Persist current processing state
        checkpoint_data = {
            'timestamp': datetime.now(),
            'offsets': self.processed_offsets.copy(),
            'processor_state': self.get_internal_state()
        }
        self.persist_checkpoint(checkpoint_data)
        self.last_checkpoint = time.time()
    
    def recover_from_checkpoint(self, checkpoint_data):
        # Resume processing from last successful checkpoint
        self.processed_offsets = checkpoint_data['offsets']
        self.restore_internal_state(checkpoint_data['processor_state'])

Hands-On Exercise: Building a Customer Analytics Pipeline

Let's build a realistic customer analytics pipeline that demonstrates the architectural concepts we've discussed. This pipeline will process e-commerce data to calculate customer lifetime value and identify at-risk customers.

Our pipeline will handle three data sources:

  • Orders: Transactional data from the e-commerce platform
  • Customer Support: Ticket data from the support system
  • Marketing Events: Email and campaign interaction data

Setting Up the Pipeline Architecture

First, let's define our data models and processing logic:

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
import pandas as pd

@dataclass
class Order:
    order_id: str
    customer_id: str
    order_date: datetime
    total_amount: float
    status: str

@dataclass 
class SupportTicket:
    ticket_id: str
    customer_id: str
    created_date: datetime
    category: str
    priority: str
    resolved_date: Optional[datetime]

@dataclass
class MarketingEvent:
    event_id: str
    customer_id: str
    event_type: str  # email_open, email_click, campaign_view
    event_date: datetime
    campaign_id: str

class CustomerAnalyticsPipeline:
    def __init__(self, config):
        self.config = config
        self.quality_checker = DataQualityChecker()
        self.lineage_tracker = DataLineageTracker()
        
    def extract_orders(self, start_date: datetime, end_date: datetime) -> pd.DataFrame:
        """Extract order data with quality validation"""
        # Simulate database extraction (in production, use parameterized
        # queries rather than string interpolation to avoid SQL injection)
        orders_query = f"""
        SELECT order_id, customer_id, order_date, total_amount, status
        FROM orders 
        WHERE order_date >= '{start_date}' AND order_date < '{end_date}'
        AND status != 'cancelled'
        """
        
        orders_df = self.execute_query(orders_query)
        
        # Quality validation
        self.quality_checker.validate_orders(orders_df)
        
        # Track lineage
        self.lineage_tracker.track_extraction('orders_table', 'raw_orders', orders_query)
        
        return orders_df
    
    def calculate_customer_metrics(self, orders_df: pd.DataFrame) -> pd.DataFrame:
        """Calculate key customer metrics"""
        
        customer_metrics = orders_df.groupby('customer_id').agg({
            'total_amount': ['sum', 'mean', 'count'],
            'order_date': ['min', 'max']
        }).round(2)
        
        # Flatten column names
        customer_metrics.columns = [
            'total_revenue', 'avg_order_value', 'order_count', 
            'first_order_date', 'last_order_date'
        ]
        
        # Calculate additional metrics
        current_date = datetime.now()
        customer_metrics['days_since_last_order'] = (
            current_date - customer_metrics['last_order_date']
        ).dt.days
        
        customer_metrics['customer_lifetime_days'] = (
            customer_metrics['last_order_date'] - customer_metrics['first_order_date']
        ).dt.days + 1
        
        # Calculate CLV (simplified)
        customer_metrics['estimated_clv'] = (
            customer_metrics['avg_order_value'] * 
            customer_metrics['order_count'] * 
            (customer_metrics['customer_lifetime_days'] / 365) * 2  # Projection factor
        ).round(2)
        
        return customer_metrics.reset_index()

Implementing Stream Processing for Real-Time Alerts

Now let's add real-time processing to identify customers who need immediate attention:

import json
from kafka import KafkaConsumer, KafkaProducer

class RealTimeAlertProcessor:
    def __init__(self, kafka_config):
        self.consumer = KafkaConsumer(
            'customer_events',
            bootstrap_servers=kafka_config['servers'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_config['servers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        
        self.customer_state = {}  # In-memory state store
        
    def process_support_ticket_event(self, event):
        """Process support ticket events for escalation alerts"""
        customer_id = event['customer_id']
        priority = event['priority']
        
        # Update customer state
        if customer_id not in self.customer_state:
            self.customer_state[customer_id] = {'support_tickets': []}
            
        self.customer_state[customer_id]['support_tickets'].append(event)
        
        # Check for escalation conditions
        recent_tickets = [
            t for t in self.customer_state[customer_id]['support_tickets']
            if (datetime.now() - datetime.fromisoformat(t['created_date'])).days <= 30
        ]
        
        high_priority_tickets = [t for t in recent_tickets if t['priority'] == 'high']
        
        if len(recent_tickets) >= 3 or len(high_priority_tickets) >= 2:
            alert = {
                'alert_type': 'customer_at_risk',
                'customer_id': customer_id,
                'reason': f'{len(recent_tickets)} tickets in 30 days, {len(high_priority_tickets)} high priority',
                'timestamp': datetime.now().isoformat(),
                'recommended_action': 'proactive_outreach'
            }
            
            self.producer.send('customer_alerts', alert)
            
    def run(self):
        """Main processing loop"""
        for message in self.consumer:
            try:
                event = message.value
                
                if event['event_type'] == 'support_ticket_created':
                    self.process_support_ticket_event(event)
                elif event['event_type'] == 'large_refund':
                    self.process_refund_event(event)
                    
            except Exception as e:
                error_event = {
                    'error': str(e),
                    'message': message.value,
                    'timestamp': datetime.now().isoformat()
                }
                self.producer.send('processing_errors', error_event)

Pipeline Orchestration with Airflow

Let's orchestrate our pipeline components using Apache Airflow:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

def run_customer_analytics(**context):
    """Main pipeline execution function"""
    execution_date = context['execution_date']
    
    # Initialize pipeline
    pipeline = CustomerAnalyticsPipeline(config=pipeline_config)
    
    # Extract data for the execution date
    start_date = execution_date
    end_date = execution_date + timedelta(days=1)
    
    orders_df = pipeline.extract_orders(start_date, end_date)
    support_df = pipeline.extract_support_tickets(start_date, end_date)
    marketing_df = pipeline.extract_marketing_events(start_date, end_date)
    
    # Calculate customer metrics
    customer_metrics = pipeline.calculate_customer_metrics(orders_df)
    
    # Enrich with support and marketing data
    enriched_metrics = pipeline.enrich_customer_data(
        customer_metrics, support_df, marketing_df
    )
    
    # Load to data warehouse
    pipeline.load_to_warehouse(enriched_metrics, execution_date)
    
    # Update ML feature store
    pipeline.update_feature_store(enriched_metrics)

# Define the DAG
dag = DAG(
    'customer_analytics_pipeline',
    description='Customer analytics and CLV calculation',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=True,
    max_active_runs=1,
    tags=['analytics', 'customer', 'clv']
)

# Wait for source data availability
wait_for_orders = S3KeySensor(
    task_id='wait_for_orders_data',
    bucket_name='data-lake-raw',
    bucket_key='orders/{{ ds }}/processing_complete.flag',
    timeout=3600,  # 1 hour timeout
    poke_interval=300,  # Check every 5 minutes
    dag=dag
)

wait_for_support = S3KeySensor(
    task_id='wait_for_support_data', 
    bucket_name='data-lake-raw',
    bucket_key='support_tickets/{{ ds }}/processing_complete.flag',
    timeout=3600,
    poke_interval=300,
    dag=dag
)

# Main processing task
process_analytics = PythonOperator(
    task_id='process_customer_analytics',
    python_callable=run_customer_analytics,
    dag=dag
)

# Data quality validation
validate_output = PythonOperator(
    task_id='validate_output_quality',
    python_callable=validate_analytics_output,
    dag=dag
)

# Notify downstream systems
notify_completion = BashOperator(
    task_id='notify_downstream_systems',
    bash_command='curl -X POST {{ var.value.webhook_url }}/customer_analytics_complete',
    dag=dag
)

# Define task dependencies
[wait_for_orders, wait_for_support] >> process_analytics >> validate_output >> notify_completion

Monitoring and Alerting Implementation

Finally, let's implement comprehensive monitoring:

class PipelineMonitor:
    def __init__(self, metrics_backend, alerting_service):
        self.metrics = metrics_backend
        self.alerts = alerting_service
        
    def monitor_pipeline_execution(self, pipeline_run):
        """Monitor key pipeline metrics"""
        
        # Track processing volume
        self.metrics.gauge('pipeline.records_processed', pipeline_run.record_count)
        self.metrics.gauge('pipeline.processing_time_seconds', pipeline_run.duration)
        
        # Check for anomalies
        if pipeline_run.record_count < pipeline_run.expected_volume * 0.8:
            self.alerts.send_alert({
                'severity': 'warning',
                'title': 'Low data volume detected',
                'message': f'Processed {pipeline_run.record_count} records, expected ~{pipeline_run.expected_volume}',
                'pipeline': pipeline_run.pipeline_name,
                'execution_date': pipeline_run.execution_date
            })
        
        # Monitor data quality metrics
        quality_score = pipeline_run.calculate_quality_score()
        self.metrics.gauge('pipeline.data_quality_score', quality_score)
        
        if quality_score < 0.95:
            self.alerts.send_alert({
                'severity': 'critical',
                'title': 'Data quality issues detected',
                'message': f'Quality score: {quality_score:.2%}. Check data validation logs.',
                'pipeline': pipeline_run.pipeline_name
            })
    
    def track_sla_compliance(self, pipeline_run):
        """Monitor SLA compliance"""
        sla_threshold = pipeline_run.sla_hours * 3600  # Convert to seconds
        
        if pipeline_run.duration > sla_threshold:
            self.alerts.send_alert({
                'severity': 'warning', 
                'title': 'Pipeline SLA breach',
                'message': f'Pipeline took {pipeline_run.duration/3600:.1f} hours, SLA is {pipeline_run.sla_hours} hours',
                'pipeline': pipeline_run.pipeline_name
            })
            
        # Track SLA compliance rate
        self.metrics.increment('pipeline.sla_breaches' if pipeline_run.duration > sla_threshold else 'pipeline.sla_compliance')

This complete example demonstrates how architectural concepts translate into working code. The pipeline handles multiple data sources, provides both batch and stream processing, includes comprehensive error handling, and maintains data quality standards.

Common Mistakes & Troubleshooting

Even experienced data engineers make predictable mistakes when designing pipelines. Understanding these patterns helps you avoid them and debug issues more effectively.

Mistake 1: Ignoring Idempotency

One of the most common pipeline bugs occurs when tasks aren't idempotent. Consider this flawed approach:

# PROBLEMATIC: Not idempotent
def process_daily_sales(execution_date):
    sales_data = extract_sales(execution_date)
    
    # This appends data every time the task runs
    existing_data = read_from_warehouse('daily_sales')
    combined_data = existing_data.append(sales_data)
    write_to_warehouse(combined_data, 'daily_sales')

If this task fails and gets retried, you'll have duplicate data. The fix requires designing for idempotency:

# CORRECT: Idempotent approach
def process_daily_sales(execution_date):
    sales_data = extract_sales(execution_date)
    
    # Use partition-based writes that can be safely overwritten
    partition_key = execution_date.strftime('%Y-%m-%d')
    write_to_warehouse(sales_data, 'daily_sales', partition=partition_key, mode='overwrite')
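The difference is easy to see with a toy run. Here is a minimal, self-contained sketch in which a plain dictionary stands in for the warehouse (the `write_to_warehouse` body here is an assumption, written only to make the rerun behavior visible):

```python
from datetime import date

# Hypothetical in-memory "warehouse" keyed by partition; stands in for a
# real warehouse client so the retry behavior is easy to observe.
warehouse = {}

def write_to_warehouse(rows, table, partition, mode='overwrite'):
    if mode == 'overwrite':
        # Overwrite replaces the whole partition, so a retry changes nothing.
        warehouse[partition] = list(rows)
    else:
        # Append duplicates rows on every retry -- the original bug.
        warehouse.setdefault(partition, []).extend(rows)

def process_daily_sales(execution_date, sales_data):
    partition_key = execution_date.strftime('%Y-%m-%d')
    write_to_warehouse(sales_data, 'daily_sales', partition=partition_key)

rows = [{'order_id': 1, 'amount': 99.50}]
process_daily_sales(date(2026, 3, 27), rows)
process_daily_sales(date(2026, 3, 27), rows)  # simulated retry
# The partition still holds exactly one copy of the day's data.
```

Because the partition key is derived from the execution date, any number of retries for the same date converge on the same final state.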

Mistake 2: Poor Error Handling Granularity

Many pipelines fail completely when they encounter a single bad record. This approach loses all the good data:

# PROBLEMATIC: All-or-nothing processing
def process_customer_batch(customers):
    processed_customers = []
    for customer in customers:
        # If any customer fails, the entire batch fails
        validated_customer = strict_validation(customer)
        enriched_customer = enrich_customer_data(validated_customer)
        processed_customers.append(enriched_customer)
    
    return processed_customers

Better error handling isolates failures:

# BETTER: Isolated error handling
from datetime import datetime

def process_customer_batch(customers):
    processed_customers = []
    failed_customers = []
    
    for customer in customers:
        try:
            validated_customer = strict_validation(customer)
            enriched_customer = enrich_customer_data(validated_customer)
            processed_customers.append(enriched_customer)
        except ValidationError as e:
            failed_customers.append({
                'customer': customer,
                'error': str(e),
                'timestamp': datetime.now()
            })
    
    # Process good data and quarantine bad data
    if processed_customers:
        write_to_warehouse(processed_customers)
    if failed_customers:
        write_to_quarantine(failed_customers)
    
    return len(processed_customers), len(failed_customers)
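The quarantine pattern can be exercised end to end with stubbed helpers. In this sketch the helper bodies (`strict_validation`, `enrich_customer_data`) are assumptions that mirror the names above, reduced to the minimum needed to show one record succeeding and one being quarantined:

```python
from datetime import datetime

# Stubbed helpers -- names mirror the snippet above; bodies are assumptions.
class ValidationError(Exception):
    pass

def strict_validation(customer):
    if 'email' not in customer:
        raise ValidationError('missing email')
    return customer

def enrich_customer_data(customer):
    # Toy enrichment: derive the email domain.
    return {**customer, 'domain': customer['email'].split('@')[1]}

processed, quarantined = [], []
for customer in [{'email': 'ada@example.com'}, {'name': 'no-email'}]:
    try:
        processed.append(enrich_customer_data(strict_validation(customer)))
    except ValidationError as exc:
        quarantined.append({
            'customer': customer,
            'error': str(exc),
            'timestamp': datetime.now(),
        })

# One record reaches the warehouse path; the bad one is quarantined intact,
# with enough context to reprocess it after the source data is fixed.
```

Keeping the original record alongside the error message is what makes the quarantine useful: you can replay failed records after a fix without re-running the whole batch.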

Mistake 3: Inadequate Monitoring

Many pipelines only monitor for complete failures, missing subtler data quality issues. This monitoring approach is insufficient:

# INSUFFICIENT: Only monitors for crashes
def basic_monitoring(pipeline_result):
    if pipeline_result.success:
        log.info("Pipeline completed successfully")
    else:
        send_alert("Pipeline failed!")

Comprehensive monitoring catches quality issues before they impact users:

# COMPREHENSIVE: Multi-dimensional monitoring
def comprehensive_monitoring(pipeline_result, historical_baselines):
    # Check execution success
    if not pipeline_result.success:
        send_alert("Pipeline execution failed", severity="critical")
        return
    
    # Monitor data volume
    volume_deviation = abs(pipeline_result.record_count - historical_baselines.avg_volume) / historical_baselines.avg_volume
    if volume_deviation > 0.3:
        send_alert(f"Data volume anomaly: {volume_deviation:.1%} deviation", severity="warning")
    
    # Monitor processing time
    if pipeline_result.duration > historical_baselines.avg_duration * 1.5:
        send_alert("Pipeline running slower than expected", severity="warning")
    
    # Monitor data quality metrics
    for metric_name, current_value in pipeline_result.quality_metrics.items():
        baseline_value = historical_baselines.quality_metrics[metric_name]
        if abs(current_value - baseline_value) > baseline_value * 0.1:
            send_alert(f"Data quality metric {metric_name} anomaly", severity="warning")

Troubleshooting Framework

When pipelines fail, use this systematic approach:

  1. Check Dependencies First: Verify that upstream systems are available and producing expected data volumes
  2. Examine Resource Utilization: Look for memory, CPU, or network bottlenecks that might cause timeouts
  3. Validate Data Quality: Compare current batch characteristics to historical patterns
  4. Review Configuration Changes: Recent deployments or configuration updates often cause failures
  5. Analyze Error Patterns: Are failures random or systematic? Random failures suggest infrastructure issues, while systematic failures suggest logic problems
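The checklist above can be encoded as an ordered triage routine that stops at the first failing check. This is a minimal sketch; the function and type names are hypothetical, and in practice each check would call out to your monitoring or scheduling systems:

```python
from dataclasses import dataclass

@dataclass
class TriageResult:
    step: str
    finding: str

def triage_pipeline_failure(checks):
    """Run ordered diagnostics; stop at the first failing check.

    `checks` is a list of (step_name, check_fn) pairs, where check_fn
    returns None on success or a finding string on failure.
    """
    for step, check_fn in checks:
        finding = check_fn()
        if finding:
            return TriageResult(step, finding)
    return TriageResult('none', 'all checks passed; escalate to deeper debugging')

# Example wiring with stubbed checks, in the order recommended above:
result = triage_pipeline_failure([
    ('upstream dependencies', lambda: None),                   # healthy
    ('resource utilization', lambda: 'worker memory at 98%'),  # bottleneck
    ('data quality', lambda: None),
])
```

Ordering the checks cheapest-first means most incidents are diagnosed before you ever open a debugger.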

Summary & Next Steps

You now understand the fundamental architecture patterns that underpin all modern data pipelines. The key concepts we've covered — batch vs. stream processing, orchestration strategies, error handling patterns, and monitoring approaches — form the foundation for building reliable, scalable data systems.

The customer analytics pipeline we built demonstrates how these concepts work together in practice. Notice how architectural decisions cascade through the system: choosing batch processing simplified our error handling, while adding stream processing required more sophisticated state management.

As you apply these concepts to your own projects, remember that architecture is about making tradeoffs. Batch systems are simpler but have higher latency. Stream systems provide real-time insights but require more complex error handling. The "right" choice depends on your specific requirements for latency, consistency, and operational complexity.

Your next steps should focus on:

  1. Practice with Real Data: Build a pipeline using data from your organization. Start simple with batch processing, then add complexity as you gain confidence.

  2. Master an Orchestration Tool: Choose Apache Airflow, Prefect, or a cloud-native solution and build several non-trivial workflows. Understanding dependency management and error recovery is crucial for production systems.

  3. Implement Comprehensive Monitoring: Don't wait until your pipeline is "done" to add monitoring. Build observability into your architecture from day one.

  4. Study Stream Processing: If your use cases require low latency, dive deeper into Apache Kafka, Apache Flink, or cloud streaming solutions. The concepts we covered here provide the foundation, but stream processing has its own complex patterns.

  5. Learn from Failures: When your pipelines fail (and they will), treat each failure as a learning opportunity. The most valuable pipeline engineering skills come from understanding how systems break and how to make them more resilient.

The next lesson in this learning path covers "Data Pipeline Tools and Technologies," where we'll explore specific technologies for implementing the architectural patterns you've learned here. You'll see how tools like Apache Spark, Kafka, and cloud data services implement the concepts we've discussed, and learn how to choose the right technology for your specific requirements.
