
You're managing a growing e-commerce platform, and data is scattered across multiple systems: user activity streams from your web analytics, transaction records from your payment processor, product catalog updates from your inventory management system, and customer service tickets from your support platform. Your executive team needs daily reports, your data science team needs clean datasets for machine learning models, and your engineering team needs real-time alerts when system performance degrades.
This is the reality for most data professionals today—valuable insights sit locked away in siloed systems until robust data pipelines bring them together. By the end of this lesson, you'll have built a production-ready data pipeline that can ingest, transform, and deliver data across your organization with the reliability and scalability that enterprise systems demand.
Prerequisites:
This lesson assumes you have solid Python programming experience, familiarity with pandas for data manipulation, a basic understanding of SQL databases, and experience with object-oriented programming concepts. You should also be comfortable with command-line operations and have Python 3.8+ installed with pip for package management.
Before diving into code, we need to understand what makes a data pipeline robust at the architectural level. A production data pipeline isn't just a script that moves data from point A to point B—it's a sophisticated system that handles failures gracefully, provides visibility into operations, and scales with your organization's needs.
The pipeline we're building follows the Extract-Transform-Load (ETL) pattern, but with modern enhancements. Our architecture includes five key stages: ingestion (handling multiple data sources), validation (ensuring data quality), transformation (business logic and enrichment), storage (multiple output destinations), and monitoring (operational visibility).
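To make the architecture concrete before we write any stage code, here is a minimal sketch of how the five stages chain together. It uses the stage classes we build later in this lesson; the run_pipeline function itself is only an illustrative orchestrator, not the final implementation.

def run_pipeline(config: PipelineConfig) -> None:
    # Each stage consumes the previous stage's output and returns its own.
    ingestion = DataIngestionStage(config)              # extract from database, API, and files
    validation = DataValidationStage(config)            # enforce data quality rules
    transformation = DataTransformationStage(config)    # apply business logic and enrichment
    output = DataOutputStage(config)                    # write to analytics destinations

    raw_datasets = ingestion.execute()
    validated = validation.execute(raw_datasets)
    transformed = transformation.execute(validated)
    output.execute(transformed)
    # Monitoring is woven through every stage: each one logs metrics and alerts on failure.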
import logging
import json
import pandas as pd
import sqlite3
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from pathlib import Path
import time
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Configure logging for production monitoring
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('pipeline.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
The logging configuration above isn't just good practice—it's essential for production operations. We write to both a file and the console, with timestamps and a structured format that make debugging easier when things go wrong at 2 AM.
Our pipeline foundation centers around a configurable, extensible base class that handles common concerns like error handling, retry logic, and state management. This approach allows us to focus on business logic while ensuring operational robustness.
@dataclass
class PipelineConfig:
"""Configuration container for pipeline settings"""
source_db_path: str = "source_data.db"
target_db_path: str = "analytics.db"
api_base_url: str = "https://api.example.com"
api_key: str = ""
batch_size: int = 1000
max_retries: int = 3
retry_delay: int = 5
quality_threshold: float = 0.95
email_alerts: bool = True
smtp_server: str = "smtp.gmail.com"
    alert_recipients: Optional[List[str]] = None  # defaults to [] in __post_init__
def __post_init__(self):
if self.alert_recipients is None:
self.alert_recipients = []
class PipelineStage:
"""Base class for pipeline stages with error handling and monitoring"""
def __init__(self, name: str, config: PipelineConfig):
self.name = name
self.config = config
self.logger = logging.getLogger(f"pipeline.{name}")
self.metrics = {
'records_processed': 0,
'errors': 0,
'start_time': None,
'end_time': None
}
def execute(self, data: Any = None) -> Any:
"""Execute the pipeline stage with monitoring and error handling"""
self.metrics['start_time'] = datetime.now()
self.logger.info(f"Starting {self.name} stage")
try:
result = self._execute_with_retry(data)
self.metrics['end_time'] = datetime.now()
duration = (self.metrics['end_time'] - self.metrics['start_time']).total_seconds()
self.logger.info(
f"Completed {self.name} stage - "
f"Processed: {self.metrics['records_processed']}, "
f"Errors: {self.metrics['errors']}, "
f"Duration: {duration:.2f}s"
)
return result
except Exception as e:
self.metrics['end_time'] = datetime.now()
self.logger.error(f"Stage {self.name} failed: {str(e)}")
if self.config.email_alerts:
self._send_alert(f"Pipeline stage {self.name} failed", str(e))
raise
def _execute_with_retry(self, data: Any) -> Any:
"""Execute with retry logic for transient failures"""
last_exception = None
for attempt in range(self.config.max_retries):
try:
return self._execute(data)
except Exception as e:
last_exception = e
self.logger.warning(f"Attempt {attempt + 1} failed: {str(e)}")
if attempt < self.config.max_retries - 1:
time.sleep(self.config.retry_delay * (2 ** attempt)) # Exponential backoff
raise last_exception
def _execute(self, data: Any) -> Any:
"""Override this method in concrete stages"""
raise NotImplementedError("Subclasses must implement _execute method")
def _send_alert(self, subject: str, message: str):
"""Send email alert for critical failures"""
try:
msg = MIMEMultipart()
msg['From'] = "data-pipeline@company.com"
msg['Subject'] = f"[DATA PIPELINE ALERT] {subject}"
body = f"""
Pipeline Alert Details:
Stage: {self.name}
Time: {datetime.now().isoformat()}
Error: {message}
Metrics:
Records Processed: {self.metrics['records_processed']}
Errors: {self.metrics['errors']}
"""
msg.attach(MIMEText(body, 'plain'))
# Note: In production, use proper SMTP authentication
# This is a simplified example
self.logger.info(f"Alert would be sent: {subject}")
except Exception as e:
self.logger.error(f"Failed to send alert: {str(e)}")
This foundation provides several critical capabilities. The retry mechanism with exponential backoff handles transient failures gracefully. The metrics collection gives us operational visibility. The alert system ensures that failures don't go unnoticed in production environments.
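To see how a concrete stage plugs into this foundation, here is a minimal sketch: subclass PipelineStage and override _execute. The NoOpStage below is a hypothetical example for illustration, not part of the pipeline we build in this lesson.

class NoOpStage(PipelineStage):
    """Trivial stage that passes data through while counting records."""
    def _execute(self, data: Any) -> Any:
        # Business logic goes here; the base class already wraps this call
        # with retries, timing, logging, and alerting.
        if isinstance(data, pd.DataFrame):
            self.metrics['records_processed'] = len(data)
        return data

# Usage sketch:
# config = PipelineConfig(email_alerts=False)
# stage = NoOpStage("noop", config)
# result = stage.execute(pd.DataFrame({'value': [1, 2, 3]}))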
Data ingestion is where most pipelines fail in production. Systems go down, APIs change, file formats evolve, and network connections drop. Our ingestion stage needs to handle all of these scenarios while maintaining data integrity and providing clear visibility into what's happening.
class DataIngestionStage(PipelineStage):
"""Handles multiple data sources with robust error handling"""
def __init__(self, config: PipelineConfig):
super().__init__("ingestion", config)
self.session = requests.Session()
self.session.headers.update({'Authorization': f'Bearer {config.api_key}'})
def _execute(self, data: Any = None) -> Dict[str, pd.DataFrame]:
"""Ingest data from multiple sources concurrently"""
ingestion_tasks = [
('transactions', self._ingest_database_data),
('user_events', self._ingest_api_data),
('product_catalog', self._ingest_file_data)
]
datasets = {}
# Use thread pool for concurrent ingestion
with ThreadPoolExecutor(max_workers=3) as executor:
future_to_source = {
executor.submit(task_func): source_name
for source_name, task_func in ingestion_tasks
}
for future in as_completed(future_to_source):
source_name = future_to_source[future]
try:
datasets[source_name] = future.result()
self.logger.info(f"Successfully ingested {source_name}")
except Exception as e:
self.logger.error(f"Failed to ingest {source_name}: {str(e)}")
self.metrics['errors'] += 1
# Continue with other sources even if one fails
total_records = sum(len(df) for df in datasets.values())
self.metrics['records_processed'] = total_records
if not datasets:
raise Exception("Failed to ingest data from any source")
return datasets
def _ingest_database_data(self) -> pd.DataFrame:
"""Ingest transaction data from source database"""
try:
conn = sqlite3.connect(self.config.source_db_path)
# Use incremental loading based on last processed timestamp
last_processed = self._get_last_processed_timestamp('transactions')
query = """
SELECT
transaction_id,
user_id,
product_id,
amount,
currency,
transaction_date,
payment_method,
status
FROM transactions
WHERE transaction_date > ?
ORDER BY transaction_date
"""
df = pd.read_sql_query(query, conn, params=[last_processed])
conn.close()
# Validate critical fields
if df.empty:
self.logger.info("No new transactions to process")
return pd.DataFrame()
# Data type validation and conversion
df['transaction_date'] = pd.to_datetime(df['transaction_date'])
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
# Remove records with invalid amounts
invalid_amounts = df['amount'].isna().sum()
if invalid_amounts > 0:
self.logger.warning(f"Removing {invalid_amounts} records with invalid amounts")
df = df.dropna(subset=['amount'])
self.logger.info(f"Ingested {len(df)} transaction records")
return df
except Exception as e:
self.logger.error(f"Database ingestion failed: {str(e)}")
raise
def _ingest_api_data(self) -> pd.DataFrame:
"""Ingest user event data from API with pagination"""
try:
all_events = []
page = 1
last_processed = self._get_last_processed_timestamp('user_events')
while True:
params = {
'page': page,
'limit': 1000,
'since': last_processed.isoformat() if last_processed else None
}
response = self.session.get(
f"{self.config.api_base_url}/events",
params=params,
timeout=30
)
if response.status_code == 404 and page == 1:
# API endpoint might not have data yet
self.logger.info("No user events available from API")
return pd.DataFrame()
response.raise_for_status()
data = response.json()
events = data.get('events', [])
if not events:
break
all_events.extend(events)
# Check if there are more pages
if not data.get('has_next_page', False):
break
page += 1
# Rate limiting courtesy
time.sleep(0.1)
if not all_events:
self.logger.info("No new user events to process")
return pd.DataFrame()
df = pd.DataFrame(all_events)
# Normalize nested JSON data
if 'properties' in df.columns:
properties_df = pd.json_normalize(df['properties'])
df = pd.concat([df.drop('properties', axis=1), properties_df], axis=1)
# Convert timestamp strings to datetime
df['event_time'] = pd.to_datetime(df['event_time'])
self.logger.info(f"Ingested {len(df)} user event records")
return df
except requests.RequestException as e:
self.logger.error(f"API ingestion failed: {str(e)}")
raise
except Exception as e:
self.logger.error(f"API data processing failed: {str(e)}")
raise
def _ingest_file_data(self) -> pd.DataFrame:
"""Ingest product catalog from file with format detection"""
try:
# Look for the most recent product catalog file
data_dir = Path("data/incoming")
catalog_files = list(data_dir.glob("product_catalog_*.csv"))
if not catalog_files:
self.logger.warning("No product catalog files found")
return pd.DataFrame()
# Get the most recent file
latest_file = max(catalog_files, key=lambda x: x.stat().st_mtime)
# Detect file encoding
import chardet
with open(latest_file, 'rb') as f:
raw_data = f.read(10000) # Read first 10KB for detection
encoding_result = chardet.detect(raw_data)
encoding = encoding_result['encoding']
# Read with detected encoding
df = pd.read_csv(latest_file, encoding=encoding)
# Validate required columns
required_columns = ['product_id', 'product_name', 'category', 'price']
missing_columns = set(required_columns) - set(df.columns)
if missing_columns:
raise ValueError(f"Missing required columns: {missing_columns}")
# Clean and validate data
df['price'] = pd.to_numeric(df['price'], errors='coerce')
df = df.dropna(subset=['product_id', 'price'])
# Remove duplicates, keeping the last occurrence
df = df.drop_duplicates(subset=['product_id'], keep='last')
self.logger.info(f"Ingested {len(df)} product catalog records from {latest_file.name}")
return df
except Exception as e:
self.logger.error(f"File ingestion failed: {str(e)}")
raise
def _get_last_processed_timestamp(self, source: str) -> Optional[datetime]:
"""Get the last processed timestamp for incremental loading"""
try:
conn = sqlite3.connect(self.config.target_db_path)
cursor = conn.cursor()
cursor.execute("""
SELECT MAX(last_processed_at)
FROM pipeline_state
WHERE source_name = ?
""", (source,))
result = cursor.fetchone()[0]
conn.close()
if result:
return datetime.fromisoformat(result)
else:
# Default to 30 days ago for initial run
return datetime.now() - timedelta(days=30)
except Exception:
# If state table doesn't exist or other error, start from 30 days ago
return datetime.now() - timedelta(days=30)
The ingestion stage demonstrates several advanced patterns. Concurrent processing speeds up data collection from multiple sources. Incremental loading prevents reprocessing the same data. Robust error handling ensures one failing source doesn't break the entire pipeline. Format detection and encoding handling prevent common file processing failures.
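If you want to exercise the ingestion stage on its own before wiring up the rest of the pipeline, a quick sketch looks like the following. The paths and API key are placeholders, and it assumes the source database and data/incoming directory described above actually exist.

config = PipelineConfig(
    source_db_path="source_data.db",
    target_db_path="analytics.db",
    api_base_url="https://api.example.com",
    api_key="YOUR_API_KEY",     # placeholder credential
    email_alerts=False,         # keep alerts off while testing locally
)
ingestion = DataIngestionStage(config)
datasets = ingestion.execute()  # e.g. {'transactions': df, 'user_events': df, 'product_catalog': df}
for name, df in datasets.items():
    print(f"{name}: {len(df)} records ingested")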
Data quality issues are silent killers in analytics systems. Bad data leads to wrong conclusions, which lead to poor business decisions. Our validation stage implements comprehensive checks that catch quality issues early and provide detailed reporting on data health.
class DataQualityValidator:
"""Comprehensive data quality validation with detailed reporting"""
def __init__(self, config: PipelineConfig):
self.config = config
self.logger = logging.getLogger("pipeline.quality_validator")
self.quality_rules = self._load_quality_rules()
def validate_datasets(self, datasets: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
"""Validate all datasets and return comprehensive quality report"""
quality_report = {
'overall_score': 0.0,
'datasets': {},
'critical_issues': [],
'warnings': [],
'timestamp': datetime.now().isoformat()
}
total_score = 0.0
dataset_count = 0
for dataset_name, df in datasets.items():
if df.empty:
self.logger.warning(f"Skipping validation for empty dataset: {dataset_name}")
continue
dataset_report = self._validate_dataset(dataset_name, df)
quality_report['datasets'][dataset_name] = dataset_report
total_score += dataset_report['quality_score']
dataset_count += 1
# Collect critical issues
if dataset_report['critical_issues']:
quality_report['critical_issues'].extend(
[f"{dataset_name}: {issue}" for issue in dataset_report['critical_issues']]
)
# Collect warnings
if dataset_report['warnings']:
quality_report['warnings'].extend(
[f"{dataset_name}: {warning}" for warning in dataset_report['warnings']]
)
if dataset_count > 0:
quality_report['overall_score'] = total_score / dataset_count
# Log summary
self.logger.info(
f"Data quality validation complete - "
f"Overall score: {quality_report['overall_score']:.2f}, "
f"Critical issues: {len(quality_report['critical_issues'])}, "
f"Warnings: {len(quality_report['warnings'])}"
)
# Check if quality meets threshold
if quality_report['overall_score'] < self.config.quality_threshold:
raise Exception(
f"Data quality score {quality_report['overall_score']:.2f} "
f"below threshold {self.config.quality_threshold}"
)
return quality_report
def _validate_dataset(self, dataset_name: str, df: pd.DataFrame) -> Dict[str, Any]:
"""Validate individual dataset with specific rules"""
report = {
'quality_score': 1.0,
'record_count': len(df),
'column_count': len(df.columns),
'checks': {},
'critical_issues': [],
'warnings': [],
'column_profiles': {}
}
rules = self.quality_rules.get(dataset_name, {})
total_checks = 0
passed_checks = 0
# Basic structural checks
total_checks += 1
if len(df) > 0:
passed_checks += 1
report['checks']['non_empty'] = True
else:
report['checks']['non_empty'] = False
report['critical_issues'].append("Dataset is empty")
# Required columns check
required_columns = rules.get('required_columns', [])
if required_columns:
total_checks += 1
missing_columns = set(required_columns) - set(df.columns)
if not missing_columns:
passed_checks += 1
report['checks']['required_columns'] = True
else:
report['checks']['required_columns'] = False
report['critical_issues'].append(f"Missing required columns: {missing_columns}")
# Data type validation
expected_types = rules.get('column_types', {})
for column, expected_type in expected_types.items():
if column in df.columns:
total_checks += 1
actual_type = df[column].dtype
if self._types_compatible(actual_type, expected_type):
passed_checks += 1
report['checks'][f'{column}_type'] = True
else:
report['checks'][f'{column}_type'] = False
report['warnings'].append(
f"Column {column} has type {actual_type}, expected {expected_type}"
)
# Uniqueness constraints
unique_columns = rules.get('unique_columns', [])
for column in unique_columns:
if column in df.columns:
total_checks += 1
duplicates = df[column].duplicated().sum()
if duplicates == 0:
passed_checks += 1
report['checks'][f'{column}_unique'] = True
else:
report['checks'][f'{column}_unique'] = False
report['critical_issues'].append(
f"Column {column} has {duplicates} duplicate values"
)
# Null value constraints
non_null_columns = rules.get('non_null_columns', [])
for column in non_null_columns:
if column in df.columns:
total_checks += 1
null_count = df[column].isna().sum()
null_percentage = (null_count / len(df)) * 100
if null_count == 0:
passed_checks += 1
report['checks'][f'{column}_non_null'] = True
else:
report['checks'][f'{column}_non_null'] = False
if null_percentage > 5: # More than 5% nulls is critical
report['critical_issues'].append(
f"Column {column} has {null_percentage:.1f}% null values"
)
else:
report['warnings'].append(
f"Column {column} has {null_percentage:.1f}% null values"
)
# Range validations
ranges = rules.get('value_ranges', {})
for column, (min_val, max_val) in ranges.items():
if column in df.columns and df[column].dtype in ['int64', 'float64']:
total_checks += 1
out_of_range = ((df[column] < min_val) | (df[column] > max_val)).sum()
if out_of_range == 0:
passed_checks += 1
report['checks'][f'{column}_range'] = True
else:
report['checks'][f'{column}_range'] = False
report['warnings'].append(
f"Column {column} has {out_of_range} values outside range [{min_val}, {max_val}]"
)
# Pattern validations (e.g., email formats, phone numbers)
patterns = rules.get('patterns', {})
for column, pattern in patterns.items():
if column in df.columns:
total_checks += 1
import re
valid_pattern = df[column].astype(str).str.match(pattern, na=False)
invalid_count = (~valid_pattern).sum()
if invalid_count == 0:
passed_checks += 1
report['checks'][f'{column}_pattern'] = True
else:
report['checks'][f'{column}_pattern'] = False
report['warnings'].append(
f"Column {column} has {invalid_count} values not matching expected pattern"
)
# Generate column profiles for monitoring
for column in df.columns:
if df[column].dtype in ['int64', 'float64']:
report['column_profiles'][column] = {
'mean': float(df[column].mean()),
'median': float(df[column].median()),
'std': float(df[column].std()),
'min': float(df[column].min()),
'max': float(df[column].max()),
'null_count': int(df[column].isna().sum())
}
else:
report['column_profiles'][column] = {
'unique_count': int(df[column].nunique()),
'null_count': int(df[column].isna().sum()),
'most_common': df[column].mode().iloc[0] if not df[column].mode().empty else None
}
# Calculate final quality score
if total_checks > 0:
report['quality_score'] = passed_checks / total_checks
return report
def _types_compatible(self, actual_type, expected_type: str) -> bool:
"""Check if data types are compatible"""
type_mappings = {
'int': ['int64', 'int32', 'int16', 'int8'],
'float': ['float64', 'float32', 'int64', 'int32'],
'string': ['object', 'string'],
'datetime': ['datetime64[ns]', 'datetime64'],
'bool': ['bool']
}
compatible_types = type_mappings.get(expected_type, [expected_type])
return str(actual_type) in compatible_types
def _load_quality_rules(self) -> Dict[str, Dict]:
"""Load data quality rules from configuration"""
return {
'transactions': {
'required_columns': ['transaction_id', 'user_id', 'amount', 'transaction_date'],
'column_types': {
'transaction_id': 'string',
'user_id': 'string',
'amount': 'float',
'transaction_date': 'datetime'
},
'unique_columns': ['transaction_id'],
'non_null_columns': ['transaction_id', 'user_id', 'amount'],
'value_ranges': {
'amount': (0, 1000000) # $0 to $1M
}
},
'user_events': {
'required_columns': ['user_id', 'event_type', 'event_time'],
'column_types': {
'user_id': 'string',
'event_type': 'string',
'event_time': 'datetime'
},
'non_null_columns': ['user_id', 'event_type', 'event_time']
},
'product_catalog': {
'required_columns': ['product_id', 'product_name', 'price'],
'column_types': {
'product_id': 'string',
'product_name': 'string',
'price': 'float'
},
'unique_columns': ['product_id'],
'non_null_columns': ['product_id', 'product_name', 'price'],
'value_ranges': {
'price': (0, 100000) # $0 to $100K
}
}
}
class DataValidationStage(PipelineStage):
"""Pipeline stage for data quality validation"""
def __init__(self, config: PipelineConfig):
super().__init__("validation", config)
self.validator = DataQualityValidator(config)
def _execute(self, datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
"""Execute data quality validation"""
quality_report = self.validator.validate_datasets(datasets)
# Store quality report for monitoring
self._store_quality_report(quality_report)
# Log quality metrics
self.metrics['records_processed'] = sum(
report['record_count'] for report in quality_report['datasets'].values()
)
return datasets # Pass through the data unchanged
def _store_quality_report(self, quality_report: Dict[str, Any]):
"""Store quality report for trend analysis"""
try:
conn = sqlite3.connect(self.config.target_db_path)
cursor = conn.cursor()
# Create quality reports table if it doesn't exist
cursor.execute("""
CREATE TABLE IF NOT EXISTS quality_reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
overall_score REAL,
critical_issues INTEGER,
warnings INTEGER,
report_json TEXT
)
""")
cursor.execute("""
INSERT INTO quality_reports
(timestamp, overall_score, critical_issues, warnings, report_json)
VALUES (?, ?, ?, ?, ?)
""", (
quality_report['timestamp'],
quality_report['overall_score'],
len(quality_report['critical_issues']),
len(quality_report['warnings']),
json.dumps(quality_report)
))
conn.commit()
conn.close()
except Exception as e:
self.logger.error(f"Failed to store quality report: {str(e)}")
This validation framework provides enterprise-grade data quality monitoring. It checks structural integrity, data types, business rules, and statistical properties. The detailed reporting helps identify data quality trends over time, enabling proactive quality management.
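Because every run stores its report in the quality_reports table, you can watch data health trend over time. Here is a minimal sketch of pulling that trend back out, assuming at least one pipeline run has completed against analytics.db:

def quality_trend(db_path: str = "analytics.db", last_n: int = 30) -> pd.DataFrame:
    """Return overall quality scores and issue counts for the most recent runs."""
    conn = sqlite3.connect(db_path)
    try:
        return pd.read_sql_query(
            """
            SELECT timestamp, overall_score, critical_issues, warnings
            FROM quality_reports
            ORDER BY timestamp DESC
            LIMIT ?
            """,
            conn,
            params=[last_n],
        )
    finally:
        conn.close()

# Usage sketch: quality_trend().plot(x='timestamp', y='overall_score')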
Data transformation is where raw data becomes valuable business insights. Our transformation engine handles complex business logic, enrichment operations, and performance optimizations that scale with data volume.
class DataTransformationEngine:
"""Advanced data transformation with optimized operations"""
def __init__(self, config: PipelineConfig):
self.config = config
self.logger = logging.getLogger("pipeline.transformation")
def transform_datasets(self, datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
"""Apply comprehensive transformations to create analytics-ready datasets"""
transformed = {}
# Create base transformed datasets
if 'transactions' in datasets:
transformed['transactions_clean'] = self._transform_transactions(datasets['transactions'])
if 'user_events' in datasets:
transformed['user_events_clean'] = self._transform_user_events(datasets['user_events'])
if 'product_catalog' in datasets:
transformed['products_clean'] = self._transform_products(datasets['product_catalog'])
# Create enriched analytical datasets
if 'transactions_clean' in transformed and 'products_clean' in transformed:
transformed['sales_analytics'] = self._create_sales_analytics(
transformed['transactions_clean'],
transformed['products_clean']
)
if 'user_events_clean' in transformed:
transformed['user_behavior_summary'] = self._create_user_behavior_summary(
transformed['user_events_clean']
)
# Create cross-dataset insights
available_datasets = set(transformed.keys())
required_for_360 = {'transactions_clean', 'user_events_clean', 'products_clean'}
if required_for_360.issubset(available_datasets):
transformed['customer_360'] = self._create_customer_360_view(
transformed['transactions_clean'],
transformed['user_events_clean'],
transformed['products_clean']
)
return transformed
def _transform_transactions(self, df: pd.DataFrame) -> pd.DataFrame:
"""Clean and enrich transaction data"""
if df.empty:
return df
df = df.copy()
# Standardize currency amounts to USD
df['amount_usd'] = df.apply(self._convert_to_usd, axis=1)
# Create derived fields
df['transaction_hour'] = df['transaction_date'].dt.hour
df['transaction_day_of_week'] = df['transaction_date'].dt.day_name()
df['transaction_month'] = df['transaction_date'].dt.to_period('M').astype(str)
# Categorize transaction amounts
df['amount_category'] = pd.cut(
df['amount_usd'],
bins=[0, 10, 50, 100, 500, float('inf')],
labels=['micro', 'small', 'medium', 'large', 'enterprise']
)
# Calculate days since first transaction per user
first_transaction = df.groupby('user_id')['transaction_date'].min().reset_index()
first_transaction.columns = ['user_id', 'first_transaction_date']
df = df.merge(first_transaction, on='user_id', how='left')
df['days_since_first_transaction'] = (
df['transaction_date'] - df['first_transaction_date']
).dt.days
# Add running totals and counts per user
df = df.sort_values(['user_id', 'transaction_date'])
df['user_transaction_count'] = df.groupby('user_id').cumcount() + 1
df['user_running_total'] = df.groupby('user_id')['amount_usd'].cumsum()
# Flag potential fraudulent transactions
df['is_potentially_fraudulent'] = self._flag_potential_fraud(df)
# Remove sensitive data that shouldn't be in analytics
columns_to_keep = [
'transaction_id', 'user_id', 'product_id', 'amount_usd',
'transaction_date', 'payment_method', 'status',
'transaction_hour', 'transaction_day_of_week', 'transaction_month',
'amount_category', 'days_since_first_transaction',
'user_transaction_count', 'user_running_total', 'is_potentially_fraudulent'
]
df = df[columns_to_keep]
self.logger.info(f"Transformed {len(df)} transaction records")
return df
def _transform_user_events(self, df: pd.DataFrame) -> pd.DataFrame:
"""Clean and structure user event data"""
if df.empty:
return df
df = df.copy()
# Standardize event types
event_mapping = {
'page_view': 'pageview',
'page-view': 'pageview',
'product_view': 'product_view',
'add_to_cart': 'add_to_cart',
'purchase': 'purchase',
'sign_up': 'signup',
'login': 'login',
'logout': 'logout'
}
df['event_type'] = df['event_type'].str.lower().replace(event_mapping)
# Extract session information
df = df.sort_values(['user_id', 'event_time'])
# Create session IDs based on 30-minute inactivity gaps
df['time_diff'] = df.groupby('user_id')['event_time'].diff()
df['new_session'] = (df['time_diff'].isna() | (df['time_diff'] > pd.Timedelta(minutes=30)))
df['session_id'] = df.groupby('user_id')['new_session'].cumsum()
df['session_id'] = df['user_id'] + '_session_' + df['session_id'].astype(str)
# Add session sequence numbers
df['event_sequence'] = df.groupby('session_id').cumcount() + 1
# Extract time-based features
df['event_hour'] = df['event_time'].dt.hour
df['event_day_of_week'] = df['event_time'].dt.day_name()
# Clean up columns
columns_to_keep = [
'user_id', 'event_type', 'event_time', 'session_id',
'event_sequence', 'event_hour', 'event_day_of_week'
]
# Add any product_id if it exists in the events
if 'product_id' in df.columns:
columns_to_keep.append('product_id')
df = df[columns_to_keep]
self.logger.info(f"Transformed {len(df)} user event records into {df['session_id'].nunique()} sessions")
return df
def _transform_products(self, df: pd.DataFrame) -> pd.DataFrame:
"""Clean and enrich product catalog data"""
if df.empty:
return df
df = df.copy()
# Standardize category names
df['category'] = df['category'].str.lower().str.strip()
# Create price tiers
df['price_tier'] = pd.cut(
df['price'],
bins=[0, 20, 50, 100, 500, float('inf')],
labels=['budget', 'economy', 'standard', 'premium', 'luxury']
)
# Extract features from product names (simplified NLP)
df['name_word_count'] = df['product_name'].str.split().str.len()
df['name_length'] = df['product_name'].str.len()
# Create product slugs for URLs
df['product_slug'] = (
df['product_name']
.str.lower()
.str.replace(r'[^a-z0-9\s]', '', regex=True)
.str.replace(r'\s+', '-', regex=True)
)
self.logger.info(f"Transformed {len(df)} product records")
return df
def _create_sales_analytics(self, transactions: pd.DataFrame, products: pd.DataFrame) -> pd.DataFrame:
"""Create enriched sales analytics dataset"""
if transactions.empty or products.empty:
return pd.DataFrame()
# Join transactions with product information
sales = transactions.merge(
products[['product_id', 'product_name', 'category', 'price', 'price_tier']],
on='product_id',
how='left'
)
# Calculate profit margins (simplified - assuming cost is 60% of price)
sales['estimated_cost'] = sales['price'] * 0.6
sales['estimated_profit'] = sales['amount_usd'] - sales['estimated_cost']
sales['profit_margin'] = sales['estimated_profit'] / sales['amount_usd']
# Add discount information
sales['discount_amount'] = sales['price'] - sales['amount_usd']
sales['discount_percentage'] = (sales['discount_amount'] / sales['price']) * 100
sales['is_discounted'] = sales['discount_percentage'] > 0
# Create time-based aggregations
sales['revenue'] = sales['amount_usd']
self.logger.info(f"Created sales analytics with {len(sales)} enriched records")
return sales
def _create_user_behavior_summary(self, events: pd.DataFrame) -> pd.DataFrame:
"""Create user behavior summary for analytics"""
if events.empty:
return pd.DataFrame()
# Session-level metrics
session_metrics = events.groupby(['user_id', 'session_id']).agg({
'event_time': ['min', 'max', 'count'],
'event_sequence': 'max'
}).round(2)
session_metrics.columns = ['session_start', 'session_end', 'total_events', 'max_sequence']
session_metrics['session_duration_minutes'] = (
(session_metrics['session_end'] - session_metrics['session_start']).dt.total_seconds() / 60
)
session_metrics.reset_index(inplace=True)
# User-level aggregations
user_summary = session_metrics.groupby('user_id').agg({
'session_id': 'count',
'total_events': ['sum', 'mean'],
'session_duration_minutes': ['sum', 'mean']
}).round(2)
user_summary.columns = [
'total_sessions', 'total_events', 'avg_events_per_session',
'total_session_time', 'avg_session_duration'
]
# Add event type distributions
event_types = events.groupby(['user_id', 'event_type']).size().unstack(fill_value=0)
user_summary = user_summary.join(event_types, how='left')
user_summary = user_summary.fillna(0)
user_summary.reset_index(inplace=True)
self.logger.info(f"Created behavior summary for {len(user_summary)} users")
return user_summary
def _create_customer_360_view(self, transactions: pd.DataFrame, events: pd.DataFrame, products: pd.DataFrame) -> pd.DataFrame:
"""Create comprehensive customer 360-degree view"""
if transactions.empty:
return pd.DataFrame()
# Transaction aggregations per user
transaction_summary = transactions.groupby('user_id').agg({
'transaction_id': 'count',
'amount_usd': ['sum', 'mean', 'std'],
'transaction_date': ['min', 'max'],
'days_since_first_transaction': 'max',
'is_potentially_fraudulent': 'sum'
}).round(2)
transaction_summary.columns = [
'total_transactions', 'total_spent', 'avg_transaction_amount', 'transaction_amount_std',
'first_transaction_date', 'last_transaction_date', 'customer_lifetime_days', 'fraud_flags'
]
# Calculate customer lifetime value and frequency
transaction_summary['avg_days_between_transactions'] = (
transaction_summary['customer_lifetime_days'] / transaction_summary['total_transactions']
)
transaction_summary['clv_estimate'] = (
transaction_summary['avg_transaction_amount'] *
(365 / transaction_summary['avg_days_between_transactions'].clip(lower=1))
)
# Behavioral data (if available)
if not events.empty:
behavior_summary = events.groupby('user_id').agg({
'session_id': 'nunique',
'event_type': 'count'
})
behavior_summary.columns = ['total_sessions', 'total_events']
transaction_summary = transaction_summary.join(behavior_summary, how='left')
# Customer segmentation based on RFM analysis
transaction_summary['recency_days'] = (
datetime.now() - pd.to_datetime(transaction_summary['last_transaction_date'])
).dt.days
# Create RFM segments
transaction_summary['recency_score'] = pd.qcut(
transaction_summary['recency_days'].rank(method='first'),
5, labels=[5,4,3,2,1]
).astype(int)
transaction_summary['frequency_score'] = pd.qcut(
transaction_summary['total_transactions'].rank(method='first'),
5, labels=[1,2,3,4,5]
).astype(int)
transaction_summary['monetary_score'] = pd.qcut(
transaction_summary['total_spent'].rank(method='first'),
5, labels=[1,2,3,4,5]
).astype(int)
transaction_summary['rfm_segment'] = (
transaction_summary['recency_score'].astype(str) +
transaction_summary['frequency_score'].astype(str) +
transaction_summary['monetary_score'].astype(str)
)
# Add customer category labels
def categorize_customer(row):
r, f, m = row['recency_score'], row['frequency_score'], row['monetary_score']
if r >= 4 and f >= 4 and m >= 4:
return 'Champions'
elif r >= 3 and f >= 3 and m >= 3:
return 'Loyal Customers'
elif r >= 4 and f >= 2 and m >= 2:
return 'Potential Loyalists'
elif r >= 4 and f <= 2 and m <= 2:
return 'New Customers'
elif r <= 2 and f >= 3 and m >= 3:
return 'At Risk'
elif r <= 2 and f <= 2 and m >= 3:
return 'Cannot Lose Them'
else:
return 'Others'
transaction_summary['customer_category'] = transaction_summary.apply(categorize_customer, axis=1)
transaction_summary.reset_index(inplace=True)
self.logger.info(f"Created 360-degree view for {len(transaction_summary)} customers")
return transaction_summary
def _convert_to_usd(self, row) -> float:
"""Convert currency amounts to USD (simplified conversion)"""
# In production, you'd use real-time exchange rates
exchange_rates = {
'USD': 1.0,
'EUR': 1.1,
'GBP': 1.25,
'JPY': 0.0091,
'CAD': 0.78
}
currency = row.get('currency', 'USD')
amount = row.get('amount', 0)
return amount * exchange_rates.get(currency, 1.0)
def _flag_potential_fraud(self, df: pd.DataFrame) -> pd.Series:
"""Simple fraud detection flags"""
fraud_flags = pd.Series(False, index=df.index)
# Flag unusually large transactions (> 99th percentile)
if len(df) > 100: # Need sufficient data for percentiles
high_amount_threshold = df['amount_usd'].quantile(0.99)
fraud_flags |= (df['amount_usd'] > high_amount_threshold)
# Flag multiple transactions in short time windows
df_sorted = df.sort_values(['user_id', 'transaction_date'])
time_diff = df_sorted.groupby('user_id')['transaction_date'].diff()
rapid_transactions = time_diff < pd.Timedelta(minutes=1)
fraud_flags |= rapid_transactions
return fraud_flags
class DataTransformationStage(PipelineStage):
"""Pipeline stage for data transformation"""
def __init__(self, config: PipelineConfig):
super().__init__("transformation", config)
self.transformation_engine = DataTransformationEngine(config)
def _execute(self, datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
"""Execute data transformations"""
transformed_datasets = self.transformation_engine.transform_datasets(datasets)
# Calculate metrics
self.metrics['records_processed'] = sum(
len(df) for df in transformed_datasets.values()
)
return transformed_datasets
The transformation engine demonstrates production-grade data processing patterns. It handles currency conversion, session analysis, customer segmentation, and fraud detection. The modular design makes it easy to add new transformations as business requirements evolve.
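As an example of that modularity, here is a sketch of how a new transformation could slot in as requirements evolve: subclass the engine (or add a method directly) and wire the new dataset into transform_datasets. The daily revenue rollup below is a hypothetical addition, not part of the lesson's pipeline.

class ExtendedTransformationEngine(DataTransformationEngine):
    """Example of extending the engine with an additional derived dataset."""
    def transform_datasets(self, datasets: Dict[str, pd.DataFrame]) -> Dict[str, pd.DataFrame]:
        transformed = super().transform_datasets(datasets)
        if 'sales_analytics' in transformed and not transformed['sales_analytics'].empty:
            transformed['daily_revenue'] = self._create_daily_revenue(transformed['sales_analytics'])
        return transformed

    def _create_daily_revenue(self, sales: pd.DataFrame) -> pd.DataFrame:
        """Roll revenue up to one row per calendar day."""
        daily = (
            sales.assign(transaction_day=sales['transaction_date'].dt.date)
                 .groupby('transaction_day', as_index=False)['revenue']
                 .sum()
        )
        return daily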
Data storage in production pipelines requires careful consideration of performance, reliability, and accessibility. Our output stage supports multiple storage formats and destinations while maintaining data lineage and providing rollback capabilities.
class DataOutputStage(PipelineStage):
"""Handles multiple output destinations with transaction safety"""
def __init__(self, config: PipelineConfig):
super().__init__("output", config)
self.output_configs = self._load_output_configurations()
def _execute(self, datasets: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
"""Execute data output to multiple destinations"""
output_results = {}
for dataset_name, df in datasets.items():
if df.empty:
self.logger.info(f"Skipping empty dataset: {dataset_name}")
continue
dataset_outputs = self.output_configs.get(dataset_name, [])
output_results[dataset_name] = {}
for output_config in dataset_outputs:
try:
result = self._write_dataset(df, output_config)
output_results[dataset_name][output_config['name']] = result
self.logger.info(f"Successfully wrote {dataset_name} to {output_config['name']}")
except Exception as e:
self.logger.error(f"Failed to write {dataset_name} to {output_config['name']}: {str(e)}")
self.metrics['errors'] += 1
# Continue with other outputs even if one fails
output_results[dataset_name][output_config['name']] = {
'success': False,
'error': str(e)
}
# Update pipeline state tracking
self._update_pipeline_state(datasets)
# Calculate metrics
self.metrics['records_processed'] = sum(len(df) for df in datasets.values())
return output_results
def _write_dataset(self, df: pd.DataFrame, output_config: Dict[str, Any]) -> Dict[str, Any]:
"""Write dataset to specified output destination"""
output_type = output_config['type']
if output_type == 'sqlite':
return self._write_to_sqlite(df, output_config)
elif output_type == 'csv':
return self._write_to_csv(df, output_config)
elif output_type == 'parquet':
return self._write_to_parquet(df, output_config)
elif output_type == 'json':
return self._write_to_json(df, output_config)
elif output_type == 'api':
return self._write_to_api(df, output_config)
else:
raise ValueError(f"Unsupported output type: {output_type}")
def _write_to_sqlite(self, df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
"""Write to SQLite database with transaction safety"""
db_path = config['path']
table_name = config['table']
write_mode = config.get('mode', 'replace') # replace, append, or upsert
# Create backup of existing data if replacing
if write_mode == 'replace':
self._backup_sqlite_table(db_path, table_name)
try:
conn = sqlite3.connect(db_path)
if write_mode == 'upsert':
# For upsert, we need to handle conflicts
self._upsert_to_sqlite(df, conn, table_name, config.get('key_columns', []))
else:
# Standard pandas to_sql
df.to_sql(
table_name,
conn,
if_exists='replace' if write_mode == 'replace' else 'append',
index=False,
method='multi', # Faster bulk inserts
chunksize=self.config.batch_size
)
# Create indexes for performance
indexes = config.get('indexes', [])
for index_config in indexes:
self._create_index(conn, table_name, index_config)
conn.close()
return {
'success': True,
'records_written': len(df),
'destination': f"{db_path}:{table_name}"
}
except Exception as e:
# Restore backup if replace operation failed
if write_mode == 'replace':
self._restore_sqlite_backup(db_path, table_name)
raise e
def _write_to_parquet(self, df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
"""Write to Parquet format with partitioning support"""
file_path = config['path']
partition_columns = config.get('partition_columns', [])
# Ensure directory exists
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
if partition_columns:
# Write with partitioning
df.to_parquet(
file_path,
partition_cols=partition_columns,
engine='pyarrow',
compression='snappy'
)
else:
# Single file output
df.to_parquet(
file_path,
engine='pyarrow',
compression='snappy',
index=False
)
# Calculate file size for monitoring
if Path(file_path).is_file():
file_size = Path(file_path).stat().st_size
else:
# For partitioned datasets, calculate total size
file_size = sum(f.stat().st_size for f in Path(file_path).rglob('*.parquet'))
return {
'success': True,
'records_written': len(df),
'file_size_bytes': file_size,
'destination': file_path
}
def _write_to_csv(self, df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
"""Write to CSV with proper encoding and formatting"""
file_path = config['path']
# Add timestamp to filename if configured
if config.get('timestamped', False):
path_obj = Path(file_path)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
file_path = path_obj.parent / f"{path_obj.stem}_{timestamp}{path_obj.suffix}"
# Ensure directory exists
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
df.to_csv(
file_path,
index=False,
encoding='utf-8',
date_format='%Y-%m-%d %H:%M:%S'
)
file_size = Path(file_path).stat().st_size
return {
'success': True,
'records_written': len(df),
'file_size_bytes': file_size,
'destination': str(file_path)
}
def _write_to_json(self, df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
"""Write to JSON format with proper serialization"""
file_path = config['path']
# Ensure directory exists
Path(file_path).parent.mkdir(parents=True, exist_ok=True)
# Convert timestamps to ISO format for JSON serialization
df_json = df.copy()
for col in df_json.columns:
if df_json[col].dtype == 'datetime64[ns]':
df_json[col] = df_json[col].dt.strftime('%Y-%m-%dT%H:%M:%S')
# Write as JSON lines format for better streaming support
if config.get('format') == 'jsonl':
df_json.to_json(file_path, orient='records', lines=True, date_format='iso')
else:
df_json.to_json(file_path, orient='records', indent=2, date_format='iso')
file_size = Path(file_path).stat().st_size
return {
'success': True,
'records_written': len(df),
'file_size_bytes': file_size,
'destination': file_path
}
def _write_to_api(self, df: pd.DataFrame, config: Dict[str, Any]) -> Dict[str, Any]:
"""Write to API endpoint with batching and retry logic"""
endpoint_url = config['url']
batch_size = config.get('batch_size', 100)
headers = config.get('headers', {})
total_sent = 0
session = requests.Session()
session.headers.update(headers)
# Process in batches
for i in range(0, len(df), batch_size):
batch_df = df.iloc[i:i+batch_size]
# Convert batch to JSON
batch_data = batch_df.to_dict(orient='records')
# Send batch with retry logic
for attempt in range(self.config.max_retries):
try:
response = session.post(
endpoint_url,
json={'data': batch_data, 'batch_id': i // batch_size},
timeout=30
)
response.raise_for_status()
total_sent += len(batch_data)
break
except requests.RequestException as e:
if attempt == self.config.max_retries - 1:
raise e
time.sleep(self.config.retry_delay * (2 ** attempt))
return {
'success': True,
'records_written': total_sent,
'destination': endpoint_url
}
def _upsert_to_sqlite(self, df: pd.DataFrame, conn: sqlite3.Connection, table_name: str, key_columns: List[str]):
"""Perform upsert operation (insert or update) to SQLite"""
if not key_columns:
# If no key columns specified, just append
df.to_sql(table_name, conn, if_exists='append', index=False)
return
# Create temporary table
temp_table = f"{table_name}_temp"
df.to_sql(temp_table, conn, if_exists='replace', index=False)
# Get column names
columns = df.columns.tolist()
key_columns_str = ', '.join(key_columns)
# Build the upsert query
set_clauses = [f"{col} = excluded.{col}" for col in columns if col not in key_columns]
upsert_query = f"""
INSERT INTO {table_name} ({', '.join(columns)})
SELECT {', '.join(columns)} FROM {temp_table}
ON CONFLICT({key_columns_str}) DO UPDATE SET
{', '.join(set_clauses)}
"""
conn.execute(upsert_query)
conn.execute(f"DROP TABLE {temp_table}")
conn.commit()
def _backup_sqlite_table(self, db_path: str, table_name: str):
"""Create backup of SQLite table before replacement"""
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Check if table exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name=?
""", (table_name,))
if cursor.fetchone():
backup_table = f"{table_name}_backup_{int(time.time())}"
cursor.execute(f"""
CREATE TABLE {backup_table} AS
SELECT * FROM {table_name}
""")
self.logger.info(f"Created backup table: {backup_table}")
conn.close()
except Exception as e:
self.logger.warning(f"Failed to create backup: {str(e)}")
def _restore_sqlite_backup(self, db_path: str, table_name: str):
"""Restore SQLite table from most recent backup"""
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Find most recent backup
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name LIKE ?
ORDER BY name DESC
LIMIT 1
""", (f"{table_name}_backup_%",))
backup_table = cursor.fetchone()
if backup_table:
backup_name = backup_table[0]
cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
cursor.execute(f"ALTER TABLE {backup_name} RENAME TO {table_name}")
self.logger.info(f"Restored table {table_name} from {backup_name}")
conn.close()
except Exception as e:
self.logger.error(f"Failed to restore backup: {str(e)}")
def _create_index(self, conn: sqlite3.Connection, table_name: str, index_config: Dict[str, Any]):
"""Create database index for performance"""
try:
index_name = index_config['name']
columns = index_config['columns']
unique = index_config.get('unique', False)
unique_clause = "UNIQUE " if unique else ""
columns_str = ', '.join(columns)
create_index_sql = f"""
CREATE {unique_clause}INDEX IF NOT EXISTS {index_name}
ON {table_name} ({columns_str})
"""
conn.execute(create_index_sql)
except Exception as e:
self.logger.warning(f"Failed to create index: {str(e)}")
def _update_pipeline_state(self, datasets: Dict[str, pd.DataFrame]):
"""Update pipeline state for incremental processing"""
try:
conn = sqlite3.connect(self.config.target_db_path)
cursor = conn.cursor()
# Create state table if it doesn't exist
cursor.execute("""
CREATE TABLE IF NOT EXISTS pipeline_state (
source_name TEXT PRIMARY KEY,
last_processed_at TEXT,
records_processed INTEGER,
updated_at TEXT
)
""")
current_time = datetime.now().isoformat()
for dataset_name, df in datasets.items():
if df.empty:
continue
# Get the latest timestamp from the dataset
timestamp_columns = [col for col in df.columns if 'date' in col.lower() or 'time' in col.lower()]
if timestamp_columns:
latest_timestamp = df[timestamp_columns[0]].max()
if pd.isna(latest_timestamp):
latest_timestamp = current_time
else:
latest_timestamp = latest_timestamp.isoformat()
else:
latest_timestamp = current_time
cursor.execute("""
INSERT OR REPLACE INTO pipeline_state
(source_name, last_processed_at, records_processed, updated_at)
VALUES (?, ?, ?, ?)
""", (dataset_name, latest_timestamp, len(df), current_time))
conn.commit()
conn.close()
except Exception as e:
self.logger.error(f"Failed to update pipeline state: {str(e)}")
def _load_output_configurations(self) -> Dict[str, List[Dict[str, Any]]]:
"""Load output configurations for each dataset"""
return {
'transactions_clean': [
{
'name': 'analytics_db',
'type': 'sqlite',
'path': self.config.target_db_path,
'table': 'transactions',
'mode': 'replace',
'indexes': [
{'name': 'idx_user_id', 'columns': ['user_id']},
{'name': 'idx_transaction_date', 'columns': ['transaction_date']}
]
},
{
'name': 'parquet_archive',
'type