
You're staring at a collection of Python scripts that extract data from your company's CRM, transform customer records, and load them into your analytics warehouse. Each script runs on a different schedule via cron jobs scattered across multiple servers. When the CRM API goes down, your downstream reports break silently. When the transformation script fails halfway through processing 100,000 records, you have to manually figure out where to restart. Sound familiar?
This chaotic approach to data pipeline management is exactly what Apache Airflow was designed to solve. Instead of managing dozens of disconnected scripts, Airflow lets you orchestrate complex data workflows as code, with built-in monitoring, retry logic, and dependency management.
By the end of this lesson, you'll be building robust, production-ready data pipelines that can handle the real-world complexities of modern data infrastructure.
What you'll learn:
• Design and implement multi-step data pipelines using Airflow's Directed Acyclic Graph (DAG) architecture
• Configure task dependencies, scheduling, and retry policies for reliable pipeline execution
• Handle common pipeline failure scenarios with proper error handling and alerting
• Monitor and troubleshoot pipeline performance using Airflow's web interface and logging
• Implement data quality checks and validation steps within your orchestration workflow
Before diving in, you should be comfortable writing Python functions, running commands with Docker Compose, and working with basic data pipeline concepts (extract, transform, load). If you need a refresher on data pipeline fundamentals, check out our Data Pipeline Architecture Basics lesson first.
Apache Airflow thinks about data pipelines differently than traditional cron-based approaches. Instead of scheduling individual scripts, you define workflows as Directed Acyclic Graphs (DAGs) where each node represents a task and edges represent dependencies.
Think of a DAG like a recipe with dependencies. You can't frost a cake before baking it, and you can't bake it before mixing the batter. Similarly, in a data pipeline, you might need to:
• Extract customer records from the CRM and order records from the orders database
• Join the two datasets only after both extracts have finished
• Calculate customer lifetime value from the joined data
• Refresh the analytics dashboard only once the metrics are ready
Here's how that looks as an Airflow DAG:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# Define default arguments that apply to all tasks
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email': ['data-team@yourcompany.com']
}
# Define the DAG
dag = DAG(
'customer_analytics_pipeline',
default_args=default_args,
description='Process customer data for analytics dashboard',
schedule_interval='0 2 * * *', # Run daily at 2 AM
catchup=False,
tags=['analytics', 'customer_data']
)
# Define tasks
# (extract_customer_data, extract_order_data, merge_customer_order_data, and
# calculate_lifetime_value are assumed to be Python functions defined earlier in this file)
extract_crm_data = PythonOperator(
task_id='extract_crm_data',
python_callable=extract_customer_data,
dag=dag
)
extract_orders_data = PythonOperator(
task_id='extract_orders_data',
python_callable=extract_order_data,
dag=dag
)
join_customer_orders = PythonOperator(
task_id='join_customer_orders',
python_callable=merge_customer_order_data,
dag=dag
)
calculate_clv = PythonOperator(
task_id='calculate_customer_lifetime_value',
python_callable=calculate_lifetime_value,
dag=dag
)
update_dashboard = BashOperator(
task_id='refresh_analytics_dashboard',
bash_command='curl -X POST "https://dashboard.yourcompany.com/api/refresh"',
dag=dag
)
# Define dependencies
[extract_crm_data, extract_orders_data] >> join_customer_orders >> calculate_clv >> update_dashboard
This DAG definition tells Airflow:
• Run the pipeline every day at 2 AM, without backfilling missed runs (catchup=False)
• Retry each failed task twice, waiting 5 minutes between attempts, and email the data team on failure
• Run the two extract tasks in parallel, then join the results, calculate customer lifetime value, and finally refresh the dashboard
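The DAG above references Python callables like extract_customer_data that aren't shown. Here's a minimal sketch of what one of them might look like — fetch_crm_customers is a hypothetical helper standing in for your real CRM client:

import pandas as pd

def extract_customer_data(**context):
    """Pull customer records from the CRM for the run's logical date."""
    execution_date = context['ds']  # Airflow injects the run date as 'YYYY-MM-DD'
    records = fetch_crm_customers(updated_since=execution_date)  # hypothetical CRM helper
    output_path = f'/tmp/crm_customers_{execution_date}.csv'
    pd.DataFrame(records).to_csv(output_path, index=False)
    print(f"Extracted {len(records)} customer records to {output_path}")
    return output_path  # the returned path is pushed to XCom for downstream tasks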
Let's get Airflow running so you can follow along with the examples. We'll use Docker Compose for a quick setup that includes the web server, scheduler, and metadata database.
Create a new directory for your Airflow project and save this docker-compose.yml:
version: '3.8'
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres_db_volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
airflow-webserver:
image: apache/airflow:2.7.0
depends_on:
- postgres
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
airflow-scheduler:
image: apache/airflow:2.7.0
depends_on:
- postgres
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__FERNET_KEY: ''
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
command: scheduler
restart: always
volumes:
postgres_db_volume:
Create the necessary directories, initialize Airflow's metadata database, create an admin user, and then start the services. (The compose file above doesn't include an init container, so we run these one-off commands first; if the database isn't ready yet, wait a few seconds and re-run the init command.)
mkdir -p dags logs plugins
docker-compose up -d postgres
docker-compose run --rm airflow-webserver airflow db init
docker-compose run --rm airflow-webserver airflow users create \
    --username airflow --password airflow \
    --firstname Admin --lastname User \
    --role Admin --email admin@example.com
docker-compose up -d
After a few minutes, visit http://localhost:8080 in your browser and log in with the admin user you just created (username and password are both airflow).
Pro tip: The AIRFLOW__CORE__LOAD_EXAMPLES: 'false' setting prevents Airflow from loading dozens of example DAGs that can clutter your interface while learning. You can change this to 'true' later to explore additional examples.
Now let's build a realistic data pipeline that demonstrates Airflow's key features. We'll create a pipeline that processes e-commerce data: extracting sales data, cleaning it, calculating metrics, and storing results.
Create a new file dags/ecommerce_analytics.py:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from airflow.sensors.filesystem import FileSensor
import pandas as pd
import sqlite3
import os
from pathlib import Path
default_args = {
'owner': 'analytics_team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=10),
'execution_timeout': timedelta(hours=2)
}
dag = DAG(
'ecommerce_daily_analytics',
default_args=default_args,
description='Daily e-commerce analytics processing',
schedule_interval='0 3 * * *', # 3 AM daily
max_active_runs=1, # Prevent overlapping runs
catchup=False,
tags=['ecommerce', 'analytics', 'daily']
)
def extract_sales_data(**context):
"""Extract sales data from multiple sources"""
execution_date = context['ds'] # YYYY-MM-DD format
# Simulate extracting from multiple data sources
print(f"Extracting sales data for {execution_date}")
# In reality, this might hit APIs, databases, or file systems
sales_data = {
'order_id': range(1000, 1500),
'customer_id': [f'CUST_{i%100:03d}' for i in range(500)],
'product_category': ['Electronics', 'Clothing', 'Books', 'Home'] * 125,
'order_amount': [round(50 + (i * 13.7) % 200, 2) for i in range(500)],
'order_date': [execution_date] * 500,
'discount_applied': [round((i * 7) % 50, 2) for i in range(500)]
}
df = pd.DataFrame(sales_data)
# Save to temporary location for next task
output_path = f'/tmp/raw_sales_{execution_date}.csv'
df.to_csv(output_path, index=False)
print(f"Extracted {len(df)} sales records to {output_path}")
return output_path
def validate_data_quality(**context):
"""Validate the extracted data meets quality requirements"""
execution_date = context['ds']
input_path = f'/tmp/raw_sales_{execution_date}.csv'
print(f"Validating data quality for {input_path}")
df = pd.read_csv(input_path)
# Define quality checks
quality_issues = []
# Check for null values in critical fields
critical_fields = ['order_id', 'customer_id', 'order_amount']
for field in critical_fields:
null_count = df[field].isnull().sum()
if null_count > 0:
quality_issues.append(f"{field} has {null_count} null values")
# Check for reasonable order amounts
if (df['order_amount'] < 0).any():
quality_issues.append("Found negative order amounts")
if (df['order_amount'] > 10000).any():
quality_issues.append("Found suspiciously high order amounts (>$10,000)")
# Check for duplicate order IDs
duplicate_orders = df['order_id'].duplicated().sum()
if duplicate_orders > 0:
quality_issues.append(f"Found {duplicate_orders} duplicate order IDs")
if quality_issues:
error_msg = "Data quality issues found:\n" + "\n".join(quality_issues)
print(error_msg)
raise ValueError(error_msg)
print(f"Data quality validation passed for {len(df)} records")
return input_path
def calculate_daily_metrics(**context):
"""Calculate daily business metrics"""
execution_date = context['ds']
input_path = f'/tmp/raw_sales_{execution_date}.csv'
print(f"Calculating metrics from {input_path}")
df = pd.read_csv(input_path)
# Calculate various metrics
metrics = {
'date': execution_date,
'total_orders': len(df),
'total_revenue': df['order_amount'].sum(),
'average_order_value': df['order_amount'].mean(),
'total_discount_given': df['discount_applied'].sum(),
'unique_customers': df['customer_id'].nunique(),
'top_category_by_orders': df['product_category'].mode()[0],
'revenue_by_category': df.groupby('product_category')['order_amount'].sum().to_dict()
}
print("Daily Metrics Calculated:")
for key, value in metrics.items():
if key != 'revenue_by_category':
print(f" {key}: {value}")
# Save metrics to database (using SQLite for demo)
db_path = '/tmp/ecommerce_metrics.db'
conn = sqlite3.connect(db_path)
# Create table if it doesn't exist
conn.execute('''
CREATE TABLE IF NOT EXISTS daily_metrics (
date TEXT PRIMARY KEY,
total_orders INTEGER,
total_revenue REAL,
average_order_value REAL,
total_discount_given REAL,
unique_customers INTEGER,
top_category_by_orders TEXT
)
''')
# Insert metrics (replace if exists)
conn.execute('''
INSERT OR REPLACE INTO daily_metrics
(date, total_orders, total_revenue, average_order_value,
total_discount_given, unique_customers, top_category_by_orders)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
metrics['date'],
metrics['total_orders'],
metrics['total_revenue'],
metrics['average_order_value'],
metrics['total_discount_given'],
metrics['unique_customers'],
metrics['top_category_by_orders']
))
conn.commit()
conn.close()
return metrics
def cleanup_temp_files(**context):
"""Clean up temporary files created during pipeline execution"""
execution_date = context['ds']
temp_file = f'/tmp/raw_sales_{execution_date}.csv'
if os.path.exists(temp_file):
os.remove(temp_file)
print(f"Cleaned up temporary file: {temp_file}")
else:
print(f"Temporary file not found: {temp_file}")
# Define tasks
extract_task = PythonOperator(
task_id='extract_sales_data',
python_callable=extract_sales_data,
dag=dag
)
validate_task = PythonOperator(
task_id='validate_data_quality',
python_callable=validate_data_quality,
dag=dag
)
calculate_task = PythonOperator(
task_id='calculate_daily_metrics',
python_callable=calculate_daily_metrics,
dag=dag
)
cleanup_task = PythonOperator(
task_id='cleanup_temp_files',
python_callable=cleanup_temp_files,
dag=dag,
trigger_rule='all_done' # Run even if upstream tasks fail
)
# Set up dependencies
extract_task >> validate_task >> calculate_task >> cleanup_task
This pipeline demonstrates several important Airflow concepts:
• Templated execution dates: each task reads context['ds'] so it always processes data for the run's logical date
• Quality gates: validate_data_quality raises an exception on bad data, which stops downstream tasks from running
• Guaranteed cleanup: the cleanup task uses trigger_rule='all_done' so temporary files are removed even when an upstream task fails
• Safety limits: max_active_runs=1 prevents overlapping runs, and execution_timeout kills tasks that hang
Save the DAG file and refresh your Airflow web interface. You should see the new ecommerce_daily_analytics DAG appear. Click on it to view the graph view, which shows your pipeline visually.
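Beyond the UI, you can also smoke-test the whole DAG without the scheduler. A minimal sketch, assuming Airflow 2.5 or newer: add this guard to the bottom of dags/ecommerce_analytics.py and run the file with plain Python from inside the scheduler container (or any environment where Airflow is configured):

if __name__ == "__main__":
    # dag.test() runs every task in this DAG in a single process —
    # handy for quick local debugging before enabling the schedule.
    dag.test()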
The >> operator creates dependencies between tasks. When you write task_a >> task_b >> task_c, you're telling Airflow:
• task_b can't start until task_a completes successfully
• task_c can't start until task_b completes successfully
You can also create more complex dependency patterns:
# Multiple upstream dependencies
[task_a, task_b] >> task_c # task_c waits for both task_a and task_b
# Multiple downstream dependencies
task_a >> [task_b, task_c] # task_b and task_c both depend on task_a
# Mixed patterns
[extract_crm, extract_orders] >> join_data >> [calculate_metrics, generate_report]
Let's explore some advanced task configuration options that you'll use in production pipelines:
# Task with custom retry behavior
sensitive_task = PythonOperator(
task_id='process_sensitive_data',
python_callable=process_pii_data,
retries=1, # Override DAG default
retry_delay=timedelta(minutes=30), # Longer delay for sensitive operations
email_on_retry=True, # Alert on retries for sensitive tasks
dag=dag
)
# Task that continues pipeline even if it fails
optional_notification = EmailOperator(
task_id='send_optional_notification',
to=['stakeholder@company.com'],
subject='Daily Pipeline Completed',
html_content='<p>Your daily analytics pipeline completed successfully.</p>',
trigger_rule='all_done', # Run whether upstream tasks succeed or fail
dag=dag
)
# Task with resource requirements
heavy_computation = PythonOperator(
task_id='heavy_ml_computation',
python_callable=train_ml_model,
pool='gpu_pool', # Limit concurrent GPU usage
execution_timeout=timedelta(hours=4), # Kill if it takes too long
dag=dag
)
The trigger_rule parameter is particularly useful for creating robust pipelines:
• all_success (default): Run only if all upstream tasks succeed
• all_failed: Run only if all upstream tasks fail
• all_done: Run regardless of upstream task status
• one_success: Run if at least one upstream task succeeds
• one_failed: Run if at least one upstream task fails
Real-world data pipelines often have complex scheduling requirements beyond simple daily or hourly runs. Let's explore Airflow's advanced scheduling capabilities.
Airflow uses cron expressions for flexible scheduling:
# Various scheduling examples (collected in one dict here for brevity — in a real DAG file,
# define each DAG as its own module-level variable so Airflow's DAG discovery picks it up)
dags = {
'hourly_pipeline': DAG(
'process_hourly_data',
schedule_interval='0 * * * *', # Every hour at minute 0
# ... other config
),
'business_hours_only': DAG(
'business_hours_reports',
schedule_interval='0 9-17 * * 1-5', # 9 AM to 5 PM, Monday-Friday
# ... other config
),
'monthly_reports': DAG(
'monthly_financial_reports',
schedule_interval='0 6 1 * *', # 6 AM on the 1st of every month
# ... other config
),
'manual_only': DAG(
'ad_hoc_analysis',
schedule_interval=None, # Only run when manually triggered
# ... other config
)
}
Sometimes you need to wait for external conditions before starting your pipeline. Airflow sensors solve this problem:
from airflow.sensors.filesystem import FileSensor
from airflow.providers.common.sql.sensors.sql import SqlSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor  # requires the amazon provider package
# Wait for a file to appear
wait_for_data_file = FileSensor(
task_id='wait_for_daily_export',
filepath='/data/exports/sales_{{ ds }}.csv', # Uses templating
poke_interval=300, # Check every 5 minutes
timeout=3600, # Give up after 1 hour
dag=dag
)
# Wait for database condition
wait_for_etl_completion = SqlSensor(
task_id='wait_for_upstream_etl',
conn_id='analytics_db',
sql="""
SELECT COUNT(*)
FROM etl_status
WHERE date = '{{ ds }}'
AND status = 'completed'
AND process_name = 'daily_sales_etl'
""",
poke_interval=600, # Check every 10 minutes
dag=dag
)
# Chain sensors with processing tasks
wait_for_data_file >> wait_for_etl_completion >> extract_task
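The S3KeySensor imported above works the same way for files landing in object storage. A hedged sketch — the bucket name and connection ID here are placeholders, and the amazon provider package must be installed:

wait_for_s3_export = S3KeySensor(
    task_id='wait_for_s3_export',
    bucket_name='example-data-lake',           # hypothetical bucket
    bucket_key='exports/sales_{{ ds }}.csv',   # templated, like the FileSensor above
    aws_conn_id='aws_default',                 # configured under Admin > Connections
    poke_interval=300,                         # check every 5 minutes
    timeout=3600,                              # give up after 1 hour
    dag=dag
)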
For pipelines that need to process multiple similar datasets, you can generate tasks dynamically:
def create_processing_dag():
"""Create a DAG that processes multiple product categories"""
dag = DAG(
'multi_category_processing',
default_args=default_args,
schedule_interval='0 4 * * *',
description='Process each product category independently'
)
categories = ['electronics', 'clothing', 'books', 'home', 'sports']
# Create extract task for each category
extract_tasks = []
for category in categories:
extract_task = PythonOperator(
task_id=f'extract_{category}_data',
python_callable=extract_category_data,
op_kwargs={'category': category},
dag=dag
)
extract_tasks.append(extract_task)
# Create a summary task that waits for all extracts
summarize_task = PythonOperator(
task_id='summarize_all_categories',
python_callable=create_category_summary,
dag=dag
)
# Set dependencies: all extracts must complete before summary
extract_tasks >> summarize_task
return dag
# Create the DAG
multi_category_dag = create_processing_dag()
# Make it available to Airflow
globals()['multi_category_processing'] = multi_category_dag
This pattern is extremely useful when you have similar processing logic that needs to be applied to different data sources, regions, or business units.
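Since Airflow 2.3 you can get a similar effect with dynamic task mapping, where Airflow expands one task definition into parallel task instances at runtime. A minimal sketch using the TaskFlow API — the DAG id and processing logic are illustrative:

from datetime import datetime
from airflow import DAG
from airflow.decorators import task

with DAG(
    'mapped_category_processing',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as mapped_dag:

    @task
    def process_category(category: str) -> str:
        # placeholder processing logic for a single category
        print(f"Processing {category}")
        return category

    # Airflow creates one task instance per list element at runtime
    process_category.expand(category=['electronics', 'clothing', 'books', 'home', 'sports'])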
Production data pipelines fail. Hardware fails, APIs go down, data formats change unexpectedly. The key is building pipelines that fail gracefully and provide clear visibility into what went wrong.
def extract_with_circuit_breaker(**context):
"""Extract data with built-in failure detection"""
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Configure retry strategy
retry_strategy = Retry(
total=3,
status_forcelist=[429, 500, 502, 503, 504],
backoff_factor=2 # Wait 2, 4, 8 seconds between retries
)
session = requests.Session()
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
api_url = "https://api.example.com/sales-data"
try:
response = session.get(api_url, timeout=30)
response.raise_for_status()
data = response.json()
# Validate response structure
if 'sales_records' not in data:
raise ValueError("API response missing 'sales_records' field")
if len(data['sales_records']) == 0:
# This might be normal on weekends, but log it
print("WARNING: API returned zero sales records")
return data['sales_records']
except requests.exceptions.RequestException as e:
# Log the specific error for debugging
print(f"API request failed: {str(e)}")
# Try alternative data source or fail gracefully
fallback_data = load_cached_data(context['ds'])
if fallback_data:
print("Using cached data as fallback")
return fallback_data
# Re-raise if no fallback available
raise
def load_cached_data(execution_date):
    """Load data from cache if available"""
    import json
    import os  # local imports keep this snippet self-contained
    cache_file = f'/data/cache/sales_{execution_date}.json'
    if os.path.exists(cache_file):
        with open(cache_file, 'r') as f:
            return json.load(f)
    return None
Instead of just sending emails on failures, you can create sophisticated alerting logic:
def custom_failure_handler(context):
"""Custom logic for handling task failures"""
task_instance = context['task_instance']
dag_id = context['dag'].dag_id
task_id = context['task'].task_id
execution_date = context['ds']
# Determine alert severity based on task and failure type
if task_id.startswith('extract_'):
severity = 'HIGH' # Data source issues are critical
elif task_id.startswith('validate_'):
severity = 'MEDIUM' # Data quality issues need attention
else:
severity = 'LOW' # Other tasks less critical
    # Check if this is a recurring failure
    # (get_recent_failure_count, send_slack_alert, send_pager_duty_alert, and
    # send_email_alert are assumed helper functions defined elsewhere in your project)
    failure_count = get_recent_failure_count(dag_id, task_id)
if failure_count >= 3:
severity = 'CRITICAL' # Escalate recurring failures
# Send appropriate alerts
alert_message = f"""
Pipeline Failure Alert
DAG: {dag_id}
Task: {task_id}
Execution Date: {execution_date}
Severity: {severity}
Error: {context['exception']}
Recent failure count: {failure_count}
"""
if severity in ['HIGH', 'CRITICAL']:
send_slack_alert(alert_message, channel='#data-incidents')
send_pager_duty_alert(alert_message)
else:
send_email_alert(alert_message)
# Apply custom handler to tasks
critical_task = PythonOperator(
task_id='critical_data_processing',
python_callable=process_critical_data,
on_failure_callback=custom_failure_handler,
dag=dag
)
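To avoid attaching the callback task by task, you can also put it in default_args so every task built with those defaults inherits it. A sketch of a trimmed default_args dict — define it this way before constructing the DAG:

default_args = {
    'owner': 'data_team',
    'retries': 2,
    'on_failure_callback': custom_failure_handler,  # every task created with these defaults gets the handler
}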
Implement data quality checks that prevent bad data from propagating downstream:
def comprehensive_data_validation(**context):
"""Comprehensive data validation with detailed reporting"""
execution_date = context['ds']
df = pd.read_csv(f'/tmp/raw_sales_{execution_date}.csv')
validation_results = {
'total_records': len(df),
'validation_timestamp': datetime.now().isoformat(),
'checks': {}
}
# Check 1: Completeness
null_counts = df.isnull().sum()
for column in df.columns:
null_pct = (null_counts[column] / len(df)) * 100
validation_results['checks'][f'{column}_completeness'] = {
'test': 'completeness',
'null_count': int(null_counts[column]),
'null_percentage': round(null_pct, 2),
'passed': null_pct < 5.0 # Fail if >5% nulls
}
# Check 2: Value ranges
if 'order_amount' in df.columns:
amount_stats = df['order_amount'].describe()
validation_results['checks']['order_amount_range'] = {
'test': 'value_range',
'min_value': float(amount_stats['min']),
'max_value': float(amount_stats['max']),
'mean_value': float(amount_stats['mean']),
'passed': amount_stats['min'] >= 0 and amount_stats['max'] <= 50000
}
# Check 3: Uniqueness constraints
if 'order_id' in df.columns:
duplicate_count = df['order_id'].duplicated().sum()
validation_results['checks']['order_id_uniqueness'] = {
'test': 'uniqueness',
'duplicate_count': int(duplicate_count),
'passed': duplicate_count == 0
}
# Check 4: Cross-field consistency
if 'order_date' in df.columns:
future_dates = (pd.to_datetime(df['order_date']) > pd.Timestamp.now()).sum()
validation_results['checks']['date_consistency'] = {
'test': 'temporal_consistency',
'future_date_count': int(future_dates),
'passed': future_dates == 0
}
# Determine overall validation status
failed_checks = [
check_name for check_name, check_result in validation_results['checks'].items()
if not check_result['passed']
]
validation_results['overall_status'] = 'PASSED' if len(failed_checks) == 0 else 'FAILED'
validation_results['failed_checks'] = failed_checks
# Log results
print(f"Data Validation Results:")
print(f" Total Records: {validation_results['total_records']}")
print(f" Overall Status: {validation_results['overall_status']}")
    if failed_checks:
        print(f"  Failed Checks: {', '.join(failed_checks)}")

    # Save detailed results for investigation
    results_file = f'/tmp/validation_results_{execution_date}.json'
    with open(results_file, 'w') as f:
        json.dump(validation_results, f, indent=2)

    # Fail the task (and block downstream tasks) only when a check did not pass
    if failed_checks:
        raise ValueError(f"Data validation failed. Details saved to {results_file}")

    return validation_results
Now let's build a complete pipeline that demonstrates everything we've covered. You'll create a customer segmentation pipeline that:
• Extracts customer master data and transaction history in parallel
• Validates both datasets before any downstream processing runs
• Calculates RFM (recency, frequency, monetary) scores for every customer
• Assigns each customer to a marketing segment based on those scores
• Generates an executive summary report of the segments
Create a new DAG file dags/customer_segmentation.py that implements the following pipeline:
Data Flow:
• extract_customer_data and extract_transaction_data run in parallel
• validate_datasets waits for both extracts and gates the rest of the pipeline
• calculate_rfm_scores, assign_customer_segments, and generate_summary_report run in sequence
Business Rules:
• Recency = days since the customer's most recent transaction
• Frequency = number of transactions in the last 365 days
• Monetary = total amount spent in the last 365 days
• Each metric is scored into quintiles (1-5), and the combined RFM score maps customers to segments such as Champions, Loyal Customers, At Risk, and Hibernating
Here's the framework to get you started:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
import numpy as np
from pathlib import Path
default_args = {
'owner': 'marketing_team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'customer_segmentation_pipeline',
default_args=default_args,
description='Customer RFM segmentation analysis',
schedule_interval='0 6 * * 1', # Weekly on Monday at 6 AM
catchup=False,
tags=['marketing', 'segmentation', 'weekly']
)
def extract_customer_data(**context):
"""Extract customer master data"""
# TODO: Implement customer data extraction
# Should return customer_id, registration_date, email
pass
def extract_transaction_data(**context):
"""Extract transaction history"""
# TODO: Implement transaction data extraction
# Should return transaction_id, customer_id, amount, transaction_date
pass
def validate_datasets(**context):
"""Validate extracted data meets quality requirements"""
# TODO: Implement validation logic
# Check for nulls, duplicates, data consistency
pass
def calculate_rfm_scores(**context):
"""Calculate RFM scores for each customer"""
# TODO: Implement RFM calculation
# Return customer_id, recency_score, frequency_score, monetary_score
pass
def assign_customer_segments(**context):
"""Assign customers to marketing segments based on RFM"""
# TODO: Implement segmentation logic
# Return customer_id, segment, rfm_score
pass
def generate_summary_report(**context):
"""Generate executive summary of customer segments"""
# TODO: Create summary with segment counts, revenue by segment
pass
# TODO: Define your tasks and dependencies here
# Example task structure:
# extract_customers = PythonOperator(...)
# extract_transactions = PythonOperator(...)
# validate_data = PythonOperator(...)
# calculate_rfm = PythonOperator(...)
# segment_customers = PythonOperator(...)
# generate_report = PythonOperator(...)
# TODO: Set up dependencies
Here's a complete implementation:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
import numpy as np
import json
from pathlib import Path
default_args = {
'owner': 'marketing_team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'customer_segmentation_pipeline',
default_args=default_args,
description='Customer RFM segmentation analysis',
schedule_interval='0 6 * * 1', # Weekly on Monday at 6 AM
catchup=False,
tags=['marketing', 'segmentation', 'weekly']
)
def extract_customer_data(**context):
"""Extract customer master data"""
execution_date = context['ds']
# Simulate customer data
np.random.seed(42) # For reproducible results
n_customers = 1000
customer_data = {
'customer_id': [f'CUST_{i:05d}' for i in range(1, n_customers + 1)],
'registration_date': pd.date_range('2020-01-01', '2024-01-01', periods=n_customers).strftime('%Y-%m-%d'),
'email': [f'customer{i}@example.com' for i in range(1, n_customers + 1)]
}
df = pd.DataFrame(customer_data)
output_path = f'/tmp/customers_{execution_date}.csv'
df.to_csv(output_path, index=False)
print(f"Extracted {len(df)} customer records to {output_path}")
return output_path
def extract_transaction_data(**context):
"""Extract transaction history"""
execution_date = context['ds']
# Simulate transaction data with realistic patterns
np.random.seed(42)
customers = [f'CUST_{i:05d}' for i in range(1, 1001)]
# Generate transactions with varying patterns per customer
transactions = []
transaction_id = 1
for customer_id in customers:
# Some customers are more active than others
customer_activity = np.random.choice(['high', 'medium', 'low'], p=[0.2, 0.5, 0.3])
if customer_activity == 'high':
n_transactions = np.random.poisson(15) # High activity customers
amount_base = 150
elif customer_activity == 'medium':
n_transactions = np.random.poisson(5) # Medium activity customers
amount_base = 80
else:
n_transactions = np.random.poisson(1) # Low activity customers
amount_base = 50
for _ in range(max(1, n_transactions)): # At least 1 transaction per customer
days_ago = np.random.randint(1, 365)
transaction_date = (datetime.now() - timedelta(days=days_ago)).strftime('%Y-%m-%d')
amount = max(10, np.random.normal(amount_base, amount_base * 0.3))
transactions.append({
'transaction_id': f'TXN_{transaction_id:08d}',
'customer_id': customer_id,
'amount': round(amount, 2),
'transaction_date': transaction_date
})
transaction_id += 1
df = pd.DataFrame(transactions)
output_path = f'/tmp/transactions_{execution_date}.csv'
df.to_csv(output_path, index=False)
print(f"Extracted {len(df)} transaction records to {output_path}")
return output_path
def validate_datasets(**context):
"""Validate extracted data meets quality requirements"""
execution_date = context['ds']
# Load datasets
customers_df = pd.read_csv(f'/tmp/customers_{execution_date}.csv')
transactions_df = pd.read_csv(f'/tmp/transactions_{execution_date}.csv')
issues = []
# Validate customers dataset
if customers_df['customer_id'].isnull().any():
issues.append("Customer dataset has null customer_ids")
if customers_df['customer_id'].duplicated().any():
issues.append("Customer dataset has duplicate customer_ids")
# Validate transactions dataset
if transactions_df['customer_id'].isnull().any():
issues.append("Transaction dataset has null customer_ids")
if transactions_df['amount'].isnull().any():
issues.append("Transaction dataset has null amounts")
if (transactions_df['amount'] <= 0).any():
issues.append("Transaction dataset has non-positive amounts")
# Cross-dataset validation
customer_ids_in_customers = set(customers_df['customer_id'])
customer_ids_in_transactions = set(transactions_df['customer_id'])
orphaned_transactions = customer_ids_in_transactions - customer_ids_in_customers
if orphaned_transactions:
issues.append(f"Found {len(orphaned_transactions)} transactions for non-existent customers")
if issues:
error_msg = "Data validation failed:\n" + "\n".join(issues)
raise ValueError(error_msg)
print(f"Data validation passed:")
print(f" Customers: {len(customers_df)} records")
print(f" Transactions: {len(transactions_df)} records")
return True
def calculate_rfm_scores(**context):
"""Calculate RFM scores for each customer"""
execution_date = context['ds']
customers_df = pd.read_csv(f'/tmp/customers_{execution_date}.csv')
transactions_df = pd.read_csv(f'/tmp/transactions_{execution_date}.csv')
# Convert date columns
transactions_df['transaction_date'] = pd.to_datetime(transactions_df['transaction_date'])
analysis_date = pd.to_datetime(execution_date)
# Calculate RFM metrics for each customer
rfm_data = []
for customer_id in customers_df['customer_id']:
customer_transactions = transactions_df[transactions_df['customer_id'] == customer_id]
if len(customer_transactions) == 0:
# Customer with no transactions
recency = 999 # Very high recency (bad)
frequency = 0
monetary = 0
else:
# Recency: days since last transaction
last_transaction = customer_transactions['transaction_date'].max()
recency = (analysis_date - last_transaction).days
# Frequency: number of transactions in last 365 days
recent_transactions = customer_transactions[
customer_transactions['transaction_date'] >= (analysis_date - timedelta(days=365))
]
frequency = len(recent_transactions)
# Monetary: total amount spent in last 365 days
monetary = recent_transactions['amount'].sum()
rfm_data.append({
'customer_id': customer_id,
'recency': recency,
'frequency': frequency,
'monetary': round(monetary, 2)
})
rfm_df = pd.DataFrame(rfm_data)
# Calculate quintile scores (1-5, where 5 is best)
rfm_df['recency_score'] = pd.qcut(rfm_df['recency'], 5, labels=[5,4,3,2,1]) # Lower recency gets higher score
rfm_df['frequency_score'] = pd.qcut(rfm_df['frequency'].rank(method='first'), 5, labels=[1,2,3,4,5])
rfm_df['monetary_score'] = pd.qcut(rfm_df['monetary'].rank(method='first'), 5, labels=[1,2,3,4,5])
# Create combined RFM score
rfm_df['rfm_score'] = (rfm_df['recency_score'].astype(str) +
rfm_df['frequency_score'].astype(str) +
rfm_df['monetary_score'].astype(str))
output_path = f'/tmp/rfm_scores_{execution_date}.csv'
rfm_df.to_csv(output_path, index=False)
print(f"Calculated RFM scores for {len(rfm_df)} customers")
print(f"Sample RFM distribution:")
print(f" Avg Recency: {rfm_df['recency'].mean():.1f} days")
print(f" Avg Frequency: {rfm_df['frequency'].mean():.1f} transactions")
print(f" Avg Monetary: ${rfm_df['monetary'].mean():.2f}")
return output_path
def assign_customer_segments(**context):
"""Assign customers to marketing segments based on RFM"""
execution_date = context['ds']
rfm_df = pd.read_csv(f'/tmp/rfm_scores_{execution_date}.csv')
def assign_segment(row):
rfm = str(row['rfm_score'])
recency = int(rfm[0])
frequency = int(rfm[1])
monetary = int(rfm[2])
# High-value segments
if recency >= 4 and frequency >= 4 and monetary >= 4:
return 'Champions'
elif recency >= 3 and frequency >= 4 and monetary >= 4:
return 'Loyal Customers'
elif recency >= 4 and frequency <= 2 and monetary >= 4:
return 'Potential Loyalists'
# At-risk segments
elif recency <= 2 and frequency >= 3 and monetary >= 3:
return 'At Risk'
elif recency <= 2 and frequency <= 2 and monetary >= 4:
return 'Cannot Lose Them'
# Low-engagement segments
elif recency >= 3 and frequency <= 2 and monetary <= 2:
return 'New Customers'
elif recency <= 3 and frequency <= 2 and monetary <= 2:
return 'Hibernating'
# Default segments
elif frequency >= 3 and monetary <= 3:
return 'Need Attention'
else:
return 'Others'
rfm_df['segment'] = rfm_df.apply(assign_segment, axis=1)
output_path = f'/tmp/customer_segments_{execution_date}.csv'
rfm_df.to_csv(output_path, index=False)
# Log segment distribution
segment_counts = rfm_df['segment'].value_counts()
print("Customer Segment Distribution:")
for segment, count in segment_counts.items():
pct = (count / len(rfm_df)) * 100
print(f" {segment}: {count} customers ({pct:.1f}%)")
return output_path
def generate_summary_report(**context):
"""Generate executive summary of customer segments"""
execution_date = context['ds']
segments_df = pd.read_csv(f'/tmp/customer_segments_{execution_date}.csv')
transactions_df = pd.read_csv(f'/tmp/transactions_{execution_date}.csv')
# Calculate segment-level metrics
segment_summary = segments_df.groupby('segment').agg({
'customer_id': 'count',
'monetary': ['sum', 'mean'],
'frequency': 'mean',
'recency': 'mean'
}).round(2)
segment_summary.columns = ['customer_count', 'total_revenue', 'avg_revenue', 'avg_frequency', 'avg_recency']
segment_summary = segment_summary.reset_index()
# Calculate revenue percentage by segment
total_revenue = segment_summary['total_revenue'].sum()
segment_summary['revenue_percentage'] = (segment_summary['total_revenue'] / total_revenue * 100).round(1)
# Create executive report
report = {
'analysis_date': execution_date,
'total_customers': len(segments_df),
'total_revenue': float(total_revenue),
'segment_breakdown': segment_summary.to_dict('records'),
'key_insights': []
}
# Generate insights
top_segment = segment_summary.nlargest(1, 'revenue_percentage').iloc[0]
report['key_insights'].append(
f"Top revenue segment: {top_segment['segment']} "
f"({top_segment['revenue_percentage']}% of revenue with {top_segment['customer_count']} customers)"
)
at_risk_customers = segments_df[segments_df['segment'] == 'At Risk']['customer_id'].count()
if at_risk_customers > 0:
report['key_insights'].append(
f"Warning: {at_risk_customers} customers identified as 'At Risk' - consider retention campaign"
)
champions = segments_df[segments_df['segment'] == 'Champions']['customer_id'].count()
report['key_insights'].append(
f"Champion customers: {champions} customers - ideal for upsell/cross-sell campaigns"
)
# Save report
report_path = f'/tmp/customer_segmentation_report_{execution_date}.json'
with open(report_path, 'w') as f:
json.dump(report, f, indent=2)
print("Customer Segmentation Analysis Complete!")
print(f"Total Customers Analyzed: {report['total_customers']:,}")
print(f"Total Revenue: ${report['total_revenue']:,.2f}")
print("\nKey Insights:")
for insight in report['key_insights']:
print(f" • {insight}")
print(f"\nDetailed report saved to: {report_path}")
return report_path
# Define tasks
extract_customers = PythonOperator(
task_id='extract_customer_data',
python_callable=extract_customer_data,
dag=dag
)
extract_transactions = PythonOperator(
task_id='extract_transaction_data',
python_callable=extract_transaction_data,
dag=dag
)
validate_data = PythonOperator(
task_id='validate_datasets',
python_callable=validate_datasets,
dag=dag
)
calculate_rfm = PythonOperator(
task_id='calculate_rfm_scores',
python_callable=calculate_rfm_scores,
dag=dag
)
segment_customers = PythonOperator(
task_id='assign_customer_segments',
python_callable=assign_customer_segments,
dag=dag
)
generate_report = PythonOperator(
task_id='generate_summary_report',
python_callable=generate_summary_report,
dag=dag
)
# Define dependencies
[extract_customers, extract_transactions] >> validate_data >> calculate_rfm >> segment_customers >> generate_report
Save this DAG and trigger it manually from the Airflow UI to see the complete customer segmentation pipeline in action. You should see the pipeline execute each task in sequence, handling the data extraction, validation, RFM calculation, segmentation, and reporting.
Even experienced developers make mistakes when building Airflow pipelines. Here are the most common issues and how to avoid them.
The Wrong Way:
# This will NOT work as expected
shared_data = {}
def task_a(**context):
shared_data['result'] = 'some important data'
def task_b(**context):
# This will be empty! Tasks run in different processes
data = shared_data.get('result') # None
The Right Way:
# Use XComs or file/database storage
def task_a(**context):
result = 'some important data'
# Store in XCom (for small data)
return result
def task_b(**context):
# Retrieve from XCom
result = context['task_instance'].xcom_pull(task_ids='task_a')
# Or use files/database for larger data
def task_a(**context):
result = calculate_something_big()
file_path = f'/tmp/data_{context["ds"]}.json'
with open(file_path, 'w') as f:
json.dump(result, f)
return file_path
def task_b(**context):
file_path = context['task_instance'].xcom_pull(task_ids='task_a')
with open(file_path, 'r') as f:
result = json.load(f)
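If you're on Airflow 2, the TaskFlow API handles this hand-off for you: values returned from one task are pushed to XCom and passed to downstream tasks automatically. A minimal sketch of the same producer/consumer pattern:

from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule_interval=None, start_date=datetime(2024, 1, 1), catchup=False)
def xcom_taskflow_example():

    @task
    def produce() -> str:
        return 'some important data'

    @task
    def consume(value: str) -> None:
        print(f"Received: {value}")

    consume(produce())  # the return value flows between tasks via XCom

xcom_taskflow_example()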
The Problem:
dag = DAG(
'daily_reports',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily',
catchup=True # This will try to run every day since Jan 1!
)
If you deploy this DAG on February 15th, Airflow will immediately try to backfill roughly 45 daily DAG runs (one for every day since January 1st), potentially overwhelming your system.
The Solution:
dag = DAG(
'daily_reports',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily',
catchup=False, # Only run from now forward
# Or use max_active_runs=1 to limit concurrent executions
max_active_runs=1
)
The Problem:
def potentially_slow_task(**context):
# This could run forever if the API hangs
response = requests.get('https://slow-api.example.com/data')
return response.json()
task = PythonOperator(
task_id='fetch_data',
python_callable=potentially_slow_task
# No timeout specified!
)
The Solution:
def robust_api_call(**context):
try:
response = requests.get(
'https://slow-api.example.com/data',
timeout=30 # 30 second timeout
)
response.raise_for_status()
return response.json()
except requests.Timeout:
print("API call timed out after 30 seconds")
raise
except requests.RequestException as e:
print(f"API call failed: {e}")
raise
task = PythonOperator(
task_id='fetch_data',
python_callable=robust_api_call,
execution_timeout=timedelta(minutes=10) # Kill task after 10 minutes
)
The Problem:
# Creating a separate task for every small operation
validate_field_1 = PythonOperator(task_id='validate_customer_id', ...)
validate_field_2 = PythonOperator(task_id='validate_email', ...)
validate_field_3 = PythonOperator(task_id='validate_phone', ...)
# ... 20 more validation tasks
This creates unnecessary complexity and overhead. Each task has startup/teardown costs.
The Solution:
def comprehensive_validation(**context):
"""Single task that performs all related validations"""
validations = [
validate_customer_id,
validate_email,
validate_phone,
validate_address
]
results = {}
    for validation_func in validations:
        try:
            # context['data'] is assumed to be supplied via op_kwargs when the task is defined
            result = validation_func(context['data'])
results[validation_func.__name__] = {'status': 'passed', 'result': result}
except Exception as e:
results[validation_func.__name__] = {'status': 'failed', 'error': str(e)}
# Fail if any critical validations failed
failed_validations = [name for name, result in results.items()
if result['status'] == 'failed']
if failed_validations:
raise ValueError(f"Validations failed: {failed_validations}")
return results
validation_task = PythonOperator(
task_id='comprehensive_data_validation',
python_callable=comprehensive_validation
)
The Problem:
def extract_data(**context):
# Hardcoding credentials and endpoints
conn_string = "postgresql://user:password@prod-db:5432/sales"
api_key = "abc123def456"
# This is insecure and inflexible
The Solution:
from airflow.providers.postgres.hooks.postgres import PostgresHook  # requires the postgres provider package
from airflow.models import Variable
def extract_data(**context):
# Use Airflow connections for databases
postgres_hook = PostgresHook(postgres_conn_id='sales_db')
df = postgres_hook.get_pandas_df("SELECT * FROM daily_sales")
# Use Airflow variables for configuration
api_key = Variable.get('external_api_key', default_var=None)
if not api_key:
raise ValueError("API key not configured in Airflow Variables")
return df
Set up connections and variables through the Airflow UI under Admin > Connections and Admin > Variables.
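Variables can also be managed programmatically, which is handy in tests and setup scripts. A sketch using the same hypothetical external_api_key name from the example above:

from airflow.models import Variable

Variable.set('external_api_key', 'replace-with-real-key')   # stored in the metadata database
print(Variable.get('external_api_key', default_var=None))   # -> 'replace-with-real-key'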
When tasks fail, here's your debugging checklist:
• Open the failed task in the Graph or Grid view and read its log — the stack trace is usually right there
• Check the Rendered Template tab to see the exact values Airflow substituted into templated fields
• Re-run the single task in isolation (for example with airflow tasks test <dag_id> <task_id> <date>) to reproduce the error quickly
• Look at the scheduler logs if the task never started at all
• Confirm that connections, variables, and Python dependencies exist in the environment where the worker actually runs
Common Error Patterns:
# Pattern 1: Import errors
# Error: ModuleNotFoundError: No module named 'custom_module'
# Solution: Ensure all dependencies are installed in your Airflow environment
# Pattern 2: Connection errors
# Error: could not connect to server: Connection refused
# Solution: Check that your connection ID exists and credentials are correct
# Pattern 3: Memory errors
# Error: Killed (signal 9)
# Solution: Reduce data size, add execution_timeout, or increase worker memory
# Pattern 4: Serialization errors
# Error: Object of type 'datetime' is not JSON serializable
# Solution: Convert non-serializable objects before returning from tasks
def serialize_datetime(obj):
if isinstance(obj, datetime):
return obj.isoformat()
return obj
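One way to use that helper is as the default hook for json.dumps, so any datetime that sneaks into your return value gets converted on the way out:

import json
from datetime import datetime

payload = {'pipeline': 'daily_reports', 'run_ts': datetime(2024, 1, 1, 2, 0)}
print(json.dumps(payload, default=serialize_datetime))
# {"pipeline": "daily_reports", "run_ts": "2024-01-01T02:00:00"}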
You've now built a solid foundation in Apache Airflow for production data pipeline orchestration. Let's recap the key concepts that will serve you in real-world scenarios:
DAG Design Principles - You learned to think about data pipelines as directed graphs with clear dependencies, not just collections of scripts. This mental model helps you design more reliable and maintainable workflows.
Task Orchestration - Beyond simple scheduling, you can now handle complex dependency patterns, implement proper retry logic, and build pipelines that fail gracefully with meaningful error messages.
Production Reliability - The error handling patterns, data validation gates, and monitoring strategies you've implemented will prevent the silent failures that plague many data pipelines. Your pipelines now fail fast with clear diagnostics.
Real-world Complexity - Through the customer segmentation exercise, you've seen how to handle multi-step data transformations, business logic implementation, and comprehensive data validation in a production context.
The customer segmentation pipeline you built demonstrates sophisticated data engineering: extracting from multiple sources, implementing business rules (RFM scoring), handling edge cases (customers with no transactions), and generating actionable business insights. This pattern scales to enterprise scenarios with millions of customers and complex business rules.
1. Master Advanced Operators and Hooks - Explore Airflow's extensive library of pre-built operators and hooks for common integrations: S3Hook for AWS operations, the Slack operators for notifications, DockerOperator for containerized tasks, and KubernetesPodOperator for scalable computation. Understanding these building blocks will dramatically speed up your pipeline development.
2. Implement Production Monitoring and Alerting - Learn to set up comprehensive monitoring using Airflow's integration with Prometheus, Grafana, and modern observability platforms. Production pipelines need proactive monitoring of data quality metrics, processing times, and resource utilization - not just failure alerts.
3. Scale to Distributed Execution - Once you're comfortable with local Airflow, explore distributed executors like the Kubernetes Executor or Celery Executor. This knowledge becomes critical when your pipelines need to process terabytes of data or run hundreds of concurrent tasks across multiple machines.
These next steps will take you from competent Airflow user to someone who can architect enterprise-scale data infrastructure. Each builds naturally on the foundation you've established, with distributed execution being particularly valuable as your data volumes and complexity grow.