
You're building a data pipeline to ingest customer support tickets from Zendesk, user analytics from Mixpanel, and financial data from Stripe. Each API has different pagination schemes, rate limits, and authentication requirements. Your pipeline needs to handle thousands of requests per hour while respecting rate limits, gracefully handling failures, and ensuring data consistency across batch runs.
This scenario represents the reality of modern data engineering: APIs are everywhere, but they're all different. Understanding how to work with REST APIs at scale isn't just about making HTTP requests—it's about architecting resilient systems that can handle the complexities of real-world API integration.
Prerequisites: solid experience with Python and HTTP concepts, familiarity with data pipeline architectures, and an understanding of basic REST principles. We'll work with advanced patterns that assume you've built API clients before.
Real-world APIs aren't just simple request-response patterns. They're complex systems with sophisticated rate limiting, pagination schemes, and failure modes that can break naive implementations.
Let's start by examining three common API patterns you'll encounter:
import httpx
import asyncio
import time
from dataclasses import dataclass
from typing import Optional, Dict, Any, AsyncIterator
from datetime import datetime, timedelta
import logging

@dataclass
class APIResponse:
    """Standardized response wrapper for all API interactions"""
    data: Dict[str, Any]
    status_code: int
    headers: Dict[str, str]
    rate_limit_remaining: Optional[int] = None
    rate_limit_reset: Optional[datetime] = None
    next_page_token: Optional[str] = None
    has_more: bool = False

class APIClient:
    """Base client with common patterns for REST API interaction"""

    def __init__(self, base_url: str, auth_header: Dict[str, str],
                 max_retries: int = 3, timeout: int = 30):
        self.base_url = base_url.rstrip('/')
        self.auth_header = auth_header
        self.max_retries = max_retries
        self.timeout = timeout
        self.session = None
        self._rate_limit_state = {}

    async def __aenter__(self):
        self.session = httpx.AsyncClient(
            timeout=self.timeout,
            limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.aclose()

    async def get(self, endpoint: str, params: Optional[Dict] = None) -> APIResponse:
        """Perform a GET request and wrap the result in an APIResponse"""
        # Accept either a path relative to base_url or a fully qualified URL
        url = endpoint if endpoint.startswith('http') else f"{self.base_url}/{endpoint.lstrip('/')}"
        response = await self.session.get(url, params=params, headers=self.auth_header)
        response.raise_for_status()
        headers = {k.lower(): v for k, v in response.headers.items()}
        rate_info = self._parse_rate_limit_headers(headers)
        return APIResponse(
            data=response.json(),
            status_code=response.status_code,
            headers=headers,
            rate_limit_remaining=rate_info.get('remaining'),
            rate_limit_reset=rate_info.get('reset')
        )

    def _parse_rate_limit_headers(self, headers: Dict[str, str]) -> Dict[str, Any]:
        """Parse rate limit information from common header patterns"""
        rate_info = {}
        # GitHub/GitLab style
        if 'x-ratelimit-remaining' in headers:
            rate_info['remaining'] = int(headers['x-ratelimit-remaining'])
            rate_info['reset'] = datetime.fromtimestamp(
                int(headers.get('x-ratelimit-reset', 0))
            )
        # Twitter style
        elif 'x-rate-limit-remaining' in headers:
            rate_info['remaining'] = int(headers['x-rate-limit-remaining'])
            rate_info['reset'] = datetime.fromtimestamp(
                int(headers.get('x-rate-limit-reset', 0))
            )
        # Stripe style (per-second limits)
        elif 'stripe-ratelimit-remaining' in headers:
            rate_info['remaining'] = int(headers['stripe-ratelimit-remaining'])
            rate_info['reset'] = datetime.now() + timedelta(seconds=1)
        return rate_info
This base client establishes patterns we'll use throughout: connection pooling for performance, standardized response handling, and rate limit header parsing. Notice how different APIs use different header formats—this is the reality of API integration.
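The header-normalization idea is easy to exercise on its own. Here is a minimal standalone sketch of the same pattern (the function name and sample values are ours, not from any particular API):

```python
from datetime import datetime, timedelta, timezone

def parse_rate_limit(headers: dict) -> dict:
    """Normalize rate-limit info from a few common header conventions."""
    h = {k.lower(): v for k, v in headers.items()}
    if 'x-ratelimit-remaining' in h:  # GitHub/GitLab style
        return {
            'remaining': int(h['x-ratelimit-remaining']),
            'reset': datetime.fromtimestamp(int(h.get('x-ratelimit-reset', 0)), tz=timezone.utc),
        }
    if 'stripe-ratelimit-remaining' in h:  # per-second window style
        return {
            'remaining': int(h['stripe-ratelimit-remaining']),
            'reset': datetime.now(timezone.utc) + timedelta(seconds=1),
        }
    return {}

info = parse_rate_limit({'X-RateLimit-Remaining': '42', 'X-RateLimit-Reset': '1700000000'})
print(info['remaining'])  # 42
```

Lowercasing keys up front matters: HTTP header names are case-insensitive, and you'll see every casing in the wild.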
Pagination is where most API integrations break down under load. Let's examine the four main pagination patterns and their trade-offs:
The simplest but most problematic pattern:
class OffsetPaginator:
    """Handles offset-based pagination with consistency safeguards"""

    def __init__(self, client: APIClient, endpoint: str, page_size: int = 100):
        self.client = client
        self.endpoint = endpoint
        self.page_size = page_size

    async def paginate(self, **query_params) -> AsyncIterator[APIResponse]:
        """
        Iterate through offset-based pages with duplicate detection.

        The major issue with offset pagination is that data can shift between
        requests, causing duplicates or missed records. We implement basic
        duplicate detection here.
        """
        offset = 0
        seen_ids = set()
        consecutive_empty_pages = 0
        while consecutive_empty_pages < 3:  # Safety valve
            params = {
                'limit': self.page_size,
                'offset': offset,
                **query_params
            }
            response = await self.client.get(self.endpoint, params=params)
            if not response.data.get('results'):
                consecutive_empty_pages += 1
                offset += self.page_size
                continue
            consecutive_empty_pages = 0
            # Filter duplicates that can occur when data shifts during pagination
            filtered_results = []
            for item in response.data['results']:
                item_id = item.get('id')
                if item_id and item_id not in seen_ids:
                    seen_ids.add(item_id)
                    filtered_results.append(item)
            if filtered_results:
                # Create new response with filtered data
                filtered_response = APIResponse(
                    data={'results': filtered_results},
                    status_code=response.status_code,
                    headers=response.headers,
                    rate_limit_remaining=response.rate_limit_remaining,
                    rate_limit_reset=response.rate_limit_reset,
                    has_more=len(response.data['results']) == self.page_size
                )
                yield filtered_response
            offset += self.page_size
            # Break if we got less than a full page
            if len(response.data['results']) < self.page_size:
                break
Warning: Offset pagination becomes unreliable with large datasets and high write rates. Use it only for small, relatively static datasets or when you can tolerate some data inconsistency.
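You can reproduce the failure mode in a few lines. If a record is deleted between page requests, a fixed offset silently skips its successor; the dataset and deletion below are simulated, not a real API:

```python
def paginate_offset(snapshot, page_size=2):
    """Collect every page from a dataset that may change between requests."""
    out, offset = [], 0
    while True:
        page = snapshot()[offset:offset + page_size]
        if not page:
            return out
        out.extend(page)
        offset += page_size

calls = {'n': 0}
def shifting_snapshot():
    """Serve the full dataset once, then pretend 'b' was deleted server-side."""
    calls['n'] += 1
    if calls['n'] == 1:
        return ['a', 'b', 'c', 'd', 'e']
    return ['a', 'c', 'd', 'e']  # 'b' deleted between page requests

seen = paginate_offset(shifting_snapshot)
print(seen)  # ['a', 'b', 'd', 'e'] -- 'c' was silently skipped
```

After the deletion, every remaining record shifts down one position, so the second page starts past 'c'. Inserts cause the mirror-image problem: duplicates.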
The most reliable pattern for large-scale data ingestion:
class CursorPaginator:
    """Handles cursor-based pagination with various cursor formats"""

    def __init__(self, client: APIClient, endpoint: str, page_size: int = 100):
        self.client = client
        self.endpoint = endpoint
        self.page_size = page_size

    async def paginate(self, initial_cursor: Optional[str] = None,
                       **query_params) -> AsyncIterator[APIResponse]:
        """
        Iterate through cursor-based pages.

        Cursor pagination is preferred for large datasets because it provides
        consistent results even when data is being added/modified during iteration.
        """
        cursor = initial_cursor
        while True:
            params = {
                'limit': self.page_size,
                **query_params
            }
            if cursor:
                # Different APIs use different parameter names
                if 'cursor' in query_params:
                    params['cursor'] = cursor
                elif 'page_token' in query_params:
                    params['page_token'] = cursor
                else:
                    params['after'] = cursor  # Default assumption
            response = await self.client.get(self.endpoint, params=params)
            if not response.data.get('data', response.data.get('results', [])):
                break
            # Extract next cursor from various response formats
            cursor = self._extract_next_cursor(response)
            # Set pagination metadata
            response.next_page_token = cursor
            response.has_more = cursor is not None
            yield response
            if not cursor:
                break

    def _extract_next_cursor(self, response: APIResponse) -> Optional[str]:
        """Extract next cursor from different API response formats"""
        data = response.data
        # Slack/Discord style
        if 'response_metadata' in data:
            return data['response_metadata'].get('next_cursor')
        # GitHub style
        if 'pagination' in data:
            return data['pagination'].get('next')
        # Stripe style
        if data.get('has_more') and 'data' in data and data['data']:
            return data['data'][-1]['id']
        # Generic pagination object
        if 'paging' in data and data['paging'].get('next'):
            # Extract cursor from URL
            next_url = data['paging']['next']
            return self._extract_cursor_from_url(next_url)
        return None

    def _extract_cursor_from_url(self, url: str) -> Optional[str]:
        """Extract cursor parameter from pagination URL"""
        from urllib.parse import urlparse, parse_qs
        parsed = urlparse(url)
        params = parse_qs(parsed.query)
        for param_name in ['cursor', 'after', 'page_token']:
            if param_name in params:
                return params[param_name][0]
        return None
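The URL-cursor extraction can be exercised standalone with just the standard library; this sketch mirrors that logic (the example URLs are illustrative):

```python
from urllib.parse import urlparse, parse_qs

def cursor_from_url(url: str):
    """Pull a pagination cursor out of a 'next' link, trying common parameter names."""
    params = parse_qs(urlparse(url).query)
    for name in ('cursor', 'after', 'page_token'):
        if name in params:
            return params[name][0]
    return None

print(cursor_from_url('https://api.example.com/items?limit=100&after=abc123'))  # abc123
print(cursor_from_url('https://api.example.com/items?limit=100'))               # None
```

Parsing the cursor out of the URL rather than following the link verbatim lets you keep your own base URL, auth, and retry logic in control of the request.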
Critical for incremental data loads:
class TimePaginator:
    """Handles time-based pagination for incremental data loading"""

    def __init__(self, client: APIClient, endpoint: str,
                 time_field: str = 'created_at', page_size: int = 100):
        self.client = client
        self.endpoint = endpoint
        self.time_field = time_field
        self.page_size = page_size

    async def paginate_since(self, since: datetime, until: Optional[datetime] = None,
                             **query_params) -> AsyncIterator[APIResponse]:
        """
        Paginate through records created/modified since a specific time.

        This is essential for incremental loads where you only want new/changed data.
        """
        current_since = since
        until = until or datetime.now()
        while current_since < until:
            params = {
                'limit': self.page_size,
                f'{self.time_field}_gte': current_since.isoformat(),
                f'{self.time_field}_lt': until.isoformat(),
                **query_params
            }
            response = await self.client.get(self.endpoint, params=params)
            items = response.data.get('data', response.data.get('results', []))
            if not items:
                break
            yield response
            # Move time window forward based on the last item's timestamp
            last_item_time = self._parse_timestamp(items[-1][self.time_field])
            # Add small buffer to handle items with identical timestamps
            current_since = last_item_time + timedelta(milliseconds=1)
            # If we got less than a full page, we're done
            if len(items) < self.page_size:
                break

    def _parse_timestamp(self, timestamp_str: str) -> datetime:
        """Parse timestamp string to datetime object"""
        from dateutil.parser import parse
        return parse(timestamp_str)
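The window-advance step deserves scrutiny, because it encodes a trade-off. A standalone sketch (function name is ours, using stdlib ISO parsing instead of dateutil):

```python
from datetime import datetime, timedelta

def next_window_start(last_timestamp: str) -> datetime:
    """Advance the incremental-load window just past the last record seen."""
    # The 1 ms buffer avoids re-fetching the final record, but it can also skip
    # other records that share its exact timestamp. Overlapping the window and
    # deduplicating downstream is the safer variant of this trade-off.
    return datetime.fromisoformat(last_timestamp) + timedelta(milliseconds=1)

start = next_window_start('2024-01-15T10:30:00.500')
print(start.isoformat())  # 2024-01-15T10:30:00.501000
```

If your source truncates timestamps to whole seconds, many records can share one timestamp, which is exactly when the buffer approach loses data and overlap-plus-dedup wins.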
Rate limiting is where data pipelines succeed or fail at scale. Simple sleep-based approaches don't work for production systems that need to maximize throughput while respecting limits.
import asyncio
import time
import random
import logging
import httpx
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional

@dataclass
class RateLimitConfig:
    requests_per_second: float
    burst_capacity: int
    time_window: int = 1

class TokenBucketRateLimiter:
    """
    Advanced token bucket rate limiter with burst handling.

    This allows for efficient burst usage while maintaining average rate limits.
    Much more sophisticated than simple delay-based limiting.
    """

    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.tokens = float(config.burst_capacity)
        self.last_update = time.time()
        self.lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> float:
        """
        Acquire tokens from the bucket, returning wait time if needed.

        Returns the time to wait before the request should be made.
        """
        async with self.lock:
            now = time.time()
            # Add tokens based on elapsed time
            elapsed = now - self.last_update
            tokens_to_add = elapsed * self.config.requests_per_second
            self.tokens = min(
                self.config.burst_capacity,
                self.tokens + tokens_to_add
            )
            self.last_update = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return 0.0  # No wait needed
            # Reserve the tokens anyway (the bucket can go negative) so a
            # waiting request still consumes its share of the rate budget
            tokens_needed = tokens - self.tokens
            self.tokens -= tokens
            wait_time = tokens_needed / self.config.requests_per_second
            return wait_time

class AdaptiveRateLimiter:
    """
    Rate limiter that adapts based on API responses and maintains separate
    limits for different resource types.
    """

    def __init__(self):
        self.limiters: Dict[str, TokenBucketRateLimiter] = {}
        self.response_times = deque(maxlen=100)
        self.error_count = 0
        self.success_count = 0

    def get_limiter(self, resource_type: str, config: RateLimitConfig) -> TokenBucketRateLimiter:
        """Get or create rate limiter for specific resource type"""
        if resource_type not in self.limiters:
            self.limiters[resource_type] = TokenBucketRateLimiter(config)
        return self.limiters[resource_type]

    async def execute_request(self, client: APIClient, method: str, url: str,
                              resource_type: str = 'default', **kwargs) -> APIResponse:
        """Execute request with adaptive rate limiting"""
        # Default configuration - adjust based on your APIs
        default_config = RateLimitConfig(
            requests_per_second=10.0,
            burst_capacity=50
        )
        limiter = self.get_limiter(resource_type, default_config)
        # Apply rate limiting
        wait_time = await limiter.acquire()
        if wait_time > 0:
            await asyncio.sleep(wait_time)
        start_time = time.time()
        try:
            response = await getattr(client, method.lower())(url, **kwargs)
            # Track performance metrics
            response_time = time.time() - start_time
            self.response_times.append(response_time)
            self.success_count += 1
            # Adapt rate limits based on API response headers
            if response.rate_limit_remaining is not None:
                await self._adapt_to_api_limits(response, resource_type)
            return response
        except Exception as e:
            self.error_count += 1
            # Back off on explicit rate-limit responses
            is_rate_limited = (
                isinstance(e, httpx.HTTPStatusError)
                and e.response.status_code == 429
            ) or 'rate limit' in str(e).lower()
            if is_rate_limited:
                await self._handle_rate_limit_error(e, resource_type)
            raise

    async def _adapt_to_api_limits(self, response: APIResponse, resource_type: str):
        """Dynamically adjust rate limits based on API response headers"""
        if response.rate_limit_remaining is not None and response.rate_limit_reset:
            remaining = response.rate_limit_remaining
            reset_time = response.rate_limit_reset
            time_until_reset = (reset_time - datetime.now()).total_seconds()
            if time_until_reset > 0 and remaining > 0:
                # Calculate safe rate to avoid hitting limits
                safe_rate = (remaining * 0.9) / time_until_reset  # 90% of available rate
                # Update the limiter configuration
                new_config = RateLimitConfig(
                    requests_per_second=max(0.1, safe_rate),
                    burst_capacity=max(1, int(remaining * 0.1))
                )
                self.limiters[resource_type] = TokenBucketRateLimiter(new_config)

    async def _handle_rate_limit_error(self, error, resource_type: str):
        """Handle rate limit errors with exponential backoff and jitter"""
        base_wait = 1.0
        backoff = base_wait * (2 ** min(self.error_count, 6))  # Exponential backoff, capped
        wait_time = min(60.0, backoff) + random.uniform(0, 0.5)  # Jitter breaks up retry stampedes
        logging.warning(f"Rate limit hit for {resource_type}, waiting {wait_time:.2f}s")
        await asyncio.sleep(wait_time)
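The token-bucket arithmetic is easier to verify in a synchronous sketch with an injectable clock; the class below is a stripped-down illustration of the same refill-then-spend logic, not the production limiter:

```python
class TokenBucket:
    """Minimal synchronous token bucket with an injectable clock for testing."""

    def __init__(self, rate: float, capacity: int, now=lambda: 0.0):
        self.rate, self.capacity = rate, capacity
        self.tokens, self.last, self.now = float(capacity), now(), now

    def acquire(self, n: int = 1) -> float:
        """Return 0.0 if the request may go now, else seconds to wait."""
        t = self.now()
        # Refill proportionally to elapsed time, capped at burst capacity
        self.tokens = min(self.capacity, self.tokens + (t - self.last) * self.rate)
        self.last = t
        if self.tokens >= n:
            self.tokens -= n
            return 0.0
        wait = (n - self.tokens) / self.rate
        self.tokens -= n  # reserve: the bucket goes negative while we wait
        return wait

clock = [0.0]
bucket = TokenBucket(rate=5.0, capacity=2, now=lambda: clock[0])
print(bucket.acquire(), bucket.acquire(), round(bucket.acquire(), 2))  # 0.0 0.0 0.2
clock[0] = 1.0  # one second later the bucket has refilled
print(bucket.acquire())  # 0.0
```

Injecting the clock (`now`) is what makes rate-limiter behavior unit-testable without real sleeps.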
For handling sustained API failures:
from enum import Enum
import time
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    """
    Circuit breaker pattern for API calls.

    Prevents cascading failures by failing fast when an API is consistently
    returning errors. Essential for resilient data pipelines.
    """

    def __init__(self, failure_threshold: int = 5, timeout: int = 60,
                 expected_exception: type = Exception):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function call through circuit breaker"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset"""
        return (self.last_failure_time is not None
                and (time.time() - self.last_failure_time) >= self.timeout)

    def _on_success(self):
        """Handle successful call"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Enhanced API client with circuit breaker
class ResilientAPIClient(APIClient):
    """API client with built-in resilience patterns"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rate_limiter = AdaptiveRateLimiter()
        self.circuit_breaker = CircuitBreaker()

    async def _raw_get(self, url: str, params: Optional[Dict] = None) -> APIResponse:
        """Raw HTTP GET used beneath the rate limiter and circuit breaker"""
        response = await self.session.get(url, params=params, headers=self.auth_header)
        response.raise_for_status()
        headers = {k.lower(): v for k, v in response.headers.items()}
        rate_info = self._parse_rate_limit_headers(headers)
        return APIResponse(
            data=response.json(),
            status_code=response.status_code,
            headers=headers,
            rate_limit_remaining=rate_info.get('remaining'),
            rate_limit_reset=rate_info.get('reset')
        )

    async def get(self, endpoint: str, params: Optional[Dict] = None) -> APIResponse:
        """GET request with rate limiting and circuit breaker"""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        # Route through _raw_get so the rate limiter's getattr dispatch doesn't
        # re-enter this method and recurse
        return await self.circuit_breaker.call(
            self.rate_limiter.execute_request,
            self, '_raw_get', url, params=params
        )
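The state machine (closed, open, half-open) is the whole trick, and it can be demonstrated synchronously with an injectable clock; this miniature version is for illustration only and omits the async plumbing above:

```python
class MiniBreaker:
    """Tiny synchronous circuit breaker: closed -> open -> half-open -> closed."""

    def __init__(self, threshold=3, timeout=60.0, now=lambda: 0.0):
        self.threshold, self.timeout, self.now = threshold, timeout, now
        self.failures, self.state, self.opened_at = 0, 'closed', None

    def call(self, fn):
        if self.state == 'open':
            if self.now() - self.opened_at < self.timeout:
                raise RuntimeError('circuit open')  # fail fast, no real request
            self.state = 'half_open'  # timeout elapsed: allow one probe through
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold or self.state == 'half_open':
                self.state, self.opened_at = 'open', self.now()
            raise
        self.failures, self.state = 0, 'closed'
        return result

clock = [0.0]
breaker = MiniBreaker(threshold=2, timeout=30.0, now=lambda: clock[0])
def failing(): raise ValueError('upstream 500')

for _ in range(2):
    try: breaker.call(failing)
    except ValueError: pass
print(breaker.state)  # open -- threshold reached, callers now fail fast

clock[0] = 31.0       # past the timeout: the next call goes through as a probe
breaker.call(lambda: 'ok')
print(breaker.state)  # closed -- the probe succeeded
```

Note the half-open subtlety: a single probe failure reopens the circuit immediately, without waiting for the threshold again.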
Long-running data pipelines need sophisticated authentication handling:
import asyncio
import httpx
from typing import Optional, Dict, Any
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class AuthToken:
    access_token: str
    refresh_token: Optional[str]
    expires_at: datetime
    token_type: str = "Bearer"

    @property
    def is_expired(self) -> bool:
        # 5-minute buffer so tokens are refreshed before they actually expire
        return datetime.now() >= self.expires_at - timedelta(minutes=5)

    @property
    def auth_header(self) -> Dict[str, str]:
        return {"Authorization": f"{self.token_type} {self.access_token}"}

class TokenManager:
    """
    Manages OAuth tokens with automatic refresh for long-running processes.

    Critical for data pipelines that run for hours or days.
    """

    def __init__(self, client_id: str, client_secret: str,
                 token_endpoint: str, refresh_endpoint: str = None):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_endpoint = token_endpoint
        self.refresh_endpoint = refresh_endpoint or token_endpoint
        self.current_token: Optional[AuthToken] = None
        self._refresh_lock = asyncio.Lock()

    async def get_valid_token(self) -> AuthToken:
        """Get a valid token, refreshing if necessary"""
        if not self.current_token or self.current_token.is_expired:
            await self._refresh_token()
        return self.current_token

    async def _refresh_token(self):
        """Refresh the current token, guarding against concurrent refreshes"""
        async with self._refresh_lock:
            # Double-check pattern to avoid multiple simultaneous refreshes
            if self.current_token and not self.current_token.is_expired:
                return
            if self.current_token and self.current_token.refresh_token:
                await self._refresh_with_refresh_token()
            else:
                await self._get_new_token()

    async def _refresh_with_refresh_token(self):
        """Use refresh token to get new access token"""
        data = {
            'grant_type': 'refresh_token',
            'refresh_token': self.current_token.refresh_token,
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(self.refresh_endpoint, data=data)
            response.raise_for_status()
            token_data = response.json()
            self.current_token = self._create_token_from_response(token_data)

    async def _get_new_token(self):
        """Get a new token using client credentials"""
        data = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(self.token_endpoint, data=data)
            response.raise_for_status()
            token_data = response.json()
            self.current_token = self._create_token_from_response(token_data)

    def _create_token_from_response(self, token_data: Dict[str, Any]) -> AuthToken:
        """Create AuthToken from API response"""
        expires_in = token_data.get('expires_in', 3600)
        expires_at = datetime.now() + timedelta(seconds=expires_in)
        return AuthToken(
            access_token=token_data['access_token'],
            refresh_token=token_data.get('refresh_token'),
            expires_at=expires_at,
            token_type=token_data.get('token_type', 'Bearer')
        )

class AuthenticatedAPIClient(ResilientAPIClient):
    """API client with automatic token management"""

    def __init__(self, base_url: str, token_manager: TokenManager, **kwargs):
        super().__init__(base_url, {}, **kwargs)  # Empty auth_header initially
        self.token_manager = token_manager

    async def _get_auth_headers(self) -> Dict[str, str]:
        """Get current authentication headers"""
        token = await self.token_manager.get_valid_token()
        return token.auth_header

    async def get(self, endpoint: str, params: Optional[Dict] = None) -> APIResponse:
        """GET request with automatic token refresh"""
        headers = await self._get_auth_headers()
        # Keep auth_header current so the underlying request carries the token
        self.auth_header.update(headers)
        if self.session:
            self.session.headers.update(headers)
        return await super().get(endpoint, params)
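The refresh buffer is the part people get wrong most often, so it's worth isolating. A hypothetical, stripped-down token record (not the AuthToken class above) shows the behavior:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class Token:
    """Illustrative token record for demonstrating the early-refresh buffer."""
    access_token: str
    expires_at: datetime

    def needs_refresh(self, now: datetime, buffer: timedelta = timedelta(minutes=5)) -> bool:
        # Refresh early so no in-flight request ever carries a just-expired token
        return now >= self.expires_at - buffer

issued = datetime(2024, 1, 1, 12, 0, 0)
token = Token('abc', expires_at=issued + timedelta(hours=1))
print(token.needs_refresh(now=issued + timedelta(minutes=30)))  # False
print(token.needs_refresh(now=issued + timedelta(minutes=56)))  # True, inside the 5-minute buffer
```

Passing `now` explicitly (rather than calling `datetime.now()` inside) also makes the expiry logic trivially testable.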
Ensuring data consistency across pipeline runs is crucial:
import hashlib
import json
import logging
from typing import Set, List, Dict, Any, Optional, AsyncIterator
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class DataCheckpoint:
    """Represents a checkpoint in data ingestion process"""
    last_synced_id: Optional[str]
    last_synced_timestamp: Optional[datetime]
    page_token: Optional[str]
    record_count: int
    data_hash: str
    created_at: datetime

class ConsistencyManager:
    """
    Manages data consistency for incremental loads and deduplication.

    Essential for production data pipelines that need to handle failures,
    restarts, and ensure exactly-once processing semantics.
    """

    def __init__(self, checkpoint_store):
        self.checkpoint_store = checkpoint_store  # Could be Redis, DynamoDB, etc.
        self.processed_ids: Set[str] = set()

    async def get_last_checkpoint(self, pipeline_id: str) -> Optional[DataCheckpoint]:
        """Get the last successful checkpoint for a pipeline"""
        return await self.checkpoint_store.get(f"checkpoint:{pipeline_id}")

    async def save_checkpoint(self, pipeline_id: str, checkpoint: DataCheckpoint):
        """Save checkpoint data"""
        await self.checkpoint_store.set(
            f"checkpoint:{pipeline_id}",
            checkpoint
        )

    def calculate_data_hash(self, records: List[Dict[str, Any]]) -> str:
        """Calculate hash of record set for consistency verification"""
        # Sort records by ID to ensure consistent hashing
        sorted_records = sorted(records, key=lambda x: str(x.get('id', '')))
        # Create hash from sorted JSON representation
        data_str = json.dumps(sorted_records, sort_keys=True, default=str)
        return hashlib.sha256(data_str.encode()).hexdigest()

    async def deduplicate_records(self, records: List[Dict[str, Any]],
                                  id_field: str = 'id') -> List[Dict[str, Any]]:
        """Remove duplicates from record set"""
        seen_ids = set()
        deduplicated = []
        for record in records:
            record_id = record.get(id_field)
            if record_id and record_id not in seen_ids and record_id not in self.processed_ids:
                seen_ids.add(record_id)
                self.processed_ids.add(record_id)
                deduplicated.append(record)
        return deduplicated

    async def data_has_changed(self, current_records: List[Dict[str, Any]],
                               previous_hash: str) -> bool:
        """Return True if the record set differs from the previously hashed one"""
        current_hash = self.calculate_data_hash(current_records)
        return current_hash != previous_hash

class IncrementalLoader:
    """
    Handles incremental data loading with consistency guarantees.

    This is the pattern you'll use most often in production data pipelines.
    """

    def __init__(self, client: AuthenticatedAPIClient,
                 consistency_manager: ConsistencyManager,
                 pipeline_id: str):
        self.client = client
        self.consistency_manager = consistency_manager
        self.pipeline_id = pipeline_id

    @staticmethod
    def _parse_timestamp(timestamp_str: str) -> datetime:
        """Parse an API timestamp string into a datetime"""
        from dateutil.parser import parse
        return parse(timestamp_str)

    async def load_incremental_data(self, endpoint: str,
                                    time_field: str = 'updated_at') -> AsyncIterator[List[Dict[str, Any]]]:
        """
        Load data incrementally with consistency guarantees.

        This handles the full complexity of incremental loading:
        - Resuming from checkpoints
        - Deduplication
        - Consistency verification
        - Error recovery
        """
        # Get last checkpoint
        checkpoint = await self.consistency_manager.get_last_checkpoint(self.pipeline_id)
        # Determine starting point
        if checkpoint:
            start_time = checkpoint.last_synced_timestamp
            initial_cursor = checkpoint.page_token
            logging.info(f"Resuming from checkpoint: {start_time}")
        else:
            start_time = datetime.now() - timedelta(days=1)  # Default lookback
            initial_cursor = None
            logging.info(f"Starting fresh load from: {start_time}")
        # Set up paginator based on API type
        if initial_cursor:
            paginator = CursorPaginator(self.client, endpoint)
            page_iterator = paginator.paginate(initial_cursor=initial_cursor)
        else:
            paginator = TimePaginator(self.client, endpoint, time_field)
            page_iterator = paginator.paginate_since(start_time)
        batch = []
        total_records = 0
        last_record_time = start_time
        async for response in page_iterator:
            records = response.data.get('data', response.data.get('results', []))
            if not records:
                continue
            # Deduplicate records
            deduplicated_records = await self.consistency_manager.deduplicate_records(records)
            if not deduplicated_records:
                continue  # All records were duplicates
            batch.extend(deduplicated_records)
            total_records += len(deduplicated_records)
            # Update tracking variables
            if time_field in deduplicated_records[-1]:
                last_record_time = self._parse_timestamp(
                    deduplicated_records[-1][time_field]
                )
            # Yield batch when it reaches optimal size
            if len(batch) >= 1000:  # Configurable batch size
                yield batch
                # Save intermediate checkpoint
                await self._save_checkpoint(
                    batch[-1], response.next_page_token, total_records, last_record_time
                )
                batch = []
        # Yield final batch
        if batch:
            yield batch
            await self._save_checkpoint(
                batch[-1], None, total_records, last_record_time
            )

    async def _save_checkpoint(self, last_record: Dict[str, Any],
                               page_token: Optional[str], record_count: int,
                               last_timestamp: datetime):
        """Save checkpoint with current state"""
        checkpoint = DataCheckpoint(
            last_synced_id=last_record.get('id'),
            last_synced_timestamp=last_timestamp,
            page_token=page_token,
            record_count=record_count,
            data_hash=self.consistency_manager.calculate_data_hash([last_record]),
            created_at=datetime.now()
        )
        await self.consistency_manager.save_checkpoint(self.pipeline_id, checkpoint)
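The sort-before-hash step is what makes the data hash usable for consistency checks: records can arrive in any order across pages, but the fingerprint must not change. A standalone sketch of the same idea:

```python
import hashlib
import json

def record_set_hash(records):
    """Hash a record set independent of arrival order by sorting on id first."""
    ordered = sorted(records, key=lambda r: str(r.get('id', '')))
    payload = json.dumps(ordered, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()

a = [{'id': 1, 'v': 'x'}, {'id': 2, 'v': 'y'}]
b = [{'id': 2, 'v': 'y'}, {'id': 1, 'v': 'x'}]  # same records, different order
c = [{'id': 1, 'v': 'CHANGED'}, {'id': 2, 'v': 'y'}]

print(record_set_hash(a) == record_set_hash(b))  # True: order does not matter
print(record_set_hash(a) == record_set_hash(c))  # False: a content change is detected
```

`sort_keys=True` inside `json.dumps` handles key order within each record for the same reason the outer sort handles record order.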
For high-throughput data ingestion:
import asyncio
from collections import deque
from typing import List, Dict, Optional, Callable, Any
import aiofiles
import orjson  # Faster JSON library

class HighPerformanceAPIClient:
    """
    Optimized client for high-throughput data ingestion.

    Uses advanced techniques like connection pooling, concurrent requests,
    and efficient serialization.
    """

    def __init__(self, base_url: str, token_manager: TokenManager,
                 max_concurrent_requests: int = 50,
                 connection_pool_size: int = 100):
        self.base_url = base_url.rstrip('/')
        self.token_manager = token_manager
        self.max_concurrent_requests = max_concurrent_requests
        # Configure HTTP client for high performance
        limits = httpx.Limits(
            max_keepalive_connections=connection_pool_size,
            max_connections=connection_pool_size,
            keepalive_expiry=30.0
        )
        self.client = httpx.AsyncClient(
            limits=limits,
            timeout=httpx.Timeout(30.0, connect=10.0),
            http2=True  # Enable HTTP/2; requires the httpx[http2] extra
        )
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)

    async def _get_auth_headers(self) -> Dict[str, str]:
        """Fetch a valid token and build authentication headers"""
        token = await self.token_manager.get_valid_token()
        return token.auth_header

    async def get(self, endpoint: str, params: Optional[Dict] = None) -> APIResponse:
        """GET interface expected by the paginators and loaders above"""
        return await self._fetch_with_semaphore(endpoint, params or {})

    async def fetch_multiple_endpoints(self, endpoints: List[str],
                                       params_list: List[Dict] = None) -> List[APIResponse]:
        """Fetch multiple endpoints concurrently"""
        if params_list is None:
            params_list = [{}] * len(endpoints)
        tasks = [
            self._fetch_with_semaphore(endpoint, params)
            for endpoint, params in zip(endpoints, params_list)
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def _fetch_with_semaphore(self, endpoint: str, params: Dict) -> APIResponse:
        """Fetch single endpoint with concurrency limiting"""
        async with self.semaphore:
            headers = await self._get_auth_headers()
            url = f"{self.base_url}/{endpoint.lstrip('/')}"
            response = await self.client.get(url, params=params, headers=headers)
            response.raise_for_status()
            return APIResponse(
                data=response.json(),
                status_code=response.status_code,
                headers=dict(response.headers)
            )

    async def stream_to_file(self, endpoint: str, file_path: str,
                             batch_processor: Callable = None):
        """Stream large datasets directly to file with optional processing"""
        paginator = CursorPaginator(self, endpoint, page_size=1000)
        async with aiofiles.open(file_path, 'wb') as f:
            async for response in paginator.paginate():
                records = response.data.get('data', [])
                # Apply batch processing if provided
                if batch_processor:
                    records = await batch_processor(records)
                # Use fast JSON serialization
                json_data = orjson.dumps(records)
                await f.write(json_data + b'\n')

    async def parallel_incremental_load(self, endpoints: List[str],
                                        pipeline_ids: List[str],
                                        consistency_managers: List[ConsistencyManager]) -> Dict[str, List[Dict]]:
        """Load from multiple endpoints in parallel"""
        loaders = [
            IncrementalLoader(self, cm, pid)
            for cm, pid in zip(consistency_managers, pipeline_ids)
        ]
        # Create tasks for each endpoint
        tasks = [
            self._load_endpoint_data(loader, endpoint)
            for loader, endpoint in zip(loaders, endpoints)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return {
            pipeline_id: result if not isinstance(result, Exception) else []
            for pipeline_id, result in zip(pipeline_ids, results)
        }

    async def _load_endpoint_data(self, loader: IncrementalLoader,
                                  endpoint: str) -> List[Dict[str, Any]]:
        """Load all data from a single endpoint"""
        all_records = []
        async for batch in loader.load_incremental_data(endpoint):
            all_records.extend(batch)
        return all_records

# Performance monitoring and metrics
class PerformanceMonitor:
    """Monitor API client performance and provide optimization insights"""

    def __init__(self):
        self.request_times = deque(maxlen=1000)
        self.error_counts = {}
        self.throughput_samples = deque(maxlen=100)

    def record_request(self, duration: float, endpoint: str, status_code: int):
        """Record performance metrics for a request"""
        self.request_times.append(duration)
        if status_code >= 400:
            self.error_counts[endpoint] = self.error_counts.get(endpoint, 0) + 1

    def get_performance_stats(self) -> Dict[str, Any]:
        """Get current performance statistics"""
        if not self.request_times:
            return {}
        times = list(self.request_times)
        return {
            'avg_response_time': sum(times) / len(times),
            'p95_response_time': sorted(times)[int(len(times) * 0.95)],
            'error_rate': sum(self.error_counts.values()) / len(times),
            'total_requests': len(times)
        }
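The p95 computation in the monitor is a nearest-rank estimate, which behaves coarsely on small samples. Pulled out on its own (the helper name is ours):

```python
def p95(samples):
    """Nearest-rank style p95, matching the monitor sketch above; assumes non-empty input."""
    ordered = sorted(samples)
    return ordered[int(len(ordered) * 0.95)]

times = list(range(1, 101))  # 100 response times: 1..100 ms
print(p95(times))  # 96
print(p95([12]))   # 12 -- with one sample, p95 is just that sample
```

For dashboards that matter, prefer a proper quantile routine (for example `statistics.quantiles` in the standard library) over this index trick, but the trick is cheap and good enough for an in-process rolling window.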
Let's build a complete data ingestion pipeline that demonstrates all these concepts:
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Any
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
class MockCheckpointStore:
"""Simple in-memory checkpoint store for the exercise"""
def __init__(self):
self.checkpoints = {}
async def get(self, key: str) -> Optional[DataCheckpoint]:
return self.checkpoints.get(key)
async def set(self, key: str, checkpoint: DataCheckpoint):
self.checkpoints[key] = checkpoint
class MockTokenManager:
"""Mock token manager for the exercise"""
async def get_valid_token(self) -> AuthToken:
return AuthToken(
access_token="mock_token",
refresh_token=None,
expires_at=datetime.now() + timedelta(hours=1)
)
async def run_data_pipeline():
"""
Complete example: Multi-source data ingestion pipeline
This pipeline ingests data from three different mock APIs with
different pagination patterns and rate limits.
"""
# Set up components
checkpoint_store = MockCheckpointStore()
token_manager = MockTokenManager()
# API configurations for different sources
api_configs = {
'users': {
'base_url': 'https://api.example.com',
'endpoint': '/users',
'rate_limit': RateLimitConfig(requests_per_second=10, burst_capacity=50),
'pagination_type': 'cursor'
},
'transactions': {
'base_url': 'https://api.payments.com',
'endpoint': '/transactions',
'rate_limit': RateLimitConfig(requests_per_second=5, burst_capacity=20),
'pagination_type': 'time'
},
'events': {
'base_url': 'https://api.analytics.com',
'endpoint': '/events',
'rate_limit': RateLimitConfig(requests_per_second=20, burst_capacity=100),
'pagination_type': 'offset'
}
}
results = {}
# Process each API concurrently
tasks = []
for source_name, config in api_configs.items():
task = process_api_source(
source_name, config, checkpoint_store, token_manager
)
tasks.append(task)
# Execute all tasks concurrently
pipeline_results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for source_name, result in zip(api_configs.keys(), pipeline_results):
if isinstance(result, Exception):
logging.error(f"Failed to process {source_name}: {result}")
results[source_name] = {"error": str(result), "records": 0}
else:
results[source_name] = result
logging.info(f"Successfully processed {source_name}: {result['records']} records")
return results
async def process_api_source(source_name: str, config: Dict[str, Any],
checkpoint_store, token_manager) -> Dict[str, Any]:
"""Process a single API source with full error handling and monitoring"""
try:
# Create specialized client for this source
client = HighPerformanceAPIClient(
config['base_url'],
token_manager,
max_concurrent_requests=10 # Conservative for demo
)
# Create consistency manager
consistency_manager = ConsistencyManager(checkpoint_store)
# Create incremental loader
loader = IncrementalLoader(
client,
consistency_manager,
f"pipeline_{source_name}"
)
# Load data incrementally
total_records = 0
start_time = time.time()
async for batch in loader.load_incremental_data(config['endpoint']):
# In a real pipeline, you'd send this to your data warehouse
# For demo, we just count records
total_records += len(batch)
            # Log progress roughly every 1,000 records (this check only fires
            # when the running total lands exactly on a multiple of 1,000)
            if total_records % 1000 == 0:
                logging.info(f"{source_name}: Processed {total_records} records")
elapsed_time = time.time() - start_time
return {
"source": source_name,
"records": total_records,
"duration_seconds": elapsed_time,
"records_per_second": total_records / elapsed_time if elapsed_time > 0 else 0
}
except Exception as e:
logging.error(f"Error processing {source_name}: {e}")
raise
# Run the complete pipeline
if __name__ == "__main__":
results = asyncio.run(run_data_pipeline())
print("\n=== Pipeline Results ===")
for source, stats in results.items():
if "error" in stats:
print(f"{source}: FAILED - {stats['error']}")
else:
print(f"{source}: {stats['records']} records in {stats['duration_seconds']:.2f}s "
f"({stats['records_per_second']:.1f} records/sec)")
This exercise demonstrates the complete pipeline end to end: concurrent processing of multiple sources, per-source rate limiting and checkpointing, and error isolation so one failing API doesn't take down the whole run. The sections below walk through common pitfalls and how to fix them.
Problem: Records appearing in multiple pages or being skipped entirely.
# BAD: Naive offset pagination without duplicate handling
async def bad_offset_pagination():
offset = 0
while True:
response = await client.get(f"/items?offset={offset}&limit=100")
if not response.data['items']:
break
yield response.data['items']
offset += 100 # Data can shift, causing duplicates/misses
# GOOD: Cursor-based with proper duplicate detection
async def robust_pagination():
cursor = None
seen_ids = set()
while True:
params = {'limit': 100}
if cursor:
params['cursor'] = cursor
response = await client.get("/items", params=params)
items = response.data.get('items', [])
if not items:
break
# Filter duplicates
new_items = [item for item in items if item['id'] not in seen_ids]
seen_ids.update(item['id'] for item in new_items)
if new_items:
yield new_items
cursor = response.data.get('next_cursor')
if not cursor:
break
Problem: Getting 429 errors despite implementing rate limiting.
# Common issue: Not accounting for different rate limit types
class ProperRateLimiting:
    def __init__(self, client):
        self.client = client  # underlying async HTTP client used by make_request
        # Many APIs have different limits for different operations
        self.limiters = {
            'read': TokenBucketRateLimiter(RateLimitConfig(10, 50)),
            'write': TokenBucketRateLimiter(RateLimitConfig(2, 10)),
            'search': TokenBucketRateLimiter(RateLimitConfig(1, 5))
        }
async def make_request(self, endpoint: str, method: str = 'GET'):
# Determine operation type
        operation_type = 'write' if method in ('POST', 'PUT', 'PATCH', 'DELETE') else 'read'
if 'search' in endpoint:
operation_type = 'search'
# Apply appropriate rate limit
limiter = self.limiters[operation_type]
wait_time = await limiter.acquire()
if wait_time > 0:
await asyncio.sleep(wait_time)
return await self.client.request(method, endpoint)
Problem: Tokens expiring mid-pipeline causing cascading failures.
# BAD: Not handling token refresh properly
class BadTokenHandling:
async def make_request(self):
# This will fail when token expires
headers = {"Authorization": f"Bearer {self.token}"}
return await self.client.get("/data", headers=headers)
# GOOD: Proactive token management
class GoodTokenHandling:
async def make_request(self):
# Always get fresh token (manager handles caching/refresh)
token = await self.token_manager.get_valid_token()
headers = token.auth_header
try:
return await self.client.get("/data", headers=headers)
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
# Force token refresh and retry once
await self.token_manager._refresh_token()
token = await self.token_manager.get_valid_token()
headers = token.auth_header
return await self.client.get("/data", headers=headers)
raise
Problem: Loading entire datasets into memory causing OOM errors.
# BAD: Loading everything into memory
async def memory_intensive():
all_data = []
async for page in paginator.paginate():
all_data.extend(page.data['items']) # Memory grows indefinitely
return all_data
# GOOD: Streaming processing
async def memory_efficient():
async for page in paginator.paginate():
# Process each batch immediately
batch = page.data['items']
await process_batch(batch) # Send to warehouse, write to file, etc.
# Optional: yield for caller to handle
yield batch
When things go wrong, use these debugging techniques:
import logging
from datetime import datetime
from typing import Dict, Any

class DebuggingAPIClient(APIClient):
    """APIClient subclass that records every request for troubleshooting"""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.request_log = []
async def request(self, method: str, url: str, **kwargs) -> APIResponse:
# Log all request details
request_info = {
'timestamp': datetime.now().isoformat(),
'method': method,
'url': url,
'headers': kwargs.get('headers', {}),
'params': kwargs.get('params', {})
}
try:
response = await super().request(method, url, **kwargs)
request_info.update({
'status_code': response.status_code,
'response_headers': dict(response.headers),
'success': True
})
# Log rate limit information
if response.rate_limit_remaining is not None:
logging.info(f"Rate limit: {response.rate_limit_remaining} remaining, "
f"resets at {response.rate_limit_reset}")
return response
except Exception as e:
request_info.update({
'error': str(e),
'success': False
})
logging.error(f"Request failed: {request_info}")
raise
finally:
self.request_log.append(request_info)
def get_debug_info(self) -> Dict[str, Any]:
"""Get debugging information for troubleshooting"""
total_requests = len(self.request_log)
successful_requests = sum(1 for req in self.request_log if req['success'])
return {
'total_requests': total_requests,
'successful_requests': successful_requests,
'error_rate': (total_requests - successful_requests) / total_requests if total_requests > 0 else 0,
'recent_errors': [req for req in self.request_log[-10:] if not req['success']],
'rate_limit_hits': [req for req in self.request_log if req.get('status_code') == 429]
}
You've now mastered the sophisticated patterns required for production-scale API integration in data pipelines. The key takeaways:
Pagination Mastery: Different APIs require different pagination strategies. Cursor-based pagination provides the most consistency, time-based pagination enables efficient incremental loads, and offset pagination should be used sparingly with duplicate detection.
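Time-based pagination didn't get a standalone example above, so here is a minimal sketch of a windowed incremental fetch. The `client.get` interface mirrors the earlier examples, but the `updated_since`/`updated_before` parameter names are assumptions — real APIs name these differently (Stripe, for instance, filters with `created` ranges).

```python
import asyncio
from datetime import datetime, timedelta, timezone

async def fetch_time_windows(client, endpoint, last_run, window=timedelta(hours=1)):
    """Pull records in fixed time windows so a crash loses at most one window."""
    cursor = last_run
    now = datetime.now(timezone.utc)
    while cursor < now:
        window_end = min(cursor + window, now)
        # Hypothetical parameter names -- check your API's documentation
        params = {
            "updated_since": cursor.isoformat(),
            "updated_before": window_end.isoformat(),
        }
        response = await client.get(endpoint, params=params)
        for item in response.data.get("items", []):
            yield item
        # In a real pipeline you would persist `window_end` as a checkpoint here
        cursor = window_end
```

Fixed windows keep each request bounded and make the loop naturally resumable: restarting from the last persisted `window_end` re-fetches at most one window.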
Rate Limiting Sophistication: Token bucket algorithms provide optimal throughput while respecting limits. Adaptive rate limiting based on API responses prevents hitting limits, and circuit breakers prevent cascading failures.
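The `TokenBucketRateLimiter` used throughout the examples isn't reproduced in this section; a minimal version consistent with the `acquire() -> seconds-to-wait` interface might look like the sketch below (`RateLimitConfig` is restated here so the snippet stands alone — this is illustrative, not the exact implementation from earlier).

```python
import time
from dataclasses import dataclass

@dataclass
class RateLimitConfig:
    requests_per_second: float
    burst_capacity: int

class TokenBucketRateLimiter:
    """Continuously refilling token bucket; acquire() returns seconds to wait."""
    def __init__(self, config: RateLimitConfig):
        self.rate = config.requests_per_second
        self.capacity = config.burst_capacity
        self.tokens = float(config.burst_capacity)
        self.last_refill = time.monotonic()

    async def acquire(self) -> float:
        now = time.monotonic()
        # Refill in proportion to elapsed time, capped at burst capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        # Reserve one token; a negative balance means the caller must wait
        self.tokens -= 1
        if self.tokens >= 0:
            return 0.0
        return -self.tokens / self.rate
```

Reserving the token before returning the wait time is what lets callers simply `await asyncio.sleep(wait_time)` and then proceed, as the `ProperRateLimiting` example above does.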
Resilience Patterns: Authentication token management, exponential backoff, and checkpoint-based recovery ensure your pipelines can handle the inevitable failures and restarts of production systems.
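The exponential backoff mentioned here can be wrapped around any request coroutine. A minimal sketch (the `send` callable and the retry counts are illustrative):

```python
import asyncio
import random

async def request_with_backoff(send, max_retries: int = 3, base_delay: float = 1.0):
    """Retry an async callable with exponential backoff plus jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await send()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            # Delays grow 1s, 2s, 4s, ...; jitter desynchronizes parallel workers
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            await asyncio.sleep(delay)
```

In production you would retry only retryable failures (429s, 5xx responses, timeouts) and re-raise client errors like 400 or 403 immediately, since repeating them can't succeed.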
Performance Optimization: Connection pooling, concurrent requests, and streaming processing patterns enable high-throughput data ingestion without overwhelming your systems or the APIs you're consuming.
Data Consistency: Deduplication, checkpointing, and incremental loading patterns ensure exactly-once processing semantics even in the face of failures and retries.
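The `checkpoint_store` passed into the pipeline above is never shown; one minimal shape it could take is a file-backed store with atomic writes. This is a sketch — production pipelines typically checkpoint to a database or object store instead.

```python
import json
from pathlib import Path

class FileCheckpointStore:
    """Tiny file-backed checkpoint store keyed by pipeline id."""
    def __init__(self, path: str):
        self.path = Path(path)

    def load(self, pipeline_id: str) -> dict:
        if not self.path.exists():
            return {}
        return json.loads(self.path.read_text()).get(pipeline_id, {})

    def save(self, pipeline_id: str, state: dict) -> None:
        all_state = json.loads(self.path.read_text()) if self.path.exists() else {}
        all_state[pipeline_id] = state
        # Write to a temp file first so a crash mid-write can't corrupt checkpoints
        tmp = self.path.with_suffix(".tmp")
        tmp.write_text(json.dumps(all_state))
        tmp.replace(self.path)
```

The temp-file-then-rename dance matters: a checkpoint that is corrupted on crash is worse than no checkpoint at all, because recovery silently starts from a bad state.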
Implement monitoring and alerting around your API clients. Track error rates, response times, and rate limit usage.
Add schema validation to detect API changes early. APIs evolve, and your pipelines need to handle schema drift gracefully.
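A lightweight version of that schema check flags missing fields, type drift, and unexpected new fields before they reach your warehouse (field names and the expected-schema shape here are illustrative):

```python
def validate_record(record: dict, expected: dict) -> list:
    """Return a list of schema problems: missing fields, wrong types, extras."""
    problems = []
    for field, expected_type in expected.items():
        if field not in record:
            problems.append(f"missing field: {field}")
        elif not isinstance(record[field], expected_type):
            problems.append(
                f"{field}: expected {expected_type.__name__}, "
                f"got {type(record[field]).__name__}"
            )
    # Unknown fields often signal an upstream API change worth alerting on
    problems.extend(f"unexpected field: {f}" for f in record.keys() - expected.keys())
    return problems
```

For anything beyond a toy check, a declarative validation library (e.g. Pydantic or jsonschema) gives you nested structures and better error messages for free.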
Explore webhook integration for real-time data ingestion. Many APIs offer webhooks as an alternative to polling.
Implement data quality checks at ingestion time. Validate data completeness, freshness, and accuracy before loading into your warehouse.
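Such checks can start as a simple per-batch report computed before the load. This sketch assumes records carry an `id` and an ISO-8601 `updated_at` field — adapt the field names to your sources.

```python
from datetime import datetime, timedelta, timezone

def quality_report(records: list, min_count: int, max_age: timedelta) -> dict:
    """Basic completeness and freshness checks for a batch of records."""
    now = datetime.now(timezone.utc)
    newest = max(
        (datetime.fromisoformat(r["updated_at"]) for r in records if "updated_at" in r),
        default=None,
    )
    return {
        # Completeness: did we get at least as many records as expected?
        "count_ok": len(records) >= min_count,
        # Freshness: is the newest record recent enough?
        "fresh": newest is not None and (now - newest) <= max_age,
        # Accuracy proxy: how many records are missing a primary key?
        "null_ids": sum(1 for r in records if not r.get("id")),
    }
```

Wiring a report like this into the `process_api_source` loop lets you fail a batch loudly instead of loading silently degraded data.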
Study specific API patterns for the services you integrate with. Each API has its own quirks and optimizations.
The patterns you've learned here form the foundation for any serious data engineering work involving APIs. These aren't theoretical concepts—they're battle-tested solutions to real problems you'll encounter when building production data pipelines at scale.