
Picture this: It's 2 AM, and your real-time fraud detection pipeline just crashed. Millions of credit card transactions are backing up in Kafka topics, your downstream ML models are starving for features, and the business is losing $50,000 per minute in undetected fraud. The root cause? A single malformed JSON record from a third-party API that brought down your entire streaming infrastructure.
This scenario plays out daily across organizations because most data engineers treat error handling as an afterthought. They build pipelines assuming perfect data, stable networks, and infinite resources. But production systems are chaotic, unpredictable environments where everything that can go wrong eventually will.
Building truly resilient data pipelines requires understanding failure modes at every layer—from network partitions and schema evolution to resource exhaustion and cascading failures. You need recovery strategies that handle both transient glitches and catastrophic outages, while maintaining data consistency and pipeline performance.
By the end of this lesson, you'll architect data pipelines that gracefully handle failures and recover automatically. You'll implement sophisticated error handling patterns that have kept mission-critical systems running at companies like Netflix, Uber, and Airbnb.
What you'll learn: how to classify pipeline failure modes, prevent cascading failures with circuit breakers, implement retry strategies with exponential backoff and jitter, design dead letter queues that quarantine poison messages, and build idempotent processing that makes reprocessing safe.
You should have solid experience with distributed systems concepts, streaming frameworks like Apache Kafka or Apache Pulsar, and at least one major data processing framework (Spark, Flink, or similar). Familiarity with Python, Scala, or Java is essential, and you should understand basic concepts like eventual consistency and the CAP theorem.
Before diving into solutions, we need to understand what can go wrong. Data pipeline failures fall into several categories, each requiring different recovery strategies.
Transient failures are temporary issues that resolve themselves or can be fixed with retry logic. Network timeouts, temporary resource exhaustion, and brief service outages fall into this category. These failures follow patterns—they often correlate with peak usage times, network congestion, or garbage collection cycles.
Persistent failures indicate deeper problems that won't resolve with simple retries. Schema mismatches, malformed data, authentication failures, and infrastructure outages require human intervention or sophisticated automated recovery.
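This distinction can be sketched in plain Python. The following is a minimal, illustrative classifier for deciding whether a retry is worthwhile; the exception-to-category mapping here is an assumption for the example, not a fixed rule:

```python
# Illustrative sketch: map exception types to a failure class so retry
# logic can decide between "retry" and "escalate". These categories are
# examples - real pipelines tune them to their own dependencies.
TRANSIENT_ERRORS = (ConnectionError, TimeoutError)
PERSISTENT_ERRORS = (PermissionError, ValueError)  # e.g. auth failures, malformed data

def classify_failure(exc: Exception) -> str:
    """Return 'transient' (safe to retry) or 'persistent' (needs intervention)."""
    if isinstance(exc, TRANSIENT_ERRORS):
        return "transient"
    if isinstance(exc, PERSISTENT_ERRORS):
        return "persistent"
    # Unknown failures are treated as persistent so they surface quickly
    return "persistent"
```

A retry loop can then skip retries entirely for persistent failures instead of burning attempts on errors that will never resolve on their own.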
Consider this Spark streaming job that processes e-commerce events:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

class ECommerceProcessor:
    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("ECommerceEventProcessor") \
            .config("spark.sql.streaming.checkpointLocation", "/checkpoints") \
            .getOrCreate()

        # Define expected schema
        self.event_schema = StructType([
            StructField("user_id", StringType(), True),
            StructField("product_id", StringType(), True),
            StructField("action", StringType(), True),
            StructField("price", DoubleType(), True),
            StructField("timestamp", StringType(), True)
        ])

    def process_events(self):
        # Read from Kafka
        raw_stream = self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "ecommerce-events") \
            .option("startingOffsets", "latest") \
            .load()

        # Parse JSON events
        parsed_stream = raw_stream.select(
            from_json(col("value").cast("string"), self.event_schema).alias("event")
        ).select("event.*")

        # This will fail on malformed JSON or schema mismatches
        query = parsed_stream.writeStream \
            .outputMode("append") \
            .format("delta") \
            .option("path", "/data/processed_events") \
            .trigger(processingTime="10 seconds") \
            .start()

        return query
This code looks clean but has several failure modes: from_json silently yields null rows for malformed JSON, upstream schema changes break parsing, an unreachable Kafka broker stalls the source, and a failure at the Delta sink or checkpoint location kills the query. Each requires a different handling strategy.
Data quality failures stem from the data itself—corrupt records, schema violations, unexpected null values, or business rule violations. These often indicate problems in upstream systems or data generation processes.
Infrastructure failures involve the systems processing the data—memory exhaustion, disk full, network partitions, or service dependencies becoming unavailable. These typically affect entire batches or time windows, not individual records.
Here's a more sophisticated version that separates these concerns:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, isnan
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import logging

class ResilientECommerceProcessor:
    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("ResilientECommerceProcessor") \
            .config("spark.sql.streaming.checkpointLocation", "/checkpoints") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
            .getOrCreate()

        self.event_schema = StructType([
            StructField("user_id", StringType(), True),
            StructField("product_id", StringType(), True),
            StructField("action", StringType(), True),
            StructField("price", DoubleType(), True),
            StructField("timestamp", StringType(), True)
        ])

        # Configure logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def classify_record_errors(self, df):
        """Classify different types of data quality issues."""
        # Add error classification columns
        df_with_errors = df.withColumn(
            "has_schema_error",
            col("event").isNull()
        ).withColumn(
            "has_missing_required",
            col("event.user_id").isNull() | col("event.product_id").isNull()
        ).withColumn(
            "has_invalid_price",
            col("event.price").isNull() | isnan(col("event.price")) | (col("event.price") < 0)
        ).withColumn(
            "has_unknown_action",
            ~col("event.action").isin(["view", "click", "purchase", "add_to_cart"])
        )
        return df_with_errors

    def separate_good_and_bad_records(self, df_with_errors):
        """Separate clean records from problematic ones."""
        # Clean records have no errors
        clean_records = df_with_errors.filter(
            ~col("has_schema_error") &
            ~col("has_missing_required") &
            ~col("has_invalid_price") &
            ~col("has_unknown_action")
        ).select("event.*")

        # Bad records for further analysis
        bad_records = df_with_errors.filter(
            col("has_schema_error") |
            col("has_missing_required") |
            col("has_invalid_price") |
            col("has_unknown_action")
        )
        return clean_records, bad_records
This approach isolates data quality issues from infrastructure problems, allowing targeted recovery strategies for each.
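The same classify-then-separate pattern is easy to unit test outside Spark. Here is a plain-Python sketch using the same field names as the event schema above; the validation rules are illustrative:

```python
def classify_event(event: dict) -> list:
    """Return the list of data quality errors for one event (empty = clean)."""
    errors = []
    if event.get("user_id") is None or event.get("product_id") is None:
        errors.append("missing_required")
    price = event.get("price")
    if price is None or price < 0:
        errors.append("invalid_price")
    if event.get("action") not in {"view", "click", "purchase", "add_to_cart"}:
        errors.append("unknown_action")
    return errors

def separate_records(events: list) -> tuple:
    """Split events into (clean, bad) so each side gets its own recovery path."""
    clean, bad = [], []
    for event in events:
        errors = classify_event(event)
        if errors:
            bad.append({"event": event, "errors": errors})
        else:
            clean.append(event)
    return clean, bad
```

Keeping the rules in pure functions like this also makes them reusable between the streaming job and the retry-queue job that reprocesses quarantined records.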
One of the most dangerous failure modes is cascading failure—when problems in one component trigger failures throughout the system. A slow database query can cause connection pool exhaustion, leading to application timeouts, which trigger retries, overwhelming the database further.
Circuit breakers prevent cascading failures by monitoring error rates and response times. When thresholds are exceeded, the circuit "opens," immediately failing requests instead of attempting them. This gives downstream systems time to recover.
import time
import threading
from enum import Enum
from typing import Callable, Any
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "CLOSED"        # Normal operation
    OPEN = "OPEN"            # Failing fast
    HALF_OPEN = "HALF_OPEN"  # Testing recovery

class CircuitBreaker:
    def __init__(self,
                 failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 expected_exception: type = Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self._lock = threading.Lock()

    def call(self, func: Callable, *args, **kwargs) -> Any:
        with self._lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise Exception("Circuit breaker is OPEN")

        # Call the protected function outside the lock so a slow call
        # doesn't serialize every other caller
        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _should_attempt_reset(self) -> bool:
        return (self.last_failure_time is not None and
                datetime.now() - self.last_failure_time >= timedelta(seconds=self.recovery_timeout))

    def _on_success(self):
        with self._lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            # A failure while HALF_OPEN re-opens immediately; otherwise
            # open once the threshold is crossed
            if (self.state == CircuitState.HALF_OPEN or
                    self.failure_count >= self.failure_threshold):
                self.state = CircuitState.OPEN

# Usage in a data pipeline
class DatabaseWriter:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=3,
            recovery_timeout=30,
            expected_exception=ConnectionError
        )

    def write_batch(self, data):
        """Write data with circuit breaker protection."""
        def _write_operation():
            # Simulate a database write with a 30% failure rate
            import random
            if random.random() < 0.3:
                raise ConnectionError("Database connection failed")
            time.sleep(0.1)  # Simulate processing time
            print(f"Successfully wrote {len(data)} records")
            return True

        try:
            return self.circuit_breaker.call(_write_operation)
        except Exception as e:
            print(f"Circuit breaker prevented call: {e}")
            # Fall back when the circuit is open or the write failed
            return self._fallback_write(data)

    def _fallback_write(self, data):
        """Fallback strategy when the circuit breaker is open."""
        print(f"Writing {len(data)} records to fallback storage")
        # Could write to a local file, a different database, or queue for later
        return False
Circuit breakers are essential for preventing cascading failures in complex data pipelines with multiple external dependencies.
Simple retry logic can solve many transient failure scenarios, but naive implementations often make problems worse. Effective retry strategies require understanding failure patterns and implementing sophisticated backoff algorithms.
Fixed interval retries attempt the same operation at regular intervals. This works for isolated failures but can create "thundering herd" problems when many clients retry simultaneously.
Exponential backoff increases the delay between retries, reducing load on struggling systems. Adding jitter prevents synchronized retries from multiple clients.
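A small simulation makes the difference concrete (the parameters are illustrative): with a fixed interval, every client that failed at the same moment retries at the same instant, while jitter spreads the retries out:

```python
import random

def fixed_retry_time(base_delay: float) -> float:
    """Every client retries after exactly the same delay."""
    return base_delay

def jittered_retry_time(base_delay: float, jitter_range: float = 0.5) -> float:
    """Spread retries across [base_delay, base_delay * (1 + jitter_range))."""
    return base_delay * (1 + random.random() * jitter_range)

# 1000 clients that all failed at the same moment
fixed = {fixed_retry_time(10.0) for _ in range(1000)}
jittered = {jittered_retry_time(10.0) for _ in range(1000)}

# The fixed-interval clients all collide on a single instant, re-creating
# the load spike; the jittered retries land at (almost all) distinct times.
print(len(fixed), len(jittered))
```

This is why the implementation below multiplies the capped exponential delay by a random jitter factor rather than sleeping for a fixed amount.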
import random
import time
import logging
import requests
from typing import Callable, Any, List
from dataclasses import dataclass
from functools import wraps

@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter_range: float = 0.1
    retryable_exceptions: List[type] = None

    def __post_init__(self):
        if self.retryable_exceptions is None:
            self.retryable_exceptions = [ConnectionError, TimeoutError]

class RetryStrategy:
    def __init__(self, config: RetryConfig):
        self.config = config
        self.logger = logging.getLogger(__name__)

    def calculate_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff and jitter."""
        if attempt <= 0:
            return 0
        # Exponential backoff
        exponential_delay = self.config.base_delay * (self.config.exponential_base ** (attempt - 1))
        # Cap at maximum delay
        capped_delay = min(exponential_delay, self.config.max_delay)
        # Add jitter to prevent thundering herd
        jitter = capped_delay * self.config.jitter_range * (random.random() * 2 - 1)
        return max(0, capped_delay + jitter)

    def should_retry(self, exception: Exception, attempt: int) -> bool:
        """Retry only retryable exception types, up to max_attempts."""
        if attempt >= self.config.max_attempts:
            return False
        return any(isinstance(exception, exc_type) for exc_type in self.config.retryable_exceptions)

    def retry(self, func: Callable, *args, **kwargs) -> Any:
        """Execute a function with retry logic."""
        last_exception = None
        for attempt in range(1, self.config.max_attempts + 1):
            try:
                result = func(*args, **kwargs)
                if attempt > 1:
                    self.logger.info(f"Function succeeded on attempt {attempt}")
                return result
            except Exception as e:
                last_exception = e
                if not self.should_retry(e, attempt):
                    self.logger.error(f"Not retrying after attempt {attempt}: {e}")
                    raise
                if attempt < self.config.max_attempts:
                    delay = self.calculate_delay(attempt)
                    self.logger.warning(
                        f"Attempt {attempt} failed: {e}. Retrying in {delay:.2f} seconds"
                    )
                    time.sleep(delay)
        # All attempts exhausted
        self.logger.error(f"All {self.config.max_attempts} attempts failed")
        raise last_exception

def with_retry(config: RetryConfig):
    """Decorator for adding retry logic to functions."""
    def decorator(func):
        retry_strategy = RetryStrategy(config)
        @wraps(func)
        def wrapper(*args, **kwargs):
            return retry_strategy.retry(func, *args, **kwargs)
        return wrapper
    return decorator

# Example usage in a data pipeline
class DataAPIClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.logger = logging.getLogger(__name__)

    @with_retry(RetryConfig(
        max_attempts=5,
        base_delay=2.0,
        max_delay=120.0,
        exponential_base=2.0,
        jitter_range=0.2,
        retryable_exceptions=[ConnectionError, TimeoutError, requests.exceptions.RequestException]
    ))
    def fetch_user_data(self, user_id: str):
        """Fetch user data with retry logic."""
        response = requests.get(f"{self.base_url}/users/{user_id}", timeout=10)
        if response.status_code == 429:    # Rate limited - retryable
            raise requests.exceptions.RequestException("Rate limited")
        elif response.status_code >= 500:  # Server error - retryable
            raise ConnectionError(f"Server error: {response.status_code}")
        elif response.status_code == 404:  # Not found - don't retry
            raise ValueError(f"User {user_id} not found")
        return response.json()

    def process_user_batch(self, user_ids: List[str]):
        """Process a batch of users with individual retry logic."""
        results = []
        failed_users = []
        for user_id in user_ids:
            try:
                results.append(self.fetch_user_data(user_id))
            except ValueError:
                # Don't retry 404s, but log them
                self.logger.info(f"User {user_id} not found, skipping")
                failed_users.append(user_id)
            except Exception as e:
                # Other exceptions already went through retry logic
                self.logger.error(f"Failed to fetch user {user_id} after retries: {e}")
                failed_users.append(user_id)
        return results, failed_users
Streaming systems require more sophisticated retry logic because failed records can block processing of subsequent records. You need strategies that handle failures without stopping the entire stream.
import time
import random

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, unix_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

class StreamingRetryHandler:
    def __init__(self, spark: SparkSession):
        self.spark = spark
        # Schema for retry metadata
        self.retry_schema = StructType([
            StructField("original_data", StringType(), False),
            StructField("error_message", StringType(), False),
            StructField("attempt_count", IntegerType(), False),
            StructField("first_failure_time", LongType(), False),
            StructField("last_attempt_time", LongType(), False),
            StructField("next_retry_time", LongType(), False)
        ])

    def create_retry_record(self, original_data: str, error_message: str, attempt: int = 1):
        """Create a retry record with backoff calculation."""
        current_time = int(time.time())
        # Calculate the next retry time with exponential backoff
        base_delay = 60    # 1 minute base
        max_delay = 3600   # 1 hour max
        exponential_delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
        # Add jitter
        jitter = exponential_delay * 0.1 * (random.random() * 2 - 1)
        next_retry_time = current_time + int(exponential_delay + jitter)
        return {
            "original_data": original_data,
            "error_message": error_message,
            "attempt_count": attempt,
            "first_failure_time": current_time if attempt == 1 else None,
            "last_attempt_time": current_time,
            "next_retry_time": next_retry_time
        }

    def process_with_retry_queue(self, input_stream):
        """Process a stream with the retry queue pattern.

        Assumes _process_record_with_error_handling tags each row with
        "status", "data", "original_data", and "error_message" columns
        (e.g. via a UDF) instead of letting exceptions kill the stream.
        """
        processed_stream = self._process_record_with_error_handling(input_stream)

        # Separate successful and failed records
        successful_records = processed_stream.filter(col("status") == "success")
        failed_records = processed_stream.filter(col("status") == "failed")

        # Write successful records to the main output
        successful_query = successful_records.select("data.*") \
            .writeStream \
            .outputMode("append") \
            .format("delta") \
            .option("path", "/data/processed") \
            .option("checkpointLocation", "/checkpoints/main") \
            .trigger(processingTime="30 seconds") \
            .start()

        # Write failed records to the retry queue
        retry_query = failed_records.select(
            col("original_data"),
            col("error_message"),
            lit(1).alias("attempt_count"),
            unix_timestamp().alias("first_failure_time"),
            unix_timestamp().alias("last_attempt_time"),
            (unix_timestamp() + lit(60)).alias("next_retry_time")  # 1 minute delay
        ).writeStream \
            .outputMode("append") \
            .format("delta") \
            .option("path", "/data/retry_queue") \
            .option("checkpointLocation", "/checkpoints/retry") \
            .trigger(processingTime="30 seconds") \
            .start()

        return successful_query, retry_query

    def process_retry_queue(self):
        """Separate job to process the retry queue."""
        current_time = int(time.time())
        # Read records that are ready for retry
        retry_records = self.spark.read.format("delta") \
            .load("/data/retry_queue") \
            .filter(col("next_retry_time") <= current_time) \
            .filter(col("attempt_count") < 5)  # Max 5 attempts
        # Re-attempt processing (e.g. via a UDF or foreachBatch)
        results = self._retry_record_processing(retry_records)
        # Handle results (update the retry queue, move to success/DLQ)
        return results
Warning: In production streaming systems, be careful with retry logic that could create backpressure. Always implement circuit breakers and dead letter queues to prevent infinite retry loops.
Dead letter queues (DLQs) are the final destination for messages that cannot be processed successfully. They serve as a crucial safety valve, preventing poison messages from blocking healthy processing while preserving failed data for analysis and potential reprocessing.
A well-designed DLQ system doesn't just dump failed messages—it provides context for debugging, enables selective reprocessing, and maintains audit trails. Consider this comprehensive DLQ implementation:
from dataclasses import dataclass, asdict
from typing import Any, Dict, List
from datetime import datetime, timezone
import json
import hashlib
import logging

@dataclass
class DeadLetterRecord:
    """Comprehensive dead letter record with debugging context."""
    original_message: str
    error_type: str
    error_message: str
    error_stacktrace: str
    processing_attempts: int
    first_failure_time: datetime
    last_failure_time: datetime
    source_topic: str
    source_partition: int
    source_offset: int
    processing_stage: str
    message_checksum: str
    metadata: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        result = asdict(self)
        # Convert datetimes to ISO format
        result['first_failure_time'] = self.first_failure_time.isoformat()
        result['last_failure_time'] = self.last_failure_time.isoformat()
        return result

    @classmethod
    def from_exception(cls,
                       original_message: str,
                       exception: Exception,
                       context: Dict[str, Any]) -> 'DeadLetterRecord':
        """Create a DLQ record from a processing exception."""
        import traceback
        now = datetime.now(timezone.utc)
        return cls(
            original_message=original_message,
            error_type=type(exception).__name__,
            error_message=str(exception),
            error_stacktrace=traceback.format_exc(),
            processing_attempts=context.get('attempt_count', 1),
            first_failure_time=context.get('first_failure_time', now),
            last_failure_time=now,
            source_topic=context.get('source_topic', 'unknown'),
            source_partition=context.get('source_partition', -1),
            source_offset=context.get('source_offset', -1),
            processing_stage=context.get('processing_stage', 'unknown'),
            message_checksum=hashlib.md5(original_message.encode()).hexdigest(),
            metadata=context.get('metadata', {})
        )

class DeadLetterQueueManager:
    def __init__(self, storage_backend: str, config: Dict[str, Any]):
        self.storage_backend = storage_backend
        self.config = config
        self.logger = logging.getLogger(__name__)

    def send_to_dlq(self, dlq_record: DeadLetterRecord, queue_name: str = "default"):
        """Send a record to the appropriate dead letter queue."""
        try:
            if self.storage_backend == "kafka":
                self._send_to_kafka_dlq(dlq_record, queue_name)
            elif self.storage_backend == "s3":
                self._send_to_s3_dlq(dlq_record, queue_name)
            elif self.storage_backend == "database":
                self._send_to_database_dlq(dlq_record, queue_name)
            else:
                raise ValueError(f"Unsupported storage backend: {self.storage_backend}")
            self.logger.info(f"Sent message to DLQ {queue_name}: {dlq_record.message_checksum}")
        except Exception as e:
            self.logger.error(f"Failed to send message to DLQ: {e}")
            # Fallback to the local file system
            self._send_to_local_dlq(dlq_record, queue_name)

    def _send_to_kafka_dlq(self, dlq_record: DeadLetterRecord, queue_name: str):
        """Send to a Kafka-based DLQ."""
        from kafka import KafkaProducer
        producer = KafkaProducer(
            bootstrap_servers=self.config['kafka_brokers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        dlq_topic = f"dlq-{queue_name}"
        # Use the message checksum as key for deduplication
        producer.send(
            dlq_topic,
            key=dlq_record.message_checksum.encode(),
            value=dlq_record.to_dict()
        )
        producer.flush()

    def _send_to_s3_dlq(self, dlq_record: DeadLetterRecord, queue_name: str):
        """Send to an S3-based DLQ with partitioning."""
        import boto3
        s3_client = boto3.client('s3')
        bucket = self.config['s3_bucket']
        # Partition by date and error type for easier analysis
        partition_key = (f"dlq/{queue_name}/date={dlq_record.last_failure_time.date()}"
                         f"/error_type={dlq_record.error_type}")
        object_key = f"{partition_key}/{dlq_record.message_checksum}.json"
        s3_client.put_object(
            Bucket=bucket,
            Key=object_key,
            Body=json.dumps(dlq_record.to_dict(), indent=2),
            ContentType='application/json'
        )

    def analyze_dlq_patterns(self, queue_name: str, time_window_hours: int = 24) -> Dict[str, Any]:
        """Analyze DLQ patterns for insights."""
        # Implementation varies by storage backend
        if self.storage_backend == "database":
            return self._analyze_database_dlq(queue_name, time_window_hours)
        elif self.storage_backend == "s3":
            return self._analyze_s3_dlq(queue_name, time_window_hours)
        else:
            return {"error": "Analysis not supported for this backend"}

    def reprocess_dlq_messages(self,
                               queue_name: str,
                               error_types: List[str] = None,
                               max_messages: int = 1000) -> Dict[str, int]:
        """Selectively reprocess messages from the DLQ."""
        reprocessed = 0
        failed = 0
        messages = self._fetch_dlq_messages(queue_name, error_types, max_messages)
        for message in messages:
            try:
                # Attempt reprocessing
                if self._reprocess_message(message):
                    reprocessed += 1
                    self._remove_from_dlq(message)
                else:
                    failed += 1
            except Exception as e:
                self.logger.error(f"Reprocessing failed: {e}")
                failed += 1
        return {"reprocessed": reprocessed, "failed": failed}
Some messages are inherently unprocessable—they have structural issues, violate business rules, or expose bugs in processing logic. These "poison messages" need special handling to prevent them from cycling through retry logic indefinitely.
import re
import json
import logging
from typing import Set, Pattern, Callable, List, Tuple, Dict, Any
from datetime import timedelta

class PoisonMessageDetector:
    def __init__(self):
        self.patterns: List[Tuple[Pattern, str]] = []
        self.validators: List[Callable] = []
        self.quarantine_reasons: Set[str] = set()

    def add_poison_pattern(self, pattern: str, reason: str):
        """Add a regex pattern that identifies poison messages."""
        compiled_pattern = re.compile(pattern)
        self.patterns.append((compiled_pattern, reason))
        self.quarantine_reasons.add(reason)

    def add_validator(self, validator: Callable[[str], Tuple[bool, str]]):
        """Add a custom validator function returning (is_valid, error_reason)."""
        self.validators.append(validator)

    def is_poison_message(self, message: str, metadata: Dict[str, Any]) -> Tuple[bool, str]:
        """Determine whether a message should be quarantined."""
        # Too many retries indicates poison
        attempt_count = metadata.get('attempt_count', 0)
        if attempt_count > 10:
            return True, "excessive_retries"

        # Check for rapid repeated failures
        first_failure = metadata.get('first_failure_time')
        last_failure = metadata.get('last_failure_time')
        if first_failure and last_failure:
            time_diff = last_failure - first_failure
            if time_diff < timedelta(minutes=5) and attempt_count > 3:
                return True, "rapid_repeated_failure"

        # Check poison patterns
        for pattern, reason in self.patterns:
            if pattern.search(message):
                return True, reason

        # Check custom validators
        for validator in self.validators:
            is_valid, error_reason = validator(message)
            if not is_valid:
                return True, error_reason

        return False, ""

class SmartRetryProcessor:
    def __init__(self, dlq_manager: DeadLetterQueueManager):
        self.dlq_manager = dlq_manager
        self.poison_detector = PoisonMessageDetector()
        self.logger = logging.getLogger(__name__)
        # Configure poison message detection
        self._setup_poison_detection()

    def _setup_poison_detection(self):
        """Configure patterns and validators for poison message detection."""
        # Common poison patterns
        self.poison_detector.add_poison_pattern(
            r'.*\x00.*',   # Null bytes often cause parsing issues
            "null_bytes"
        )
        self.poison_detector.add_poison_pattern(
            r'^$|^\s*$',   # Empty or whitespace-only messages
            "empty_message"
        )

        # Custom validator for JSON structure
        def json_validator(message: str) -> Tuple[bool, str]:
            try:
                parsed = json.loads(message)
                # Check for required fields
                for field in ['user_id', 'timestamp', 'event_type']:
                    if field not in parsed:
                        return False, f"missing_required_field_{field}"
                return True, ""
            except json.JSONDecodeError as e:
                return False, f"invalid_json_{str(e)[:50]}"

        self.poison_detector.add_validator(json_validator)

    def process_with_smart_retry(self, message: str, context: Dict[str, Any]):
        """Process a message with intelligent retry/quarantine logic."""
        try:
            # Attempt processing
            return self._process_message(message)
        except Exception as e:
            # Check whether this message should be quarantined
            is_poison, reason = self.poison_detector.is_poison_message(message, context)
            if is_poison:
                self.logger.warning(f"Quarantining poison message: {reason}")
                # Create a DLQ record with the poison flag
                context['poison_reason'] = reason
                context['quarantined'] = True
                dlq_record = DeadLetterRecord.from_exception(message, e, context)
                self.dlq_manager.send_to_dlq(dlq_record, f"quarantine-{reason}")
                return {"status": "quarantined", "reason": reason}
            else:
                # Regular retry logic
                attempt_count = context.get('attempt_count', 0) + 1
                context['attempt_count'] = attempt_count
                if attempt_count < 5:
                    self.logger.info(f"Scheduling retry {attempt_count} for message")
                    return {"status": "retry", "attempt": attempt_count}
                else:
                    # Max retries exceeded - send to DLQ
                    dlq_record = DeadLetterRecord.from_exception(message, e, context)
                    self.dlq_manager.send_to_dlq(dlq_record, "max_retries_exceeded")
                    return {"status": "failed", "reason": "max_retries_exceeded"}
Dead letter queues and poison message handling are essential for maintaining data pipeline stability. They prevent individual bad messages from bringing down entire systems while preserving data for later analysis and recovery.
Idempotency—the ability to apply an operation multiple times without changing the result—is crucial for reliable data pipelines. Network failures, process crashes, and system restarts can cause duplicate processing, leading to incorrect results if operations aren't designed to be idempotent.
Consider a simple aggregation pipeline that counts user actions. Without idempotency, reprocessing the same data will double-count events:
# Non-idempotent processing - PROBLEMATIC
def process_user_events_naive(events):
    user_counts = {}
    for event in events:
        user_id = event['user_id']
        user_counts[user_id] = user_counts.get(user_id, 0) + 1
    # This will add to existing counts, not replace them
    update_user_totals(user_counts)
The challenge becomes more complex with streaming data, where you might reprocess overlapping time windows, or with batch jobs where individual records might be processed multiple times due to partial failures.
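A minimal idempotent variant replaces increments with a keyed overwrite, so reprocessing the same batch is harmless. This is a sketch: the in-memory `store` dict stands in for a real keyed sink such as a table with the batch ID as primary key:

```python
def process_user_events_idempotent(events, batch_id, store):
    """Compute this batch's counts from scratch and overwrite by batch key."""
    user_counts = {}
    for event in events:
        user_id = event['user_id']
        user_counts[user_id] = user_counts.get(user_id, 0) + 1
    # Keyed overwrite: running this twice for the same batch_id is a no-op
    store[batch_id] = user_counts
    return user_counts

def total_counts(store):
    """Merge per-batch counts into overall totals."""
    totals = {}
    for counts in store.values():
        for user_id, n in counts.items():
            totals[user_id] = totals.get(user_id, 0) + n
    return totals
```

The key design choice is that the write is a deterministic function of the batch, not of the sink's current state; duplicate deliveries simply overwrite identical data.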
True exactly-once processing requires coordination between data ingestion, processing, and output systems. Here's a comprehensive pattern using transactional semantics:
import uuid
import hashlib
from contextlib import contextmanager
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict
import json
import time
@dataclass
class ProcessingCheckpoint:
"""
Represents a processing checkpoint for idempotency tracking
"""
job_id: str
batch_id: str
processing_timestamp: int
input_checksum: str
records_processed: int
output_locations: List[str]
status: str # 'started', 'completed', 'failed'
class IdempotencyManager:
def __init__(self, storage_backend):
self.storage_backend = storage_backend
def create_batch_id(self, input_data: List[Dict]) -> str:
"""
Create deterministic batch ID based on input data
"""
# Sort data to ensure consistent ordering
sorted_data = sorted(input_data, key=lambda x: json.dumps(x, sort_keys=True))
combined_data = json.dumps(sorted_data, sort_keys=True)
# Create hash of input data
content_hash = hashlib.sha256(combined_data.encode()).hexdigest()
return f"batch_{content_hash[:16]}"
def start_processing(self, job_id: str, input_data: List[Dict]) -> ProcessingCheckpoint:
"""
Start processing with idempotency tracking
"""
batch_id = self.create_batch_id(input_data)
# Check if this batch was already processed
existing_checkpoint = self.get_checkpoint(job_id, batch_id)
if existing_checkpoint and existing_checkpoint.status == 'completed':
raise AlreadyProcessedException(f"Batch {batch_id} already processed successfully")
# Create new checkpoint
checkpoint = ProcessingCheckpoint(
job_id=job_id,
batch_id=batch_id,
processing_timestamp=int(time.time()),
input_checksum=hashlib.sha256(json.dumps(input_data, sort_keys=True).encode()).hexdigest(),
records_processed=0,
output_locations=[],
status='started'
)
self.save_checkpoint(checkpoint)
return checkpoint
def complete_processing(self, checkpoint: ProcessingCheckpoint,
output_locations: List[str]) -> None:
"""
Mark processing as completed
"""
checkpoint.output_locations = output_locations
checkpoint.status = 'completed'
checkpoint.processing_timestamp = int(time.time())
self.save_checkpoint(checkpoint)
@contextmanager
def idempotent_processing(self, job_id: str, input_data: List[Dict]):
"""
Context manager for idempotent processing
"""
try:
checkpoint = self.start_processing(job_id, input_data)
yield checkpoint
except AlreadyProcessedException:
# Already processed successfully, nothing to do
return
except Exception as e:
# Mark as failed
checkpoint.status = 'failed'
self.save_checkpoint(checkpoint)
raise e
class IdempotentProcessor:
def __init__(self, idempotency_manager: IdempotencyManager):
self.idempotency_manager = idempotency_manager
def process_user_events_idempotent(self, job_id: str, events: List[Dict]) -> Dict[str, Any]:
"""
Process user events with guaranteed idempotency
"""
with self.idempotency_manager.idempotent_processing(job_id, events) as checkpoint:
# Process events
user_aggregates = self._aggregate_events(events)
# Generate deterministic output paths
output_path = f"/data/user_aggregates/{checkpoint.batch_id}.parquet"
# Write with atomic operation
self._write_aggregates_atomic(user_aggregates, output_path)
# Update checkpoint
self.idempotency_manager.complete_processing(
checkpoint,
[output_path]
)
return {
"batch_id": checkpoint.batch_id,
"records_processed": len(events),
"output_path": output_path,
"status": "completed"
}
def _aggregate_events(self, events: List[Dict]) -> Dict[str, Dict]:
"""
Aggregate events by user - this is naturally idempotent
"""
user_aggregates = {}
for event in events:
user_id = event['user_id']
event_type = event.get('event_type', 'unknown')
if user_id not in user_aggregates:
user_aggregates[user_id] = {
'user_id': user_id,
'total_events': 0,
'event_types': {},
'first_seen': event.get('timestamp'),
'last_seen': event.get('timestamp')
}
agg = user_aggregates[user_id]
agg['total_events'] += 1
agg['event_types'][event_type] = agg['event_types'].get(event_type, 0) + 1
agg['last_seen'] = max(agg['last_seen'], event.get('timestamp', 0))
return user_aggregates
def _write_aggregates_atomic(self, aggregates: Dict, output_path: str):
"""
Write aggregates atomically to prevent partial writes
"""
        import os
        import uuid
        # Write to a temporary file first, then atomically rename into place
        temp_path = f"{output_path}.tmp.{uuid.uuid4()}"
        try:
            # Write data (simplified - would use a proper format like Parquet)
            with open(temp_path, 'w') as f:
                json.dump(aggregates, f, indent=2)
            # os.replace is atomic on the same filesystem, so readers never
            # observe a partially written file
            os.replace(temp_path, output_path)
except Exception:
# Cleanup temporary file if write failed
if os.path.exists(temp_path):
os.remove(temp_path)
raise
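One detail worth making explicit: retries are only safe if the same input maps to the same `job_id`. A hypothetical helper sketches one way to do this (the `deterministic_job_id` name and the SHA-256 choice are illustrative, not part of the code above) by deriving the identifier from the batch content:

```python
import hashlib
import json

def deterministic_job_id(pipeline: str, events: list) -> str:
    """Derive a stable job id from batch content so a retried batch
    hits the same idempotency checkpoint."""
    # sort_keys gives a canonical serialization, so dict key order
    # does not change the id (event list order still matters)
    payload = json.dumps(events, sort_keys=True).encode("utf-8")
    return f"{pipeline}:{hashlib.sha256(payload).hexdigest()[:16]}"
```

With ids derived this way, an identical replayed batch collides on the checkpoint store and is rejected as already processed instead of being double-written.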
Streaming systems require more sophisticated idempotency patterns because data arrives continuously and out of order. Watermarks define processing boundaries that make windowed operations safely repeatable:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, sum as spark_sum, max as spark_max
from pyspark.sql.streaming import StreamingQuery
class IdempotentStreamProcessor:
def __init__(self, spark: SparkSession):
self.spark = spark
def create_idempotent_aggregation(self, input_stream):
"""
Create idempotent streaming aggregation using watermarks
"""
# Add watermark for late data handling
watermarked_stream = input_stream \
.withWatermark("event_timestamp", "10 minutes")
# Windowed aggregation - naturally idempotent within windows
windowed_aggregates = watermarked_stream \
.groupBy(
window(col("event_timestamp"), "5 minutes"),
col("user_id")
) \
.agg(
spark_sum("value").alias("total_value"),
spark_max("event_timestamp").alias("latest_event")
) \
.select(
col("window.start").alias("window_start"),
col("window.end").alias("window_end"),
col("user_id"),
col("total_value"),
col("latest_event")
)
        # Write through checkpointing; append mode emits each window exactly
        # once, after its watermark has passed
        return windowed_aggregates.writeStream \
            .outputMode("append") \
            .format("delta") \
            .option("path", "/data/windowed_aggregates") \
            .option("checkpointLocation", "/checkpoints/windowed") \
            .option("mergeSchema", "true") \
            .trigger(processingTime="1 minute") \
            .start()
def create_exactly_once_sink(self, processed_stream):
"""
Create exactly-once sink using Delta Lake merge operations
"""
def batch_function(batch_df, batch_id):
"""
Process each micro-batch with exactly-once semantics
"""
# Create temporary view for merge operation
batch_df.createOrReplaceTempView("updates")
# Perform upsert using merge
merge_sql = """
MERGE INTO user_aggregates AS target
USING updates AS source
ON target.user_id = source.user_id
AND target.window_start = source.window_start
WHEN MATCHED THEN
UPDATE SET
total_value = source.total_value,
latest_event = source.latest_event
WHEN NOT MATCHED THEN
INSERT (user_id, window_start, window_end, total_value, latest_event)
VALUES (source.user_id, source.window_start, source.window_end,
source.total_value, source.latest_event)
"""
# Execute merge (idempotent operation)
self.spark.sql(merge_sql)
# Use foreachBatch for exactly-once processing
return processed_stream.writeStream \
.foreachBatch(batch_function) \
.option("checkpointLocation", "/checkpoints/exactly_once") \
.trigger(processingTime="30 seconds") \
.start()
Idempotency is fundamental to reliable data processing. By designing operations to be safely repeatable, you eliminate entire classes of data consistency issues that plague production pipelines.
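The contrast is easiest to see with a toy in-memory sketch (the `totals` dict and `applied` ledger set are illustrative stand-ins for a real store): a blind increment double-counts on replay, while a write guarded by a batch ledger does not.

```python
def apply_increment(totals: dict, user_id: str, batch_total: int) -> None:
    # Not idempotent: replaying the same batch double-counts
    totals[user_id] = totals.get(user_id, 0) + batch_total

def apply_with_ledger(totals: dict, applied: set,
                      user_id: str, batch_id: str, batch_total: int) -> None:
    # Idempotent: the (user, batch) ledger makes replay a no-op
    if (user_id, batch_id) in applied:
        return
    totals[user_id] = totals.get(user_id, 0) + batch_total
    applied.add((user_id, batch_id))

totals, ledger = {}, set()
apply_with_ledger(totals, ledger, "u1", "batch-1", 10)
apply_with_ledger(totals, ledger, "u1", "batch-1", 10)  # retried delivery
assert totals["u1"] == 10  # counted once
```

In production the ledger lives in durable storage (the checkpoint table from the `IdempotencyManager` pattern above plays exactly this role), but the invariant is the same: the write is keyed by batch identity, not applied blindly.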
Effective monitoring transforms reactive fire-fighting into proactive pipeline management. The key is building observability systems that provide early warning of problems and actionable insights for resolution.
Pipeline monitoring requires tracking metrics at multiple levels: infrastructure (CPU, memory, network), application (throughput, latency, errors), and business logic (data quality, SLA compliance).
import time
import logging
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from threading import Thread, Event
import json
from collections import defaultdict, deque
import statistics
@dataclass
class MetricPoint:
"""
Individual metric measurement
"""
name: str
value: float
timestamp: datetime
tags: Dict[str, str] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
'name': self.name,
'value': self.value,
'timestamp': self.timestamp.isoformat(),
'tags': self.tags
}
class PipelineMetricsCollector:
def __init__(self, pipeline_name: str, metrics_backend: str = "prometheus"):
self.pipeline_name = pipeline_name
self.metrics_backend = metrics_backend
self.metrics_buffer = deque(maxlen=10000)
self.counters = defaultdict(int)
self.gauges = defaultdict(float)
self.histograms = defaultdict(list)
self.timers = {}
        # Start background daemon thread for system metric collection;
        # create the stop event before the thread so the thread can see it
        self.stop_event = Event()
        self.collection_thread = Thread(
            target=self._collect_system_metrics, daemon=True
        )
        self.collection_thread.start()
    def _collect_system_metrics(self):
        """
        Periodically sample system-level metrics (simplified stub - a real
        implementation would sample CPU/memory, e.g. via psutil)
        """
        while not self.stop_event.wait(15):
            pass
    def _send_to_backend(self, metric: 'MetricPoint'):
        """
        Ship a metric to the configured backend (stub - real code would
        push to Prometheus, StatsD, or similar)
        """
        pass
    def shutdown(self):
        """
        Stop the background collection thread
        """
        self.stop_event.set()
        self.collection_thread.join(timeout=5)
def record_counter(self, name: str, value: int = 1, tags: Dict[str, str] = None):
"""
Record counter metric (monotonically increasing)
"""
full_name = f"{self.pipeline_name}.{name}"
self.counters[full_name] += value
metric = MetricPoint(
name=full_name,
value=value,
timestamp=datetime.now(),
tags=tags or {}
)
self.metrics_buffer.append(metric)
self._send_to_backend(metric)
def record_gauge(self, name: str, value: float, tags: Dict[str, str] = None):
"""
Record gauge metric (point-in-time value)
"""
full_name = f"{self.pipeline_name}.{name}"
self.gauges[full_name] = value
metric = MetricPoint(
name=full_name,
value=value,
timestamp=datetime.now(),
tags=tags or {}
)
self.metrics_buffer.append(metric)
self._send_to_backend(metric)
def record_histogram(self, name: str, value: float, tags: Dict[str, str] = None):
"""
Record histogram metric for distribution analysis
"""
full_name = f"{self.pipeline_name}.{name}"
self.histograms[full_name].append(value)
# Keep only recent values
if len(self.histograms[full_name]) > 1000:
self.histograms[full_name] = self.histograms[full_name][-1000:]
metric = MetricPoint(
name=full_name,
value=value,
timestamp=datetime.now(),
tags=tags or {}
)
self.metrics_buffer.append(metric)
self._send_to_backend(metric)
def start_timer(self, name: str) -> str:
"""
Start timing an operation
"""
timer_id = f"{name}_{int(time.time() * 1000000)}"
self.timers[timer_id] = {
'name': name,
'start_time': time.time(),
'tags': {}
}
return timer_id
def end_timer(self, timer_id: str, tags: Dict[str, str] = None):
"""
End timing and record duration
"""
if timer_id not in self.timers:
return
timer_info = self.timers.pop(timer_id)
duration = time.time() - timer_info['start_time']
final_tags = timer_info['tags'].copy()
if tags:
final_tags.update(tags)
self.record_histogram(
f"{timer_info['name']}.duration",
duration * 1000, # Convert to milliseconds
final_tags
)
def get_histogram_stats(self, name: str) -> Dict[str, float]:
"""
Get statistical summary of histogram
"""
full_name = f"{self.pipeline_name}.{name}"
values = self.histograms.get(full_name, [])
if not values:
return {}
return {
'count': len(values),
'mean': statistics.mean(values),
'median': statistics.median(values),
'p95': self._percentile(values, 95),
'p99': self._percentile(values, 99),
'min': min(values),
'max': max(values)
}
def _percentile(self, values: List[float], percentile: float) -> float:
"""
Calculate percentile value
"""
sorted_values = sorted(values)
index = (percentile / 100) * (len(sorted_values) - 1)
if index.is_integer():
return sorted_values[int(index)]
else:
lower = sorted_values[int(index)]
upper = sorted_values[int(index) + 1]
return lower + (upper - lower) * (index - int(index))
class PipelineHealthChecker:
def __init__(self, metrics_collector: PipelineMetricsCollector):
self.metrics = metrics_collector
self.health_checks = []
self.alert_rules = []
def add_health_check(self, name: str, check_func: Callable[[], bool],
interval_seconds: int = 60):
"""
Add periodic health check
"""
self.health_checks.append({
'name': name,
'func': check_func,
'interval': interval_seconds,
'last_run': 0,
'last_result': None
})
def add_alert_rule(self, rule: Dict[str, Any]):
"""
Add alerting rule
Example rule:
{
'name': 'High Error Rate',
'metric': 'error_rate',
'condition': 'greater_than',
'threshold': 0.05,
'duration': 300, # seconds
'severity': 'critical'
}
"""
self.alert_rules.append(rule)
def run_health_checks(self):
"""
Execute all health checks
"""
current_time = time.time()
for check in self.health_checks:
if current_time - check['last_run'] >= check['interval']:
try:
result = check['func']()
check['last_result'] = result
check['last_run'] = current_time
# Record health check result
self.metrics.record_gauge(
f"health_check.{check['name']}",
1.0 if result else 0.0
)
if not result:
self.metrics.record_counter(
f"health_check.{check['name']}.failed"
)
except Exception as e:
logging.error(f"Health check {check['name']} failed: {e}")
self.metrics.record_counter(
f"health_check.{check['name']}.error"
)
class DataQualityMonitor:
def __init__(self, metrics_collector: PipelineMetricsCollector):
self.metrics = metrics_collector
def check_batch_quality(self, batch_data: List[Dict], batch_id: str):
"""
Comprehensive data quality checks for a batch
"""
total_records = len(batch_data)
self.metrics.record_gauge("batch.total_records", total_records,
{"batch_id": batch_id})
# Check for null values
null_counts = defaultdict(int)
duplicate_keys = set()
seen_keys = set()
schema_violations = 0
value_range_violations = 0
for record in batch_data:
# Check for nulls
for key, value in record.items():
if value is None or value == "":
null_counts[key] += 1
# Check for duplicates (assuming 'id' field)
record_id = record.get('id')
if record_id:
if record_id in seen_keys:
duplicate_keys.add(record_id)
seen_keys.add(record_id)
# Schema validation
if not self._validate_schema(record):
schema_violations += 1
# Value range validation
if not self._validate_value_ranges(record):
value_range_violations += 1
# Record quality metrics
for field, null_count in null_counts.items():
null_rate = null_count / total_records
self.metrics.record_gauge(f"data_quality.null_rate.{field}",
null_rate, {"batch_id": batch_id})
duplicate_rate = len(duplicate_keys) / total_records if total_records > 0 else 0
self.metrics.record_gauge("data_quality.duplicate_rate", duplicate_rate,
{"batch_id": batch_id})
schema_violation_rate = schema_violations / total_records if total_records > 0 else 0
self.metrics.record_gauge("data_quality.schema_violation_rate",
schema_violation_rate, {"batch_id": batch_id})
        # Overall quality score (clamped at zero so compounding violation
        # rates cannot produce a negative score)
        quality_score = max(0.0, 1.0 - (duplicate_rate + schema_violation_rate))
self.metrics.record_gauge("data_quality.overall_score", quality_score,
{"batch_id": batch_id})
return {
"total_records": total_records,
"null_counts": dict(null_counts),
"duplicate_count": len(duplicate_keys),
"schema_violations": schema_violations,
"quality_score": quality_score
}
def _validate_schema(self, record: Dict) -> bool:
"""
Validate record against expected schema
"""
required_fields = ['id', 'timestamp', 'user_id']
return all(field in record for field in required_fields)
def _validate_value_ranges(self, record: Dict) -> bool:
"""
Validate value ranges and business rules
"""
# Example validations
if 'price' in record:
price = record['price']
if not isinstance(price, (int, float)) or price < 0:
return False
if 'email' in record:
email = record['email']
if '@' not in str(email):
return False
return True
# Example usage in a data pipeline
class MonitoredPipeline:
def __init__(self, pipeline_name: str):
self.metrics = PipelineMetricsCollector(pipeline_name)
self.health_checker = PipelineHealthChecker(self.metrics)
self.quality_monitor = DataQualityMonitor(self.metrics)
# Set up health checks
self.health_checker.add_health_check(
"kafka_connectivity",
self._check_kafka_connection,
30
)
self.health_checker.add_health_check(
"database_connectivity",
self._check_database_connection,
60
)
def process_batch(self, batch_data: List[Dict], batch_id: str):
"""
Process batch with comprehensive monitoring
"""
timer_id = self.metrics.start_timer("batch_processing")
try:
# Run health checks
self.health_checker.run_health_checks()
# Check data quality
quality_report = self.quality_monitor.check_batch_quality(batch_data, batch_id)
# Process data
processed_records = self._process_records(batch_data)
# Record success metrics
self.metrics.record_counter("batches.processed.success")
self.metrics.record_gauge("batch.processed_records", len(processed_records))
return processed_records
except Exception as e:
self.metrics.record_counter("batches.processed.error")
self.metrics.record_counter("errors.by_type", 1, {"error_type": type(e).__name__})
raise
finally:
self.metrics.end_timer(timer_id, {"batch_id": batch_id})
def _check_kafka_connection(self) -> bool:
"""
Check if Kafka is accessible
"""
try:
# Simplified check - would use actual Kafka client
return True
        except Exception:
            return False
def _check_database_connection(self) -> bool:
"""
Check if database is accessible
"""
try:
# Simplified check - would use actual DB connection
return True
        except Exception:
            return False
def _process_records(self, records: List[Dict]) -> List[Dict]:
"""
Actual record processing logic
"""
# Simulate processing
time.sleep(0.1)
return records
The difference between good and bad alerting is specificity. Generic alerts create noise; specific alerts enable action. Design alerting rules that provide context and suggest remediation steps:
from typing import List, Dict, Any, Optional
from enum import Enum
from datetime import datetime, timedelta
from collections import deque
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class AlertSeverity(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
class Alert:
def __init__(self, rule_name: str, severity: AlertSeverity,
message: str, context: Dict[str, Any] = None):
self.rule_name = rule_name
self.severity = severity
self.message = message
self.context = context or {}
self.timestamp = datetime.now()
self.alert_id = f"{rule_name}_{int(self.timestamp.timestamp())}"
class AlertManager:
def __init__(self, config: Dict[str, Any]):
self.config = config
self.active_alerts = {}
self.alert_history = deque(maxlen=1000)
def evaluate_rules(self, metrics: Dict[str, Any]) -> List[Alert]:
"""
Evaluate all alert rules against current metrics
"""
alerts = []
# High error rate alert
error_rate = metrics.get('error_rate', 0)
if error_rate > 0.05: # 5% error rate
alert = Alert(
"high_error_rate",
AlertSeverity.CRITICAL,
f"Error rate is {error_rate:.2%}, exceeding 5% threshold",
{
"current_error_rate": error_rate,
"threshold": 0.05,
"suggested_action": "Check recent deployments and error logs",
"runbook": "https://wiki.company.com/runbooks/high-error-rate"
}
)
alerts.append(alert)
# Processing lag alert
processing_lag = metrics.get('processing_lag_seconds', 0)
if processing_lag > 300: # 5 minutes
severity = AlertSeverity.CRITICAL if processing_lag > 1800 else AlertSeverity.WARNING
alert = Alert(
"processing_lag",
severity,
f"Processing lag is {processing_lag} seconds",
{
"current_lag": processing_lag,
"threshold": 300,
"suggested_action": "Check for resource constraints or downstream bottlenecks",
"affected_consumers": metrics.get('lagging_consumers', [])
}
)
alerts.append(alert)
# Data quality degradation
quality_score = metrics.get('data_quality_score', 1.0)
if quality_score < 0.9: # Below 90% quality
alert = Alert(
"data_quality_degradation",
AlertSeverity.WARNING,
f"Data quality score dropped to {quality_score:.1%}",
{
"current_score": quality_score,
"threshold": 0.9,
"quality_issues": metrics.get('quality_issues', {}),
"suggested_action": "Review upstream data sources and validation rules"
}
)
alerts.append(alert)
return alerts
def send_alert(self, alert: Alert):
"""
Send alert through configured channels
"""
# Deduplicate alerts
if alert.rule_name in self.active_alerts:
last_alert_time = self.active_alerts[alert.rule_name].timestamp
if datetime.now() - last_alert_time < timedelta(minutes=15):
return # Skip duplicate alert
self.active_alerts[alert.rule_name] = alert
self.alert_history.append(alert)
# Send through configured channels
        if self.config.get('email_enabled'):
            self._send_email_alert(alert)