Wicked Smart Data

Scheduling and Orchestrating Pipelines with Airflow

Data Engineering · ⚡ Practitioner · 18 min read · Apr 1, 2026 (updated Apr 1, 2026)
Table of Contents
  • Prerequisites
  • Understanding Airflow's Architecture and Core Concepts
  • Building Robust Data Extraction Tasks
  • Implementing Data Validation and Quality Checks
  • Database Operations with Proper Connection Management
  • Advanced Scheduling and Dynamic Task Generation
  • Monitoring, Alerting, and Observability
  • Building the Complete DAG with Dependencies
  • Hands-On Exercise: Building Your Production Pipeline
  • Common Mistakes & Troubleshooting
  • Summary & Next Steps

Picture this: It's 3 AM, and your critical data pipeline just failed halfway through processing yesterday's customer transactions. Your revenue reports are wrong, your ML models are stale, and your CEO is asking questions you can't answer. The failure cascaded through six dependent processes, and you're manually restarting each one, hoping you remember the correct order and parameters.

This nightmare scenario is exactly why Apache Airflow exists. Airflow transforms chaotic, brittle data workflows into reliable, observable, and maintainable pipelines. Instead of hoping your cron jobs work and praying dependencies don't break, you'll have a system that handles failures gracefully, provides clear visibility into your data flows, and scales with your growing infrastructure.

In this lesson, we'll build a production-ready data pipeline that ingests customer order data, performs transformations, updates multiple databases, and triggers downstream analytics workflows. You'll learn to handle real-world complexities like external API dependencies, database connections, error handling, and dynamic scheduling.

What you'll learn:

  • Design robust DAGs with proper task dependencies and error handling
  • Implement advanced scheduling patterns including dynamic dates and external triggers
  • Configure connections, variables, and secrets for production environments
  • Monitor pipeline health with alerting, logging, and performance metrics
  • Troubleshoot common failure modes and implement recovery strategies

Prerequisites

You should be comfortable with Python, have basic familiarity with SQL databases, and understand fundamental data pipeline concepts like ETL/ELT workflows. We'll assume you can read Docker configurations and have worked with APIs before.

Understanding Airflow's Architecture and Core Concepts

Before diving into code, let's establish how Airflow thinks about workflows. Unlike simple cron jobs or batch scripts, Airflow models your entire data ecosystem as a collection of Directed Acyclic Graphs (DAGs), where each node represents a task and edges represent dependencies.

The power comes from Airflow's scheduler, which continuously evaluates DAG definitions and determines what should run when. This isn't just about time-based triggers—Airflow considers task dependencies, resource availability, retry policies, and external conditions.

Here's our scenario: We're building a pipeline for an e-commerce company that needs to:

  1. Extract daily order data from a REST API
  2. Clean and validate the data
  3. Load it into a data warehouse
  4. Update customer lifetime value calculations
  5. Trigger ML model retraining if data quality metrics pass
  6. Send summary reports to stakeholders

Let's start with a basic DAG structure:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.http.hooks.http import HttpHook
import pandas as pd
import requests
import json

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'ecommerce_order_pipeline',
    default_args=default_args,
    description='Daily order processing and analytics pipeline',
    schedule_interval='0 2 * * *',  # 2 AM daily
    catchup=True,  # backfill missed runs; note catchup is a DAG argument, not a default_arg
    max_active_runs=1,
    tags=['ecommerce', 'orders', 'analytics']
)

Notice several production-ready patterns here. The depends_on_past=False prevents cascading failures where one day's failure blocks all subsequent runs. The retry configuration handles transient failures automatically. max_active_runs=1 ensures we don't have overlapping executions that could corrupt data.
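The retry settings are easiest to reason about as concrete timestamps. Here is a small pure-Python sketch (no Airflow required) of when the two retries configured above would fire, assuming the fixed retry_delay; Airflow can also grow the delay between attempts if you set retry_exponential_backoff=True in default_args.

```python
from datetime import datetime, timedelta

def retry_times(first_failure, retries=2, retry_delay=timedelta(minutes=5)):
    """With a fixed delay, retry i fires retry_delay * (i + 1) after the first failure."""
    return [first_failure + retry_delay * (i + 1) for i in range(retries)]

# A task that first fails at 02:10 is retried at 02:15 and again at 02:20;
# if the second retry also fails, the task instance is marked failed.
schedule = retry_times(datetime(2024, 1, 1, 2, 10))
```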

Building Robust Data Extraction Tasks

Data extraction is where most pipelines fail in production. APIs go down, data formats change, and network issues cause timeouts. Let's build extraction tasks that handle these realities:

def extract_orders_api(**context):
    """Extract daily orders from the e-commerce API with robust error handling."""
    execution_date = context['execution_date']
    date_str = execution_date.strftime('%Y-%m-%d')
    
    http_hook = HttpHook(
        method='GET',
        http_conn_id='ecommerce_api'
    )
    
    # Use execution date for data extraction
    endpoint = f'/api/v1/orders?date={date_str}&limit=10000'
    
    try:
        response = http_hook.run(endpoint)
        
        if response.status_code != 200:
            raise Exception(f"API returned status code {response.status_code}: {response.text}")
        
        orders_data = response.json()
        
        # Validate data structure
        if 'orders' not in orders_data:
            raise Exception("Expected 'orders' key not found in API response")
        
        if len(orders_data['orders']) == 0:
            # This might be normal for some dates, but log it
            print(f"Warning: No orders found for {date_str}")
        
        # Store in XCom for downstream tasks
        return {
            'order_count': len(orders_data['orders']),
            'extraction_date': date_str,
            'orders': orders_data['orders']
        }
        
    except requests.exceptions.RequestException as e:
        raise Exception(f"Network error during API call: {str(e)}")
    except json.JSONDecodeError as e:
        raise Exception(f"Invalid JSON response from API: {str(e)}")

extract_orders = PythonOperator(
    task_id='extract_orders',
    python_callable=extract_orders_api,
    dag=dag
)

This extraction task demonstrates several critical patterns. We use Airflow's execution_date to ensure each DAG run processes the correct day's data, even during backfills. The error handling distinguishes between different failure types—network issues, API errors, and data format problems—which helps with debugging.

Let's add a sensor to ensure the API is healthy before attempting extraction:

# Check API health before extraction
api_health_check = HttpSensor(
    task_id='check_api_health',
    http_conn_id='ecommerce_api',
    endpoint='/health',
    timeout=300,
    poke_interval=60,
    dag=dag
)

Sensors are Airflow's way of waiting for external conditions. This sensor polls the API health endpoint every minute for up to 5 minutes before giving up. This prevents failed extraction attempts when the API is down for maintenance.
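Under the hood, a sensor is essentially a poke loop. The sketch below is an illustrative stand-in, not Airflow's actual implementation; it shows how timeout and poke_interval interact, with an injectable clock and sleep purely so the loop can be tested without waiting.

```python
import time

def poke_until(check, timeout=300, poke_interval=60,
               sleep=time.sleep, clock=time.monotonic):
    """Run `check` every poke_interval seconds until it passes or timeout elapses."""
    deadline = clock() + timeout
    while True:
        if check():
            return True          # condition met: the sensor succeeds
        if clock() >= deadline:
            return False         # timed out: the sensor fails
        sleep(poke_interval)

# With timeout=300 and poke_interval=60, the check runs at most 6 times:
# once immediately, then once per minute for five minutes.
```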

Implementing Data Validation and Quality Checks

Raw data extraction is just the beginning. Production pipelines need comprehensive validation to catch data quality issues early:

def validate_orders_data(**context):
    """Validate extracted orders data for completeness and quality."""
    ti = context['task_instance']
    orders_data = ti.xcom_pull(task_ids='extract_orders')
    
    if not orders_data or 'orders' not in orders_data:
        raise Exception("No orders data received from extraction task")
    
    orders = orders_data['orders']
    validation_results = {
        'total_orders': len(orders),
        'validation_errors': [],
        'warnings': []
    }
    
    # Required field validation
    required_fields = ['order_id', 'customer_id', 'total_amount', 'order_date', 'status']
    
    for i, order in enumerate(orders):
        order_errors = []
        
        # Check required fields
        for field in required_fields:
            if field not in order or order[field] is None:
                order_errors.append(f"Missing required field: {field}")
        
        # Validate data types and ranges
        try:
            if 'total_amount' in order:
                amount = float(order['total_amount'])
                if amount < 0:
                    order_errors.append("Negative total_amount")
                elif amount > 50000:  # Suspicious large order
                    validation_results['warnings'].append(f"Large order amount: ${amount} for order {order.get('order_id')}")
        except (ValueError, TypeError):
            order_errors.append("Invalid total_amount format")
        
        # Date validation
        try:
            if 'order_date' in order:
                order_date = datetime.strptime(order['order_date'], '%Y-%m-%d')
                execution_date = context['execution_date'].date()
                if order_date.date() != execution_date:
                    order_errors.append(f"Order date {order_date.date()} doesn't match execution date {execution_date}")
        except ValueError:
            order_errors.append("Invalid order_date format")
        
        if order_errors:
            validation_results['validation_errors'].append({
                'order_index': i,
                'order_id': order.get('order_id', 'unknown'),
                'errors': order_errors
            })
    
    # Determine if validation passed
    error_count = len(validation_results['validation_errors'])
    error_rate = error_count / len(orders) if orders else 0
    
    if error_rate > 0.05:  # More than 5% error rate fails the pipeline
        raise Exception(f"Validation failed: {error_rate:.2%} error rate ({error_count}/{len(orders)} orders)")
    
    # Log warnings but don't fail
    for warning in validation_results['warnings']:
        print(f"WARNING: {warning}")
    
    print(f"Validation passed: {len(orders)} orders validated with {error_count} errors")
    
    # Return cleaned data for downstream tasks
    valid_orders = [
        order for i, order in enumerate(orders)
        if not any(err['order_index'] == i for err in validation_results['validation_errors'])
    ]
    
    return {
        'valid_orders': valid_orders,
        'validation_summary': validation_results
    }

validate_orders = PythonOperator(
    task_id='validate_orders',
    python_callable=validate_orders_data,
    dag=dag
)

This validation task shows how to implement business logic directly in your pipeline. We check for required fields, validate data types, and even implement business rules like reasonable order amounts. The key insight is failing fast—catching data quality issues here prevents them from corrupting downstream systems.

Database Operations with Proper Connection Management

Now let's load our validated data into the warehouse. Production database operations require careful connection management, transaction handling, and performance optimization:

def load_orders_to_warehouse(**context):
    """Load validated orders into the data warehouse with proper transaction handling."""
    ti = context['task_instance']
    validated_data = ti.xcom_pull(task_ids='validate_orders')
    
    if not validated_data or 'valid_orders' not in validated_data:
        raise Exception("No validated orders data available")
    
    orders = validated_data['valid_orders']
    execution_date = context['execution_date'].strftime('%Y-%m-%d')
    
    # Use Airflow's connection management
    postgres_hook = PostgresHook(postgres_conn_id='warehouse_db')
    
    try:
        # Begin transaction
        with postgres_hook.get_conn() as conn:
            with conn.cursor() as cursor:
                # First, delete any existing data for this date (idempotent operation)
                cursor.execute(
                    "DELETE FROM orders WHERE DATE(order_date) = %s",
                    (execution_date,)
                )
                
                deleted_count = cursor.rowcount
                print(f"Deleted {deleted_count} existing orders for {execution_date}")
                
                # Prepare bulk insert
                if orders:
                    # Build INSERT statement with multiple rows
                    insert_sql = """
                    INSERT INTO orders (order_id, customer_id, total_amount, order_date, status, created_at)
                    VALUES %s
                    """
                    
                    # Prepare values for bulk insert
                    values = []
                    for order in orders:
                        values.append((
                            order['order_id'],
                            order['customer_id'],
                            float(order['total_amount']),
                            order['order_date'],
                            order['status'],
                            datetime.utcnow()
                        ))
                    
                    # Use execute_values for efficient bulk insert
                    from psycopg2.extras import execute_values
                    execute_values(
                        cursor,
                        insert_sql,
                        values,
                        template=None,
                        page_size=1000
                    )
                    
                    inserted_count = cursor.rowcount
                    print(f"Inserted {inserted_count} orders for {execution_date}")
                    
                    # Update metadata table
                    cursor.execute("""
                        INSERT INTO pipeline_runs (dag_id, task_id, execution_date, records_processed, status)
                        VALUES (%s, %s, %s, %s, %s)
                        ON CONFLICT (dag_id, task_id, execution_date) 
                        DO UPDATE SET records_processed = EXCLUDED.records_processed, 
                                      status = EXCLUDED.status,
                                      updated_at = CURRENT_TIMESTAMP
                    """, (
                        dag.dag_id,
                        'load_orders_to_warehouse',
                        execution_date,
                        inserted_count,
                        'success'
                    ))
                    
                    # Commit transaction
                    conn.commit()
                    
                    return {
                        'inserted_count': inserted_count,
                        'deleted_count': deleted_count,
                        'execution_date': execution_date
                    }
                else:
                    print(f"No orders to insert for {execution_date}")
                    conn.commit()
                    return {
                        'inserted_count': 0,
                        'deleted_count': deleted_count,
                        'execution_date': execution_date
                    }
                    
    except Exception as e:
        print(f"Database operation failed: {str(e)}")
        # Connection will be automatically rolled back
        raise

load_to_warehouse = PythonOperator(
    task_id='load_to_warehouse',
    python_callable=load_orders_to_warehouse,
    dag=dag
)

This database operation demonstrates several production patterns. The idempotent delete-and-insert approach ensures reruns don't create duplicates. We use bulk operations for performance and maintain metadata about pipeline runs for observability.
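The delete-and-insert pattern is worth internalizing on its own. Here is a self-contained sketch using sqlite3 so it runs anywhere; the real task uses PostgresHook and execute_values, but the idempotency logic is identical: rerunning the load for a date replaces that date's rows instead of duplicating them.

```python
import sqlite3

def load_day(conn, day, rows):
    """Replace all rows for `day`, so reruns and backfills never create duplicates."""
    with conn:  # one transaction: either both statements apply or neither does
        conn.execute("DELETE FROM orders WHERE order_date = ?", (day,))
        conn.executemany(
            "INSERT INTO orders (order_id, total_amount, order_date) VALUES (?, ?, ?)",
            [(r['order_id'], r['total_amount'], day) for r in rows],
        )

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE orders (order_id TEXT, total_amount REAL, order_date TEXT)")
rows = [{'order_id': 'a1', 'total_amount': 10.0},
        {'order_id': 'a2', 'total_amount': 5.5}]
load_day(conn, '2024-01-01', rows)
load_day(conn, '2024-01-01', rows)  # rerun: the table still holds exactly two rows
```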

Advanced Scheduling and Dynamic Task Generation

Real pipelines often need more sophisticated scheduling than simple cron expressions. Let's implement a branching workflow that conditionally runs different tasks based on data volume:

from airflow.operators.python import BranchPythonOperator
# Airflow 2 renamed DummyOperator to EmptyOperator; alias it to keep the code below unchanged
from airflow.operators.empty import EmptyOperator as DummyOperator

def decide_processing_path(**context):
    """Decide whether to use standard or high-volume processing based on order count."""
    ti = context['task_instance']
    load_results = ti.xcom_pull(task_ids='load_to_warehouse')
    
    if not load_results:
        raise Exception("No load results available for decision making")
    
    order_count = load_results['inserted_count']
    
    # High volume processing for > 10000 orders
    if order_count > 10000:
        print(f"High volume detected ({order_count} orders), using distributed processing")
        return 'high_volume_processing'
    else:
        print(f"Standard volume ({order_count} orders), using standard processing")
        return 'standard_processing'

def update_customer_ltv_standard(**context):
    """Standard customer LTV calculation for normal volume days."""
    postgres_hook = PostgresHook(postgres_conn_id='warehouse_db')
    execution_date = context['execution_date'].strftime('%Y-%m-%d')
    
    # Single-threaded LTV calculation
    ltv_sql = """
    WITH customer_orders AS (
        SELECT 
            customer_id,
            SUM(total_amount) as total_spent,
            COUNT(*) as order_count,
            MAX(order_date) as last_order_date,
            MIN(order_date) as first_order_date
        FROM orders 
        WHERE DATE(order_date) <= %s
        GROUP BY customer_id
    ),
    ltv_calculations AS (
        SELECT 
            customer_id,
            total_spent,
            order_count,
            CASE 
                WHEN (CURRENT_DATE - first_order_date::date) > 0 
                THEN total_spent / (CURRENT_DATE - first_order_date::date) * 365
                ELSE total_spent
            END as estimated_annual_value
        FROM customer_orders
    )
    UPDATE customer_metrics cm
    SET 
        lifetime_value = ltv.estimated_annual_value,
        total_orders = ltv.order_count,
        total_spent = ltv.total_spent,
        updated_at = CURRENT_TIMESTAMP
    FROM ltv_calculations ltv
    WHERE cm.customer_id = ltv.customer_id
    """
    
    postgres_hook.run(ltv_sql, parameters=(execution_date,))
    print(f"Updated customer LTV calculations for {execution_date}")

def trigger_high_volume_processing(**context):
    """Trigger external high-volume processing system."""
    execution_date = context['execution_date'].strftime('%Y-%m-%d')
    
    # In production, this might trigger a Spark job, Kubernetes job, etc.
    http_hook = HttpHook(method='POST', http_conn_id='processing_cluster')
    
    payload = {
        'job_type': 'customer_ltv_calculation',
        'execution_date': execution_date,
        'priority': 'high',
        'resources': {
            'cpu_cores': 16,
            'memory_gb': 64
        }
    }
    
    response = http_hook.run(
        endpoint='/jobs/submit',
        data=json.dumps(payload),
        headers={'Content-Type': 'application/json'}
    )
    
    if response.status_code not in [200, 201]:
        raise Exception(f"Failed to submit high-volume job: {response.text}")
    
    job_id = response.json().get('job_id')
    print(f"Submitted high-volume processing job: {job_id}")
    return job_id

# Create branching workflow
decide_processing = BranchPythonOperator(
    task_id='decide_processing_path',
    python_callable=decide_processing_path,
    dag=dag
)

standard_processing = PythonOperator(
    task_id='standard_processing',
    python_callable=update_customer_ltv_standard,
    dag=dag
)

high_volume_processing = PythonOperator(
    task_id='high_volume_processing',
    python_callable=trigger_high_volume_processing,
    dag=dag
)

# Convergence point - both paths lead here
processing_complete = DummyOperator(
    task_id='processing_complete',
    trigger_rule='one_success',  # Succeeds if either processing path succeeds
    dag=dag
)

The branching pattern lets your pipeline adapt to different conditions. The trigger_rule='one_success' on the convergence task ensures the pipeline continues regardless of which processing path was taken.
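Because BranchPythonOperator simply returns the task_id of the branch to follow, the decision logic itself is a plain function you can unit-test outside Airflow. A minimal mirror of decide_processing_path:

```python
def choose_processing_path(order_count, threshold=10000):
    """Return the task_id of the branch to follow, mirroring decide_processing_path."""
    return 'high_volume_processing' if order_count > threshold else 'standard_processing'
```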

Monitoring, Alerting, and Observability

Production pipelines need comprehensive monitoring beyond Airflow's built-in UI. Let's implement custom metrics and alerting:

def generate_pipeline_metrics(**context):
    """Generate comprehensive metrics about pipeline execution."""
    ti = context['task_instance']
    dag_run = context['dag_run']
    
    # Collect metrics from previous tasks
    load_results = ti.xcom_pull(task_ids='load_to_warehouse')
    validation_results = ti.xcom_pull(task_ids='validate_orders')
    
    execution_date = context['execution_date'].strftime('%Y-%m-%d')
    
    metrics = {
        'pipeline_name': dag.dag_id,
        'execution_date': execution_date,
        'dag_run_id': dag_run.run_id,
        'start_time': dag_run.start_date.isoformat() if dag_run.start_date else None,
        'current_time': datetime.utcnow().isoformat(),
        'orders_processed': load_results.get('inserted_count', 0) if load_results else 0,
        'data_quality_score': 1.0,  # Will calculate based on validation results
        'processing_duration_minutes': 0
    }
    
    # Calculate data quality score
    if validation_results:
        total_orders = validation_results.get('validation_summary', {}).get('total_orders', 0)
        error_count = len(validation_results.get('validation_summary', {}).get('validation_errors', []))
        if total_orders > 0:
            metrics['data_quality_score'] = max(0, 1 - (error_count / total_orders))
    
    # Calculate processing duration
    if dag_run.start_date:
        duration = datetime.utcnow() - dag_run.start_date
        metrics['processing_duration_minutes'] = duration.total_seconds() / 60
    
    # Store metrics in database for historical analysis
    postgres_hook = PostgresHook(postgres_conn_id='warehouse_db')
    
    insert_metrics_sql = """
    INSERT INTO pipeline_metrics 
    (pipeline_name, execution_date, dag_run_id, orders_processed, 
     data_quality_score, processing_duration_minutes, created_at)
    VALUES (%(pipeline_name)s, %(execution_date)s, %(dag_run_id)s, 
            %(orders_processed)s, %(data_quality_score)s, 
            %(processing_duration_minutes)s, CURRENT_TIMESTAMP)
    """
    
    postgres_hook.run(insert_metrics_sql, parameters=metrics)
    
    # Send to external monitoring system (e.g., DataDog, CloudWatch)
    send_metrics_to_monitoring(metrics)
    
    # Check for alerts
    check_and_send_alerts(metrics, context)
    
    return metrics

def send_metrics_to_monitoring(metrics):
    """Send metrics to external monitoring system."""
    # In production, you might use:
    # - DataDog API
    # - CloudWatch metrics
    # - Prometheus pushgateway
    # - Custom metrics endpoint
    
    try:
        monitoring_hook = HttpHook(method='POST', http_conn_id='monitoring_system')
        response = monitoring_hook.run(
            endpoint='/metrics/ingest',
            data=json.dumps(metrics),
            headers={'Content-Type': 'application/json'}
        )
        print(f"Sent metrics to monitoring system: {response.status_code}")
    except Exception as e:
        # Don't fail the pipeline if monitoring is down
        print(f"Warning: Failed to send metrics to monitoring system: {e}")

def check_and_send_alerts(metrics, context):
    """Check metrics against thresholds and send alerts."""
    alerts = []
    
    # Check data quality
    if metrics['data_quality_score'] < 0.95:
        alerts.append({
            'type': 'data_quality',
            'severity': 'warning' if metrics['data_quality_score'] > 0.90 else 'critical',
            'message': f"Data quality score dropped to {metrics['data_quality_score']:.2%}"
        })
    
    # Check processing duration
    if metrics['processing_duration_minutes'] > 120:  # 2 hours
        alerts.append({
            'type': 'performance',
            'severity': 'warning',
            'message': f"Pipeline took {metrics['processing_duration_minutes']:.1f} minutes (>120min threshold)"
        })
    
    # Check order volume
    if metrics['orders_processed'] == 0:
        alerts.append({
            'type': 'data_volume',
            'severity': 'critical',
            'message': "No orders processed - possible data source issue"
        })
    
    # Send alerts
    for alert in alerts:
        send_alert(alert, metrics, context)

def send_alert(alert, metrics, context):
    """Send alert to notification system."""
    alert_payload = {
        'pipeline': metrics['pipeline_name'],
        'execution_date': metrics['execution_date'],
        'alert_type': alert['type'],
        'severity': alert['severity'],
        'message': alert['message'],
        'dag_run_url': f"http://airflow.yourcompany.com/admin/airflow/graph?dag_id={dag.dag_id}&execution_date={context['execution_date']}",
        'metrics': metrics
    }
    
    try:
        # Send to Slack, PagerDuty, email, etc.
        http_hook = HttpHook(method='POST', http_conn_id='alerting_system')
        response = http_hook.run(
            endpoint='/alerts',
            data=json.dumps(alert_payload),
            headers={'Content-Type': 'application/json'}
        )
        print(f"Sent {alert['severity']} alert: {alert['message']}")
    except Exception as e:
        print(f"Failed to send alert: {e}")

generate_metrics = PythonOperator(
    task_id='generate_metrics',
    python_callable=generate_pipeline_metrics,
    dag=dag
)

This monitoring system captures both technical metrics (processing duration, error rates) and business metrics (order volume, data quality). The key is storing historical data for trend analysis and integrating with your broader monitoring infrastructure.
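The scoring and alert thresholds above reduce to two small pure functions, which you can test independently of any Airflow run. These mirror the logic in generate_pipeline_metrics and check_and_send_alerts exactly (alert below 0.95, critical at 0.90 and below):

```python
def data_quality_score(total_orders, error_count):
    """1.0 when every order validated cleanly; floored at 0.0."""
    if total_orders == 0:
        return 1.0
    return max(0.0, 1 - error_count / total_orders)

def quality_alert_severity(score):
    """None at 0.95 or above; 'warning' down to (but not including) 0.90; else 'critical'."""
    if score >= 0.95:
        return None
    return 'warning' if score > 0.90 else 'critical'
```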

Building the Complete DAG with Dependencies

Now let's wire everything together into a complete DAG with proper dependency management:

# Define all task dependencies
api_health_check >> extract_orders >> validate_orders >> load_to_warehouse
load_to_warehouse >> decide_processing

# Branching paths
decide_processing >> [standard_processing, high_volume_processing]
[standard_processing, high_volume_processing] >> processing_complete

# Final monitoring and cleanup
processing_complete >> generate_metrics

# Optional: Add a final success notification
def send_success_notification(**context):
    """Send notification when entire pipeline completes successfully."""
    metrics = context['task_instance'].xcom_pull(task_ids='generate_metrics')
    
    notification = {
        'status': 'success',
        'pipeline': dag.dag_id,
        'execution_date': context['execution_date'].strftime('%Y-%m-%d'),
        'orders_processed': metrics.get('orders_processed', 0),
        'duration_minutes': metrics.get('processing_duration_minutes', 0),
        'data_quality_score': metrics.get('data_quality_score', 1.0)
    }
    
    # Send to team Slack channel; send_slack_notification is assumed to be a small
    # helper defined elsewhere (e.g. an HttpHook POST to a Slack webhook)
    send_slack_notification(notification)

success_notification = PythonOperator(
    task_id='success_notification',
    python_callable=send_success_notification,
    dag=dag
)

generate_metrics >> success_notification

Hands-On Exercise: Building Your Production Pipeline

Let's implement a complete pipeline from scratch. You'll build a DAG that processes customer feedback data with the following requirements:

  1. Extract: Pull customer reviews from an API endpoint
  2. Validate: Ensure reviews have required fields and valid sentiment scores
  3. Transform: Calculate aggregated metrics per product
  4. Load: Update both a PostgreSQL database and Redis cache
  5. Monitor: Generate alerts if review volume drops significantly

Here's the foundation to build on:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.redis.hooks.redis import RedisHook
import pandas as pd
import json

# Your task: Define appropriate default_args
default_args = {
    # TODO: Add owner, retries, retry_delay, email settings
}

# Your task: Create the DAG with appropriate scheduling
dag = DAG(
    'customer_feedback_pipeline',
    # TODO: Add default_args, description, schedule, etc.
)

def extract_reviews(**context):
    """
    Extract customer reviews for the execution date.
    
    Your task: Implement robust API extraction with:
    - Proper date handling using execution_date
    - Error handling for API failures
    - Data validation for expected response format
    - Return structured data for downstream tasks
    """
    # TODO: Implement extraction logic
    pass

def validate_reviews(**context):
    """
    Validate extracted reviews data.
    
    Your task: Implement validation that checks:
    - Required fields: review_id, product_id, customer_id, rating, review_text
    - Rating is between 1-5
    - Review text is not empty
    - Product_id follows expected format
    - Fail pipeline if >10% of reviews are invalid
    """
    # TODO: Implement validation logic
    pass

def calculate_product_metrics(**context):
    """
    Calculate aggregated metrics per product.
    
    Your task: Calculate:
    - Average rating per product
    - Review count per product
    - Sentiment distribution (if you want to add sentiment analysis)
    - Flag products with significant rating drops
    """
    # TODO: Implement aggregation logic
    pass

def update_database(**context):
    """
    Update PostgreSQL with new review data and metrics.
    
    Your task: 
    - Use proper transaction management
    - Implement idempotent operations
    - Update both raw reviews and aggregated metrics tables
    """
    # TODO: Implement database operations
    pass

def update_cache(**context):
    """
    Update Redis cache with latest product metrics.
    
    Your task:
    - Update product rating cache
    - Set appropriate TTL values
    - Handle Redis connection failures gracefully
    """
    # TODO: Implement cache operations
    pass

def monitor_and_alert(**context):
    """
    Generate monitoring metrics and alerts.
    
    Your task:
    - Compare today's review volume to historical average
    - Alert if volume drops >50% without explanation
    - Calculate data quality metrics
    - Store metrics for historical analysis
    """
    # TODO: Implement monitoring logic
    pass

# Your task: Create the operators and define dependencies
# Remember to include proper error handling, monitoring, and recovery

Implementation guidelines:

  1. Start simple: Get the basic extraction and loading working first
  2. Add validation: Implement comprehensive data quality checks
  3. Build monitoring: Add metrics collection and alerting
  4. Test failure scenarios: Intentionally break things to test recovery
  5. Optimize for production: Consider performance, idempotency, and observability
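For guideline 4, it helps to build a controllable "flaky" dependency. This sketch constructs one, plus a crude local stand-in for Airflow's retry loop (the real loop lives in the scheduler), so you can verify that your retry budget actually covers a given number of transient failures:

```python
def flaky(fail_times):
    """Return a callable that raises ConnectionError `fail_times` times, then succeeds."""
    state = {'calls': 0}
    def call():
        state['calls'] += 1
        if state['calls'] <= fail_times:
            raise ConnectionError("simulated transient failure")
        return 'ok'
    return call, state

def run_with_retries(task, retries=2):
    """Crude stand-in for Airflow's retry behavior: one try plus `retries` retries."""
    for attempt in range(retries + 1):
        try:
            return task()
        except ConnectionError:
            if attempt == retries:
                raise  # retry budget exhausted: the failure propagates

call, state = flaky(fail_times=2)
assert run_with_retries(call, retries=2) == 'ok'  # recovered on the second retry
assert state['calls'] == 3
```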

Common Mistakes & Troubleshooting

Mistake #1: Not handling execution_date correctly. Many developers confuse execution_date with the current time. Remember: execution_date represents the logical date for your data, not when the DAG actually runs.

# Wrong - uses current time
today = datetime.now().strftime('%Y-%m-%d')

# Right - uses execution_date for data consistency
execution_date = context['execution_date'].strftime('%Y-%m-%d')
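To see the difference concretely: in a PythonOperator callable the logical date arrives through the task context, so you can simulate it with a plain keyword argument (the real context Airflow passes contains much more than this):

```python
from datetime import datetime

def extract_orders(**context):
    # The logical date is fixed per DAG run, so backfills and retries
    # always target the same partition of data.
    ds = context['execution_date'].strftime('%Y-%m-%d')
    return f"SELECT * FROM orders WHERE DATE(order_date) = '{ds}'"

# Simulated context: a backfill run for Jan 5, even if executed days later.
query = extract_orders(execution_date=datetime(2026, 1, 5))
print(query)  # ... WHERE DATE(order_date) = '2026-01-05'
```

Whether the task runs on time or as a backfill weeks later, the query targets the same day's data.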

Mistake #2: Creating non-idempotent operations

Your DAG should produce the same result whether it runs once or multiple times. This is crucial for backfills and recovery.

# Wrong - appends data on each run
INSERT INTO orders VALUES (...)

# Right - replaces data for the execution date
DELETE FROM orders WHERE DATE(order_date) = %s;
INSERT INTO orders VALUES (...);
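The delete-then-insert pattern is easy to verify outside Airflow. A minimal sketch using stdlib sqlite3 (the table and column names are made up for illustration): running the load twice for the same logical date leaves exactly one copy of the data.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, order_date TEXT, amount REAL)")

def load_orders(conn, ds, rows):
    # Idempotent load: wipe the partition for this logical date,
    # then insert, all inside one transaction.
    with conn:  # commits on success, rolls back on error
        conn.execute("DELETE FROM orders WHERE order_date = ?", (ds,))
        conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", rows)

rows = [(1, "2026-01-05", 19.99), (2, "2026-01-05", 5.00)]
load_orders(conn, "2026-01-05", rows)
load_orders(conn, "2026-01-05", rows)  # rerun, e.g. a retry or backfill

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(count)  # 2, not 4
```

Wrapping both statements in one transaction matters: if the insert fails after the delete, the rollback restores the old partition instead of leaving a hole.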

Mistake #3: Ignoring task dependencies

Tasks that should run sequentially often get defined to run in parallel, causing race conditions.

# Wrong - tasks run in parallel
task_a = PythonOperator(...)
task_b = PythonOperator(...)  # Depends on task_a output

# Right - explicit dependency
task_a >> task_b

Troubleshooting XCom issues: XCom is great for passing small amounts of data between tasks, but it has limits:

# Wrong - passing large datasets through XCom
return huge_dataframe.to_dict()

# Right - save to shared storage, pass reference
file_path = f'/shared/data/{execution_date}.parquet'
df.to_parquet(file_path)
return {'data_path': file_path, 'row_count': len(df)}
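The save-and-pass-a-reference pattern can be demonstrated with the stdlib alone (JSON instead of Parquet, a temp directory instead of real shared storage — both assumptions for the sketch):

```python
import json
import tempfile
from pathlib import Path

SHARED_DIR = Path(tempfile.mkdtemp())  # stand-in for real shared storage

def extract(**context):
    rows = [{"order_id": 1, "amount": 19.99}, {"order_id": 2, "amount": 5.0}]
    path = SHARED_DIR / f"{context['ds']}.json"
    path.write_text(json.dumps(rows))
    # Only this small reference dict travels through XCom (the return value);
    # the dataset itself stays on disk.
    return {"data_path": str(path), "row_count": len(rows)}

def transform(ref):
    rows = json.loads(Path(ref["data_path"]).read_text())
    return sum(r["amount"] for r in rows)

ref = extract(ds="2026-01-05")
total = transform(ref)
print(ref["row_count"], total)  # row count and total amount
```

The downstream task pulls the reference from XCom, reads the file, and never forces the metadata database to store a serialized DataFrame.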

Debugging connection issues: When database or API connections fail:

  1. Test connections in Airflow's Admin > Connections UI
  2. Check network connectivity from Airflow workers
  3. Verify credentials and permissions
  4. Look for connection pooling issues

# Add connection debugging (requires the Postgres provider package)
from airflow.providers.postgres.hooks.postgres import PostgresHook

try:
    hook = PostgresHook(postgres_conn_id='warehouse_db')
    conn = hook.get_conn()
    print(f"Connected to database: {conn.get_dsn_parameters()}")
except Exception as e:
    print(f"Connection failed: {e}")
    # Log additional debugging info, then re-raise so the task fails visibly
    raise

Memory and performance issues: Large DAGs can consume significant resources:

  • Use max_active_runs=1 to prevent overlapping executions
  • Set appropriate pool resources for database connections
  • Use TaskGroups to organize complex workflows (SubDAGs are deprecated in Airflow 2)
  • Monitor task duration and optimize slow operations

Warning: Be careful with catchup=True on DAGs that process large amounts of data. Airflow will try to run all missed executions, which can overwhelm your systems.

Summary & Next Steps

You've built a production-ready data pipeline that handles the complexities real systems face: API failures, data validation, dynamic scheduling, comprehensive monitoring, and graceful error handling. The patterns you've learned—idempotent operations, proper dependency management, and robust error handling—apply to any data pipeline regardless of scale.

Your pipeline now includes:

  • Robust extraction with health checks and retry logic
  • Comprehensive validation that fails fast on bad data
  • Dynamic branching that adapts to different data volumes
  • Production database operations with proper transaction management
  • Monitoring and alerting that provides visibility into pipeline health
  • Error handling that distinguishes between retryable and permanent failures

The key insight is that Airflow isn't just a scheduler—it's a platform for building reliable, observable data systems. The investment in proper error handling, monitoring, and testing pays dividends when your pipeline needs to process critical business data at 3 AM.

Next steps to advance your Airflow skills:

  1. Explore advanced operators: Learn about KubernetesPodOperator for containerized workloads, SparkSubmitOperator for big data processing
  2. Master Airflow configuration: Understand executors (LocalExecutor vs CeleryExecutor vs KubernetesExecutor) and when to use each
  3. Build custom operators: Create reusable operators for common patterns in your organization
  4. Implement advanced scheduling: Learn about dynamic DAG generation, external triggers, and SLAs
  5. Study Airflow internals: Understand the scheduler, metadata database, and worker processes for optimization

Your next challenge: Build a multi-DAG system where one DAG's completion triggers another, implementing cross-DAG dependencies and shared resource management. This mirrors real-world scenarios where different teams' pipelines need to coordinate while maintaining independence.
