Wicked Smart Data

© 2026 Wicked Smart Data. All rights reserved.


Advanced API Integration for Data Pipelines: Mastering REST, Pagination, and Rate Limiting

Data Engineering · Expert · 23 min read · Published Apr 2, 2026 · Updated Apr 2, 2026
Table of Contents
  • Prerequisites
  • Understanding Modern API Architectures
  • Advanced Pagination Strategies
  • Offset-Based Pagination
  • Cursor-Based Pagination
  • Time-Based Pagination
  • Sophisticated Rate Limiting Strategies
  • Token Bucket Implementation
  • Circuit Breaker Pattern
  • Authentication and Token Management
  • Data Consistency Patterns
  • Performance Optimization Techniques
  • Hands-On Exercise
  • Common Mistakes & Troubleshooting

You're building a data pipeline to ingest customer support tickets from Zendesk, user analytics from Mixpanel, and financial data from Stripe. Each API has different pagination schemes, rate limits, and authentication requirements. Your pipeline needs to handle thousands of requests per hour while respecting rate limits, gracefully handling failures, and ensuring data consistency across batch runs.

This scenario represents the reality of modern data engineering: APIs are everywhere, but they're all different. Understanding how to work with REST APIs at scale isn't just about making HTTP requests—it's about architecting resilient systems that can handle the complexities of real-world API integration.

What you'll learn:

  • Advanced pagination strategies for different API patterns and their performance implications
  • Rate limiting algorithms and implementation patterns for sustained high-throughput data ingestion
  • Circuit breaker patterns and exponential backoff strategies for handling API failures
  • Authentication token management and refresh strategies for long-running processes
  • Data consistency patterns for incremental loads and handling duplicate data
  • Performance optimization techniques including connection pooling and concurrent request management

Prerequisites

You should have solid experience with Python and HTTP concepts, familiarity with data pipeline architectures, and understanding of basic REST principles. We'll work with advanced patterns that assume you've built API clients before.

Understanding Modern API Architectures

Real-world APIs aren't just simple request-response patterns. They're complex systems with sophisticated rate limiting, pagination schemes, and failure modes that can break naive implementations.

Let's start by examining three common API patterns you'll encounter:

import httpx
import asyncio
import time
from dataclasses import dataclass
from typing import Optional, Dict, Any, AsyncIterator
from datetime import datetime, timedelta
import logging

@dataclass
class APIResponse:
    """Standardized response wrapper for all API interactions"""
    data: Dict[str, Any]
    status_code: int
    headers: Dict[str, str]
    rate_limit_remaining: Optional[int] = None
    rate_limit_reset: Optional[datetime] = None
    next_page_token: Optional[str] = None
    has_more: bool = False

class APIClient:
    """Base client with common patterns for REST API interaction"""
    
    def __init__(self, base_url: str, auth_header: Dict[str, str], 
                 max_retries: int = 3, timeout: int = 30):
        self.base_url = base_url.rstrip('/')
        self.auth_header = auth_header
        self.max_retries = max_retries
        self.timeout = timeout
        self.session = None
        self._rate_limit_state = {}
        
    async def __aenter__(self):
        self.session = httpx.AsyncClient(
            timeout=self.timeout,
            limits=httpx.Limits(max_keepalive_connections=20, max_connections=100)
        )
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.aclose()
    
    def _parse_rate_limit_headers(self, headers: Dict[str, str]) -> Dict[str, Any]:
        """Parse rate limit information from common header patterns"""
        rate_info = {}
        
        # GitHub/GitLab style
        if 'x-ratelimit-remaining' in headers:
            rate_info['remaining'] = int(headers['x-ratelimit-remaining'])
            rate_info['reset'] = datetime.fromtimestamp(
                int(headers.get('x-ratelimit-reset', 0))
            )
        
        # Twitter style  
        elif 'x-rate-limit-remaining' in headers:
            rate_info['remaining'] = int(headers['x-rate-limit-remaining'])
            rate_info['reset'] = datetime.fromtimestamp(
                int(headers.get('x-rate-limit-reset', 0))
            )
            
        # Stripe style (per-second limits)
        elif 'stripe-ratelimit-remaining' in headers:
            rate_info['remaining'] = int(headers['stripe-ratelimit-remaining'])
            rate_info['reset'] = datetime.now() + timedelta(seconds=1)
            
        return rate_info
    
    async def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> APIResponse:
        """GET request returning a standardized APIResponse."""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        return await self._request('get', url, params=params)
    
    async def _request(self, method: str, url: str, **kwargs) -> APIResponse:
        """Perform the raw HTTP call and wrap the result."""
        response = await self.session.request(method.upper(), url,
                                              headers=self.auth_header, **kwargs)
        response.raise_for_status()
        headers = {k.lower(): v for k, v in response.headers.items()}
        rate_info = self._parse_rate_limit_headers(headers)
        return APIResponse(
            data=response.json(),
            status_code=response.status_code,
            headers=headers,
            rate_limit_remaining=rate_info.get('remaining'),
            rate_limit_reset=rate_info.get('reset')
        )

This base client establishes patterns we'll use throughout: connection pooling for performance, standardized response handling, and rate limit header parsing. Notice how different APIs use different header formats—this is the reality of API integration.
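Because providers report limits in different header formats, it helps to sanity-check the parsing in isolation. The function below repeats the branching from _parse_rate_limit_headers as a standalone helper; the header names follow real GitHub and Stripe conventions, but the values are made up:

```python
from datetime import datetime, timedelta
from typing import Any, Dict

def parse_rate_limit_headers(headers: Dict[str, str]) -> Dict[str, Any]:
    """Standalone copy of the branching in APIClient._parse_rate_limit_headers."""
    rate_info: Dict[str, Any] = {}
    if 'x-ratelimit-remaining' in headers:          # GitHub/GitLab style
        rate_info['remaining'] = int(headers['x-ratelimit-remaining'])
        rate_info['reset'] = datetime.fromtimestamp(int(headers.get('x-ratelimit-reset', 0)))
    elif 'x-rate-limit-remaining' in headers:       # Twitter style
        rate_info['remaining'] = int(headers['x-rate-limit-remaining'])
        rate_info['reset'] = datetime.fromtimestamp(int(headers.get('x-rate-limit-reset', 0)))
    elif 'stripe-ratelimit-remaining' in headers:   # Stripe style (per-second window)
        rate_info['remaining'] = int(headers['stripe-ratelimit-remaining'])
        rate_info['reset'] = datetime.now() + timedelta(seconds=1)
    return rate_info

github = parse_rate_limit_headers({'x-ratelimit-remaining': '4999',
                                   'x-ratelimit-reset': '1765000000'})
stripe = parse_rate_limit_headers({'stripe-ratelimit-remaining': '25'})
print(github['remaining'], stripe['remaining'])  # 4999 25
```

An unknown provider simply yields an empty dict, which the client treats as "no limit information available".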

Advanced Pagination Strategies

Pagination is where most API integrations break down under load. Let's examine the three main pagination patterns and their trade-offs:

Offset-Based Pagination

The simplest but most problematic pattern:

class OffsetPaginator:
    """Handles offset-based pagination with consistency safeguards"""
    
    def __init__(self, client: APIClient, endpoint: str, page_size: int = 100):
        self.client = client
        self.endpoint = endpoint
        self.page_size = page_size
        
    async def paginate(self, **query_params) -> AsyncIterator[APIResponse]:
        """
        Iterate through offset-based pages with duplicate detection.
        
        The major issue with offset pagination is that data can shift between
        requests, causing duplicates or missed records. We implement basic
        duplicate detection here.
        """
        offset = 0
        seen_ids = set()
        consecutive_empty_pages = 0
        
        while consecutive_empty_pages < 3:  # Safety valve
            params = {
                'limit': self.page_size,
                'offset': offset,
                **query_params
            }
            
            response = await self.client.get(self.endpoint, params=params)
            
            if not response.data.get('results'):
                consecutive_empty_pages += 1
                offset += self.page_size
                continue
                
            consecutive_empty_pages = 0
            
            # Filter duplicates that can occur when data shifts during pagination
            filtered_results = []
            for item in response.data['results']:
                item_id = item.get('id')
                if item_id and item_id not in seen_ids:
                    seen_ids.add(item_id)
                    filtered_results.append(item)
            
            if filtered_results:
                # Create new response with filtered data
                filtered_response = APIResponse(
                    data={'results': filtered_results},
                    status_code=response.status_code,
                    headers=response.headers,
                    rate_limit_remaining=response.rate_limit_remaining,
                    rate_limit_reset=response.rate_limit_reset,
                    has_more=len(response.data['results']) == self.page_size
                )
                yield filtered_response
            
            offset += self.page_size
            
            # Break if we got less than a full page
            if len(response.data['results']) < self.page_size:
                break

Warning: Offset pagination becomes unreliable with large datasets and high write rates. Use it only for small, relatively static datasets or when you can tolerate some data inconsistency.
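To see that failure mode concretely, here is a tiny in-memory simulation, not a real API: a record deleted between page fetches shifts later rows into slots the paginator has already passed.

```python
# Six records served two at a time by naive offset pagination.
rows = [f"rec{i}" for i in range(6)]
page_size = 2

def fetch(offset):
    """Stand-in for an API call: slice the (mutable) dataset."""
    return rows[offset:offset + page_size]

seen = []
seen += fetch(0)        # ['rec0', 'rec1']
rows.remove("rec0")     # someone deletes a record mid-pagination
seen += fetch(2)        # ['rec3', 'rec4'] -- rec2 shifted into page 1's slot
seen += fetch(4)        # ['rec5']
print("rec2" in seen)   # False: rec2 was silently skipped
```

Note that the duplicate detection in OffsetPaginator catches the mirror-image problem (inserts causing repeats) but cannot recover records skipped this way.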

Cursor-Based Pagination

The most reliable pattern for large-scale data ingestion:

class CursorPaginator:
    """Handles cursor-based pagination with various cursor formats"""
    
    def __init__(self, client: APIClient, endpoint: str, page_size: int = 100):
        self.client = client
        self.endpoint = endpoint
        self.page_size = page_size
        
    async def paginate(self, initial_cursor: Optional[str] = None, 
                      **query_params) -> AsyncIterator[APIResponse]:
        """
        Iterate through cursor-based pages.
        
        Cursor pagination is preferred for large datasets because it provides
        consistent results even when data is being added/modified during iteration.
        """
        cursor = initial_cursor
        
        while True:
            params = {
                'limit': self.page_size,
                **query_params
            }
            
            if cursor:
                # Different APIs use different parameter names
                if 'cursor' in query_params:
                    params['cursor'] = cursor
                elif 'page_token' in query_params:
                    params['page_token'] = cursor  
                else:
                    params['after'] = cursor  # Default assumption
            
            response = await self.client.get(self.endpoint, params=params)
            
            if not response.data.get('data', response.data.get('results', [])):
                break
                
            # Extract next cursor from various response formats
            cursor = self._extract_next_cursor(response)
            
            # Set pagination metadata
            response.next_page_token = cursor
            response.has_more = cursor is not None
            
            yield response
            
            if not cursor:
                break
                
    def _extract_next_cursor(self, response: APIResponse) -> Optional[str]:
        """Extract next cursor from different API response formats"""
        data = response.data
        
        # Slack/Discord style
        if 'response_metadata' in data:
            return data['response_metadata'].get('next_cursor')
            
        # GitHub style
        if 'pagination' in data:
            return data['pagination'].get('next')
            
        # Stripe style
        if data.get('has_more') and 'data' in data and data['data']:
            return data['data'][-1]['id']
            
        # Generic pagination object
        if 'paging' in data and data['paging'].get('next'):
            # Extract cursor from URL
            next_url = data['paging']['next']
            return self._extract_cursor_from_url(next_url)
            
        return None
        
    def _extract_cursor_from_url(self, url: str) -> Optional[str]:
        """Extract cursor parameter from pagination URL"""
        from urllib.parse import urlparse, parse_qs
        parsed = urlparse(url)
        params = parse_qs(parsed.query)
        
        for param_name in ['cursor', 'after', 'page_token']:
            if param_name in params:
                return params[param_name][0]
        return None
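The URL-based cursor extraction is easy to exercise on its own. This standalone function repeats the logic of _extract_cursor_from_url; the URL is illustrative:

```python
from typing import Optional
from urllib.parse import urlparse, parse_qs

def extract_cursor_from_url(url: str) -> Optional[str]:
    """Pull the cursor parameter out of a 'next page' URL."""
    params = parse_qs(urlparse(url).query)
    for name in ('cursor', 'after', 'page_token'):
        if name in params:
            return params[name][0]
    return None

next_url = "https://api.example.com/v1/items?limit=100&after=abc123"
print(extract_cursor_from_url(next_url))  # abc123
```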

Time-Based Pagination

Critical for incremental data loads:

class TimePaginator:
    """Handles time-based pagination for incremental data loading"""
    
    def __init__(self, client: APIClient, endpoint: str, 
                 time_field: str = 'created_at', page_size: int = 100):
        self.client = client
        self.endpoint = endpoint
        self.time_field = time_field
        self.page_size = page_size
        
    async def paginate_since(self, since: datetime, until: Optional[datetime] = None,
                           **query_params) -> AsyncIterator[APIResponse]:
        """
        Paginate through records created/modified since a specific time.
        
        This is essential for incremental loads where you only want new/changed data.
        """
        current_since = since
        until = until or datetime.now()
        
        while current_since < until:
            params = {
                'limit': self.page_size,
                f'{self.time_field}_gte': current_since.isoformat(),
                f'{self.time_field}_lt': until.isoformat(),
                **query_params
            }
            
            response = await self.client.get(self.endpoint, params=params)
            items = response.data.get('data', response.data.get('results', []))
            
            if not items:
                break
                
            yield response
            
            # Move time window forward based on the last item's timestamp
            last_item_time = self._parse_timestamp(items[-1][self.time_field])
            
            # Advance just past the last timestamp so the final item is not
            # re-fetched. Caveat: unfetched items sharing that exact timestamp
            # will be skipped; prefer cursor pagination if that matters.
            current_since = last_item_time + timedelta(milliseconds=1)
            
            # If we got less than a full page, we're done
            if len(items) < self.page_size:
                break
    
    def _parse_timestamp(self, timestamp_str: str) -> datetime:
        """Parse timestamp string to datetime object"""
        from dateutil.parser import parse
        return parse(timestamp_str)
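To make the window arithmetic concrete, here is what the next request's parameters look like after a page whose last record carried an illustrative timestamp (the _gte/_lt parameter names are the same assumption made above):

```python
from datetime import datetime, timedelta, timezone

time_field = "created_at"
last_item_time = datetime(2026, 4, 1, 10, 5, tzinfo=timezone.utc)
until = datetime(2026, 4, 2, tzinfo=timezone.utc)

# Advance the window 1 ms past the last record, as paginate_since does.
current_since = last_item_time + timedelta(milliseconds=1)

params = {
    "limit": 100,
    f"{time_field}_gte": current_since.isoformat(),
    f"{time_field}_lt": until.isoformat(),
}
print(params["created_at_gte"])  # 2026-04-01T10:05:00.001000+00:00
```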

Sophisticated Rate Limiting Strategies

Rate limiting is where data pipelines succeed or fail at scale. Simple sleep-based approaches don't work for production systems that need to maximize throughput while respecting limits.

Token Bucket Implementation

import asyncio
import logging
import random
import time
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional

@dataclass
class RateLimitConfig:
    requests_per_second: float
    burst_capacity: int
    time_window: int = 1

class TokenBucketRateLimiter:
    """
    Advanced token bucket rate limiter with burst handling.
    
    This allows for efficient burst usage while maintaining average rate limits.
    Much more sophisticated than simple delay-based limiting.
    """
    
    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.tokens = config.burst_capacity
        self.last_update = time.time()
        self.lock = asyncio.Lock()
        
    async def acquire(self, tokens: int = 1) -> float:
        """
        Acquire tokens from the bucket, returning wait time if needed.
        
        Returns the time to wait before the request should be made.
        """
        async with self.lock:
            now = time.time()
            
            # Add tokens based on elapsed time
            elapsed = now - self.last_update
            tokens_to_add = elapsed * self.config.requests_per_second
            self.tokens = min(
                self.config.burst_capacity, 
                self.tokens + tokens_to_add
            )
            self.last_update = now
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return 0  # No wait needed
            else:
                # Deduct anyway, letting the bucket go negative ("token debt"),
                # so a caller that sleeps for the returned time has already
                # paid for its request when it proceeds.
                tokens_needed = tokens - self.tokens
                self.tokens -= tokens
                return tokens_needed / self.config.requests_per_second

class AdaptiveRateLimiter:
    """
    Rate limiter that adapts based on API responses and maintains separate
    limits for different resource types.
    """
    
    def __init__(self):
        self.limiters: Dict[str, TokenBucketRateLimiter] = {}
        self.response_times = deque(maxlen=100)
        self.error_count = 0
        self.success_count = 0
        
    def get_limiter(self, resource_type: str, config: RateLimitConfig) -> TokenBucketRateLimiter:
        """Get or create rate limiter for specific resource type"""
        if resource_type not in self.limiters:
            self.limiters[resource_type] = TokenBucketRateLimiter(config)
        return self.limiters[resource_type]
        
    async def execute_request(self, client: APIClient, method: str, url: str, 
                            resource_type: str = 'default', **kwargs) -> APIResponse:
        """Execute request with adaptive rate limiting"""
        
        # Default configuration - adjust based on your APIs
        default_config = RateLimitConfig(
            requests_per_second=10.0,
            burst_capacity=50
        )
        
        limiter = self.get_limiter(resource_type, default_config)
        
        # Apply rate limiting
        wait_time = await limiter.acquire()
        if wait_time > 0:
            await asyncio.sleep(wait_time)
            
        start_time = time.time()
        
        try:
            # Call the client's raw HTTP method directly; going through the
            # public get() on ResilientAPIClient would re-enter this limiter.
            response = await client._request(method.lower(), url, **kwargs)
            
            # Track performance metrics
            response_time = time.time() - start_time
            self.response_times.append(response_time)
            self.success_count += 1
            
            # Adapt rate limits based on API response headers
            if response.rate_limit_remaining is not None:
                await self._adapt_to_api_limits(response, resource_type)
                
            return response
            
        except Exception as e:
            self.error_count += 1
            
            # Back off when the API signals throttling: an explicit 429 status
            # (httpx.HTTPStatusError carries the response) or a rate limit message
            status = getattr(getattr(e, 'response', None), 'status_code', None)
            if status == 429 or 'rate limit' in str(e).lower():
                await self._handle_rate_limit_error(e, resource_type)
                
            raise
    
    async def _adapt_to_api_limits(self, response: APIResponse, resource_type: str):
        """Dynamically adjust rate limits based on API response headers"""
        if response.rate_limit_remaining is not None and response.rate_limit_reset:
            remaining = response.rate_limit_remaining
            reset_time = response.rate_limit_reset
            time_until_reset = (reset_time - datetime.now()).total_seconds()
            
            if time_until_reset > 0 and remaining > 0:
                # Calculate safe rate to avoid hitting limits
                safe_rate = (remaining * 0.9) / time_until_reset  # 90% of available rate
                
                # Update the limiter configuration
                limiter = self.limiters[resource_type]
                new_config = RateLimitConfig(
                    requests_per_second=max(0.1, safe_rate),
                    burst_capacity=max(1, int(remaining * 0.1))
                )
                self.limiters[resource_type] = TokenBucketRateLimiter(new_config)
    
    async def _handle_rate_limit_error(self, error, resource_type: str):
        """Handle rate limit errors with exponential backoff and jitter"""
        base_wait = 1.0
        backoff = base_wait * (2 ** min(self.error_count, 6))  # Exponential backoff
        jitter = random.uniform(0, 0.1 * backoff)  # De-synchronize retrying workers
        wait_time = backoff + jitter
        
        logging.warning(f"Rate limit hit for {resource_type}, waiting {wait_time:.2f}s")
        await asyncio.sleep(wait_time)
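The refill arithmetic at the core of the token bucket is easier to verify in a condensed, synchronous sketch (rate and capacity are arbitrary demo values):

```python
import time

rate, capacity = 5.0, 2            # 5 tokens/sec, bucket holds 2
tokens, last = capacity, time.monotonic()

def acquire() -> float:
    """Return seconds to wait before the request may proceed (0 = go now)."""
    global tokens, last
    now = time.monotonic()
    tokens = min(capacity, tokens + (now - last) * rate)  # refill for elapsed time
    last = now
    if tokens >= 1:
        tokens -= 1
        return 0.0
    return (1 - tokens) / rate      # time until one full token is available

waits = [acquire() for _ in range(4)]
print(waits[0], waits[1])           # 0.0 0.0 -- burst capacity absorbs these
```

The first two calls ride the burst capacity; from the third call on, the bucket is empty and acquire reports a wait close to 1/rate = 0.2 s.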

Circuit Breaker Pattern

For handling sustained API failures:

from enum import Enum
import time
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    """
    Circuit breaker pattern for API calls.
    
    Prevents cascading failures by failing fast when an API is consistently
    returning errors. Essential for resilient data pipelines.
    """
    
    def __init__(self, failure_threshold: int = 5, timeout: int = 60, 
                 expected_exception: type = Exception):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        self.last_failure_time = None
        
    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function call through circuit breaker"""
        
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")
                
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
            
        except self.expected_exception as e:
            self._on_failure()
            raise
            
    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset"""
        return (time.time() - self.last_failure_time) >= self.timeout
        
    def _on_success(self):
        """Handle successful call"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
        
    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Enhanced API client with circuit breaker
class ResilientAPIClient(APIClient):
    """API client with built-in resilience patterns"""
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rate_limiter = AdaptiveRateLimiter()
        self.circuit_breaker = CircuitBreaker()
        
    async def get(self, endpoint: str, params: Optional[Dict] = None) -> APIResponse:
        """GET request with rate limiting and circuit breaker"""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        return await self.circuit_breaker.call(
            self.rate_limiter.execute_request,
            self, 'get', url, params=params
        )
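The state machine is worth seeing end to end. Below is a deliberately condensed re-creation of the breaker (much shorter thresholds and timeouts than you would use in production) driven through a trip-and-recover cycle:

```python
import asyncio
import time

class MiniBreaker:
    """Condensed re-creation of the CircuitBreaker above, for demonstration."""
    def __init__(self, threshold: int = 3, timeout: float = 0.05):
        self.threshold, self.timeout = threshold, timeout
        self.failures, self.state, self.opened_at = 0, "closed", 0.0

    async def call(self, func):
        if self.state == "open":
            if time.monotonic() - self.opened_at >= self.timeout:
                self.state = "half_open"          # probe: has the API recovered?
            else:
                raise RuntimeError("circuit open: failing fast")
        try:
            result = await func()
        except Exception:
            self.failures += 1
            self.opened_at = time.monotonic()
            if self.failures >= self.threshold:
                self.state = "open"
            raise
        self.failures, self.state = 0, "closed"   # any success fully resets
        return result

async def demo():
    breaker = MiniBreaker()

    async def flaky():
        raise ConnectionError("API down")

    async def healthy():
        return "ok"

    for _ in range(3):                    # three failures trip the breaker
        try:
            await breaker.call(flaky)
        except ConnectionError:
            pass
    tripped = breaker.state               # "open": requests now fail fast
    await asyncio.sleep(0.06)             # wait out the reset timeout
    result = await breaker.call(healthy)  # half-open probe succeeds
    return tripped, breaker.state, result

tripped, final_state, result = asyncio.run(demo())
print(tripped, final_state, result)  # open closed ok
```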

Authentication and Token Management

Long-running data pipelines need sophisticated authentication handling:

import asyncio
import httpx
from typing import Optional, Dict, Any
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class AuthToken:
    access_token: str
    refresh_token: Optional[str]
    expires_at: datetime
    token_type: str = "Bearer"
    
    @property
    def is_expired(self) -> bool:
        return datetime.now() >= self.expires_at - timedelta(minutes=5)  # 5-minute buffer
    
    @property  
    def auth_header(self) -> Dict[str, str]:
        return {"Authorization": f"{self.token_type} {self.access_token}"}

class TokenManager:
    """
    Manages OAuth tokens with automatic refresh for long-running processes.
    
    Critical for data pipelines that run for hours or days.
    """
    
    def __init__(self, client_id: str, client_secret: str, 
                 token_endpoint: str, refresh_endpoint: Optional[str] = None):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_endpoint = token_endpoint
        self.refresh_endpoint = refresh_endpoint or token_endpoint
        self.current_token: Optional[AuthToken] = None
        self._refresh_lock = asyncio.Lock()
        
    async def get_valid_token(self) -> AuthToken:
        """Get a valid token, refreshing if necessary"""
        if not self.current_token or self.current_token.is_expired:
            await self._refresh_token()
        return self.current_token
        
    async def _refresh_token(self):
        """Refresh the current token with thread safety"""
        async with self._refresh_lock:
            # Double-check pattern to avoid multiple simultaneous refreshes
            if self.current_token and not self.current_token.is_expired:
                return
                
            if self.current_token and self.current_token.refresh_token:
                await self._refresh_with_refresh_token()
            else:
                await self._get_new_token()
                
    async def _refresh_with_refresh_token(self):
        """Use refresh token to get new access token"""
        data = {
            'grant_type': 'refresh_token',
            'refresh_token': self.current_token.refresh_token,
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }
        
        async with httpx.AsyncClient() as client:
            response = await client.post(self.refresh_endpoint, data=data)
            response.raise_for_status()
            
            token_data = response.json()
            self.current_token = self._create_token_from_response(token_data)
            
    async def _get_new_token(self):
        """Get a new token using client credentials"""
        data = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }
        
        async with httpx.AsyncClient() as client:
            response = await client.post(self.token_endpoint, data=data)
            response.raise_for_status()
            
            token_data = response.json()
            self.current_token = self._create_token_from_response(token_data)
            
    def _create_token_from_response(self, token_data: Dict[str, Any]) -> AuthToken:
        """Create AuthToken from API response"""
        expires_in = token_data.get('expires_in', 3600)
        expires_at = datetime.now() + timedelta(seconds=expires_in)
        
        return AuthToken(
            access_token=token_data['access_token'],
            refresh_token=token_data.get('refresh_token'),
            expires_at=expires_at,
            token_type=token_data.get('token_type', 'Bearer')
        )

class AuthenticatedAPIClient(ResilientAPIClient):
    """API client with automatic token management"""
    
    def __init__(self, base_url: str, token_manager: TokenManager, **kwargs):
        super().__init__(base_url, {}, **kwargs)  # Empty auth_header initially
        self.token_manager = token_manager
        
    async def _get_auth_headers(self) -> Dict[str, str]:
        """Get current authentication headers"""
        token = await self.token_manager.get_valid_token()
        return token.auth_header
        
    async def get(self, endpoint: str, params: Optional[Dict] = None) -> APIResponse:
        """GET request with automatic token refresh"""
        headers = await self._get_auth_headers()
        
        # Update session headers
        if self.session:
            self.session.headers.update(headers)
            
        return await super().get(endpoint, params)

Data Consistency Patterns

Ensuring data consistency across pipeline runs is crucial:

import hashlib
import json
import logging
from typing import Set, List, Dict, Any, Optional, AsyncIterator
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class DataCheckpoint:
    """Represents a checkpoint in data ingestion process"""
    last_synced_id: Optional[str]
    last_synced_timestamp: Optional[datetime]
    page_token: Optional[str]
    record_count: int
    data_hash: str
    created_at: datetime

class ConsistencyManager:
    """
    Manages data consistency for incremental loads and deduplication.
    
    Essential for production data pipelines that need to handle failures,
    restarts, and ensure exactly-once processing semantics.
    """
    
    def __init__(self, checkpoint_store):
        self.checkpoint_store = checkpoint_store  # Could be Redis, DynamoDB, etc.
        self.processed_ids: Set[str] = set()
        
    async def get_last_checkpoint(self, pipeline_id: str) -> Optional[DataCheckpoint]:
        """Get the last successful checkpoint for a pipeline"""
        return await self.checkpoint_store.get(f"checkpoint:{pipeline_id}")
        
    async def save_checkpoint(self, pipeline_id: str, checkpoint: DataCheckpoint):
        """Save checkpoint data"""
        await self.checkpoint_store.set(
            f"checkpoint:{pipeline_id}", 
            checkpoint
        )
        
    def calculate_data_hash(self, records: List[Dict[str, Any]]) -> str:
        """Calculate hash of record set for consistency verification"""
        # Sort records by ID to ensure consistent hashing
        sorted_records = sorted(records, key=lambda x: str(x.get('id', '')))
        
        # Create hash from sorted JSON representation
        data_str = json.dumps(sorted_records, sort_keys=True, default=str)
        return hashlib.sha256(data_str.encode()).hexdigest()
        
    async def deduplicate_records(self, records: List[Dict[str, Any]], 
                                id_field: str = 'id') -> List[Dict[str, Any]]:
        """Remove duplicates from record set"""
        seen_ids = set()
        deduplicated = []
        
        for record in records:
            record_id = record.get(id_field)
            if record_id and record_id not in seen_ids and record_id not in self.processed_ids:
                seen_ids.add(record_id)
                self.processed_ids.add(record_id)
                deduplicated.append(record)
                
        return deduplicated
        
    async def verify_data_consistency(self, current_records: List[Dict[str, Any]], 
                                    previous_hash: str) -> bool:
        """Return True if the records hash to the same value as the previous run,
        i.e. the data has not been modified or corrupted in between."""
        current_hash = self.calculate_data_hash(current_records)
        return current_hash == previous_hash

class IncrementalLoader:
    """
    Handles incremental data loading with consistency guarantees.
    
    This is the pattern you'll use most often in production data pipelines.
    """
    
    def __init__(self, client: AuthenticatedAPIClient, 
                 consistency_manager: ConsistencyManager,
                 pipeline_id: str):
        self.client = client
        self.consistency_manager = consistency_manager
        self.pipeline_id = pipeline_id
        
    async def load_incremental_data(self, endpoint: str, 
                                  time_field: str = 'updated_at') -> AsyncIterator[List[Dict[str, Any]]]:
        """
        Load data incrementally with consistency guarantees.
        
        This handles the full complexity of incremental loading:
        - Resuming from checkpoints
        - Deduplication
        - Consistency verification
        - Error recovery
        """
        
        # Get last checkpoint
        checkpoint = await self.consistency_manager.get_last_checkpoint(self.pipeline_id)
        
        # Determine starting point
        if checkpoint:
            start_time = checkpoint.last_synced_timestamp
            initial_cursor = checkpoint.page_token
            logging.info(f"Resuming from checkpoint: {start_time}")
        else:
            start_time = datetime.now() - timedelta(days=1)  # Default lookback
            initial_cursor = None
            logging.info(f"Starting fresh load from: {start_time}")
            
        # Set up paginator based on API type
        if initial_cursor:
            paginator = CursorPaginator(self.client, endpoint)
            page_iterator = paginator.paginate(initial_cursor=initial_cursor)
        else:
            paginator = TimePaginator(self.client, endpoint, time_field)
            page_iterator = paginator.paginate_since(start_time)
            
        batch = []
        total_records = 0
        last_record_time = start_time
        
        async for response in page_iterator:
            records = response.data.get('data', response.data.get('results', []))
            
            if not records:
                continue
                
            # Deduplicate records
            deduplicated_records = await self.consistency_manager.deduplicate_records(records)
            
            if not deduplicated_records:
                continue  # All records were duplicates
                
            batch.extend(deduplicated_records)
            total_records += len(deduplicated_records)
            
            # Update tracking variables
            if deduplicated_records and time_field in deduplicated_records[-1]:
                last_record_time = self.consistency_manager._parse_timestamp(
                    deduplicated_records[-1][time_field]
                )
                
            # Yield batch when it reaches optimal size
            if len(batch) >= 1000:  # Configurable batch size
                yield batch
                
                # Save intermediate checkpoint
                await self._save_checkpoint(
                    batch[-1], response.next_page_token, total_records, last_record_time
                )
                
                batch = []
                
        # Yield final batch
        if batch:
            yield batch
            await self._save_checkpoint(
                batch[-1], None, total_records, last_record_time
            )
            
    async def _save_checkpoint(self, last_record: Dict[str, Any], 
                             page_token: Optional[str], record_count: int,
                             last_timestamp: datetime):
        """Save checkpoint with current state"""
        checkpoint = DataCheckpoint(
            last_synced_id=last_record.get('id'),
            last_synced_timestamp=last_timestamp,
            page_token=page_token,
            record_count=record_count,
            data_hash=self.consistency_manager.calculate_data_hash([last_record]),
            created_at=datetime.now()
        )
        
        await self.consistency_manager.save_checkpoint(self.pipeline_id, checkpoint)

Performance Optimization Techniques

For high-throughput data ingestion:

import asyncio
from collections import deque
from typing import Any, Callable, Dict, List, Optional

import aiofiles
import httpx
import orjson  # faster JSON serialization; dumps() returns bytes

class HighPerformanceAPIClient:
    """
    Optimized client for high-throughput data ingestion.
    
    Uses advanced techniques like connection pooling, concurrent requests,
    and efficient serialization.
    """
    
    def __init__(self, base_url: str, token_manager: TokenManager,
                 max_concurrent_requests: int = 50,
                 connection_pool_size: int = 100):
        self.base_url = base_url
        self.token_manager = token_manager
        self.max_concurrent_requests = max_concurrent_requests
        
        # Configure HTTP client for high performance
        limits = httpx.Limits(
            max_keepalive_connections=connection_pool_size,
            max_connections=connection_pool_size,
            keepalive_expiry=30.0
        )
        
        self.client = httpx.AsyncClient(
            limits=limits,
            timeout=httpx.Timeout(30.0, connect=10.0),
            http2=True  # requires the httpx[http2] extra (h2 package)
        )
        
        self.semaphore = asyncio.Semaphore(max_concurrent_requests)
        
    async def fetch_multiple_endpoints(self, endpoints: List[str],
                                       params_list: Optional[List[Dict]] = None) -> List[APIResponse]:
        """Fetch multiple endpoints concurrently.

        gather() runs with return_exceptions=True, so failed requests come
        back as Exception objects in the result list instead of raising.
        """
        if params_list is None:
            params_list = [{}] * len(endpoints)
            
        tasks = [
            self._fetch_with_semaphore(endpoint, params)
            for endpoint, params in zip(endpoints, params_list)
        ]
        
        return await asyncio.gather(*tasks, return_exceptions=True)
        
    async def _fetch_with_semaphore(self, endpoint: str, params: Dict) -> APIResponse:
        """Fetch single endpoint with concurrency limiting"""
        async with self.semaphore:
            headers = await self._get_auth_headers()
            url = f"{self.base_url}/{endpoint.lstrip('/')}"
            
            response = await self.client.get(url, params=params, headers=headers)
            response.raise_for_status()
            
            return APIResponse(
                data=response.json(),
                status_code=response.status_code,
                headers=dict(response.headers)
            )
            
    async def stream_to_file(self, endpoint: str, file_path: str, 
                           batch_processor: Callable = None):
        """Stream large datasets directly to file with optional processing"""
        paginator = CursorPaginator(self, endpoint, page_size=1000)
        
        async with aiofiles.open(file_path, 'wb') as f:
            async for response in paginator.paginate():
                records = response.data.get('data', [])
                
                # Apply batch processing if provided
                if batch_processor:
                    records = await batch_processor(records)
                    
                # Use fast JSON serialization
                json_data = orjson.dumps(records)
                await f.write(json_data + b'\n')
                
    async def parallel_incremental_load(self, endpoints: List[str], 
                                      pipeline_ids: List[str],
                                      consistency_managers: List[ConsistencyManager]) -> Dict[str, List[Dict]]:
        """Load from multiple endpoints in parallel"""
        
        loaders = [
            IncrementalLoader(self, cm, pid)
            for cm, pid in zip(consistency_managers, pipeline_ids)
        ]
        
        # Create tasks for each endpoint
        tasks = [
            self._load_endpoint_data(loader, endpoint)
            for loader, endpoint in zip(loaders, endpoints)
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        return {
            pipeline_id: result if not isinstance(result, Exception) else []
            for pipeline_id, result in zip(pipeline_ids, results)
        }
        
    async def _load_endpoint_data(self, loader: IncrementalLoader, 
                                endpoint: str) -> List[Dict[str, Any]]:
        """Load all data from a single endpoint"""
        all_records = []
        async for batch in loader.load_incremental_data(endpoint):
            all_records.extend(batch)
        return all_records

# Performance monitoring and metrics
class PerformanceMonitor:
    """Monitor API client performance and provide optimization insights"""
    
    def __init__(self):
        self.request_times = deque(maxlen=1000)
        self.error_counts = {}
        self.throughput_samples = deque(maxlen=100)
        
    def record_request(self, duration: float, endpoint: str, status_code: int):
        """Record performance metrics for a request"""
        self.request_times.append(duration)
        
        if status_code >= 400:
            self.error_counts[endpoint] = self.error_counts.get(endpoint, 0) + 1
            
    def get_performance_stats(self) -> Dict[str, Any]:
        """Get current performance statistics"""
        if not self.request_times:
            return {}
            
        times = list(self.request_times)
        return {
            'avg_response_time': sum(times) / len(times),
            'p95_response_time': sorted(times)[int(len(times) * 0.95)],
            # Approximate: error_counts is cumulative, times is a rolling window
            'error_rate': sum(self.error_counts.values()) / len(times),
            'total_requests': len(times)
        }
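As a quick sanity check of the percentile math in `get_performance_stats`, the same index formula can be run on synthetic timings:

```python
from collections import deque

# 95 fast requests at 0.1s plus 5 slow outliers at 2.0s
request_times = deque([0.1] * 95 + [2.0] * 5, maxlen=1000)

times = sorted(request_times)
p95 = times[int(len(times) * 0.95)]  # index 95 of 100 lands on the first outlier
avg = sum(times) / len(times)

print(p95)            # 2.0
print(round(avg, 3))  # 0.195
```

Note how the p95 surfaces the outliers while the average barely moves — which is exactly why the monitor tracks both.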

Hands-On Exercise

Let's build a complete data ingestion pipeline that demonstrates all these concepts:

import asyncio
import logging
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

# Configure logging
logging.basicConfig(level=logging.INFO, 
                   format='%(asctime)s - %(levelname)s - %(message)s')

class MockCheckpointStore:
    """Simple in-memory checkpoint store for the exercise"""
    
    def __init__(self):
        self.checkpoints = {}
        
    async def get(self, key: str) -> Optional[DataCheckpoint]:
        return self.checkpoints.get(key)
        
    async def set(self, key: str, checkpoint: DataCheckpoint):
        self.checkpoints[key] = checkpoint

class MockTokenManager:
    """Mock token manager for the exercise"""
    
    async def get_valid_token(self) -> AuthToken:
        return AuthToken(
            access_token="mock_token",
            refresh_token=None,
            expires_at=datetime.now() + timedelta(hours=1)
        )

async def run_data_pipeline():
    """
    Complete example: Multi-source data ingestion pipeline
    
    This pipeline ingests data from three different mock APIs with
    different pagination patterns and rate limits.
    """
    
    # Set up components
    checkpoint_store = MockCheckpointStore()
    token_manager = MockTokenManager()
    
    # API configurations for different sources
    api_configs = {
        'users': {
            'base_url': 'https://api.example.com',
            'endpoint': '/users',
            'rate_limit': RateLimitConfig(requests_per_second=10, burst_capacity=50),
            'pagination_type': 'cursor'
        },
        'transactions': {
            'base_url': 'https://api.payments.com',
            'endpoint': '/transactions',
            'rate_limit': RateLimitConfig(requests_per_second=5, burst_capacity=20),
            'pagination_type': 'time'
        },
        'events': {
            'base_url': 'https://api.analytics.com', 
            'endpoint': '/events',
            'rate_limit': RateLimitConfig(requests_per_second=20, burst_capacity=100),
            'pagination_type': 'offset'
        }
    }
    
    results = {}
    
    # Process each API concurrently
    tasks = []
    for source_name, config in api_configs.items():
        task = process_api_source(
            source_name, config, checkpoint_store, token_manager
        )
        tasks.append(task)
        
    # Execute all tasks concurrently
    pipeline_results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Process results
    for source_name, result in zip(api_configs.keys(), pipeline_results):
        if isinstance(result, Exception):
            logging.error(f"Failed to process {source_name}: {result}")
            results[source_name] = {"error": str(result), "records": 0}
        else:
            results[source_name] = result
            logging.info(f"Successfully processed {source_name}: {result['records']} records")
            
    return results

async def process_api_source(source_name: str, config: Dict[str, Any],
                           checkpoint_store, token_manager) -> Dict[str, Any]:
    """Process a single API source with full error handling and monitoring"""
    
    try:
        # Create specialized client for this source
        client = HighPerformanceAPIClient(
            config['base_url'], 
            token_manager,
            max_concurrent_requests=10  # Conservative for demo
        )
        
        # Create consistency manager
        consistency_manager = ConsistencyManager(checkpoint_store)
        
        # Create incremental loader
        loader = IncrementalLoader(
            client, 
            consistency_manager, 
            f"pipeline_{source_name}"
        )
        
        # Load data incrementally
        total_records = 0
        start_time = time.time()
        
        async for batch in loader.load_incremental_data(config['endpoint']):
            # In a real pipeline, you'd send this to your data warehouse
            # For demo, we just count records
            total_records += len(batch)
            
            # Log progress every 1000 records
            if total_records % 1000 == 0:
                logging.info(f"{source_name}: Processed {total_records} records")
                
        elapsed_time = time.time() - start_time
        
        return {
            "source": source_name,
            "records": total_records,
            "duration_seconds": elapsed_time,
            "records_per_second": total_records / elapsed_time if elapsed_time > 0 else 0
        }
        
    except Exception as e:
        logging.error(f"Error processing {source_name}: {e}")
        raise

# Run the complete pipeline
if __name__ == "__main__":
    results = asyncio.run(run_data_pipeline())
    
    print("\n=== Pipeline Results ===")
    for source, stats in results.items():
        if "error" in stats:
            print(f"{source}: FAILED - {stats['error']}")
        else:
            print(f"{source}: {stats['records']} records in {stats['duration_seconds']:.2f}s "
                  f"({stats['records_per_second']:.1f} records/sec)")

This exercise demonstrates:

  • Concurrent processing of multiple API sources
  • Different pagination strategies based on API type
  • Rate limiting and error handling
  • Checkpoint-based recovery
  • Performance monitoring and reporting
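Checkpoint-based recovery can also be exercised in isolation. The sketch below uses trimmed stand-ins for `DataCheckpoint` and `MockCheckpointStore` (only four of the checkpoint fields, with names taken from the code above) to show a restarted run resuming from saved state:

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class DataCheckpoint:
    """Trimmed to four of the fields used in the article's checkpoints."""
    last_synced_id: Optional[str]
    last_synced_timestamp: datetime
    page_token: Optional[str]
    record_count: int

class MockCheckpointStore:
    def __init__(self):
        self.checkpoints = {}

    async def get(self, key: str) -> Optional[DataCheckpoint]:
        return self.checkpoints.get(key)

    async def set(self, key: str, checkpoint: DataCheckpoint) -> None:
        self.checkpoints[key] = checkpoint

async def main() -> DataCheckpoint:
    store = MockCheckpointStore()
    # First run saves progress before exiting...
    await store.set("pipeline_users",
                    DataCheckpoint("42", datetime(2026, 4, 1), "tok_abc", 1000))
    # ...and a restarted run resumes from it instead of re-reading everything.
    return await store.get("pipeline_users")

cp = asyncio.run(main())
print(cp.record_count, cp.page_token)  # 1000 tok_abc
```

A production store would back this with a database or object storage, but the contract — get on startup, set after each batch — is the same.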

Common Mistakes & Troubleshooting

Pagination Pitfalls

Problem: Records appearing in multiple pages or being skipped entirely.

# BAD: Naive offset pagination without duplicate handling
async def bad_offset_pagination():
    offset = 0
    while True:
        response = await client.get(f"/items?offset={offset}&limit=100")
        if not response.data['items']:
            break
        yield response.data['items']
        offset += 100  # Data can shift, causing duplicates/misses

# GOOD: Cursor-based with proper duplicate detection
async def robust_pagination():
    cursor = None
    seen_ids = set()
    
    while True:
        params = {'limit': 100}
        if cursor:
            params['cursor'] = cursor
            
        response = await client.get("/items", params=params)
        items = response.data.get('items', [])
        
        if not items:
            break
            
        # Filter duplicates
        new_items = [item for item in items if item['id'] not in seen_ids]
        seen_ids.update(item['id'] for item in new_items)
        
        if new_items:
            yield new_items
            
        cursor = response.data.get('next_cursor')
        if not cursor:
            break

Rate Limiting Errors

Problem: Getting 429 errors despite implementing rate limiting.

# Common issue: Not accounting for different rate limit types
class ProperRateLimiting:
    def __init__(self, client: httpx.AsyncClient):
        self.client = client  # underlying HTTP client used below
        # Many APIs have different limits for different operations
        self.limiters = {
            'read': TokenBucketRateLimiter(RateLimitConfig(10, 50)),
            'write': TokenBucketRateLimiter(RateLimitConfig(2, 10)),
            'search': TokenBucketRateLimiter(RateLimitConfig(1, 5))
        }
    
    async def make_request(self, endpoint: str, method: str = 'GET'):
        # Determine operation type
        operation_type = 'write' if method in ['POST', 'PUT', 'PATCH'] else 'read'
        if 'search' in endpoint:
            operation_type = 'search'
            
        # Apply appropriate rate limit
        limiter = self.limiters[operation_type]
        wait_time = await limiter.acquire()
        
        if wait_time > 0:
            await asyncio.sleep(wait_time)
            
        return await self.client.request(method, endpoint)

Authentication Token Expiry

Problem: Tokens expiring mid-pipeline causing cascading failures.

# BAD: Not handling token refresh properly
class BadTokenHandling:
    async def make_request(self):
        # This will fail when token expires
        headers = {"Authorization": f"Bearer {self.token}"}
        return await self.client.get("/data", headers=headers)

# GOOD: Proactive token management
class GoodTokenHandling:
    async def make_request(self):
        # Always get fresh token (manager handles caching/refresh)
        token = await self.token_manager.get_valid_token()
        headers = token.auth_header
        
        try:
            return await self.client.get("/data", headers=headers)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 401:
                # Force token refresh and retry once
                await self.token_manager._refresh_token()
                token = await self.token_manager.get_valid_token()
                headers = token.auth_header
                return await self.client.get("/data", headers=headers)
            raise

Memory Issues with Large Datasets

Problem: Loading entire datasets into memory causing OOM errors.

# BAD: Loading everything into memory
async def memory_intensive():
    all_data = []
    async for page in paginator.paginate():
        all_data.extend(page.data['items'])  # Memory grows indefinitely
    return all_data

# GOOD: Streaming processing
async def memory_efficient():
    async for page in paginator.paginate():
        # Process each batch immediately
        batch = page.data['items']
        await process_batch(batch)  # Send to warehouse, write to file, etc.
        
        # Optional: yield for caller to handle
        yield batch

Debugging API Integration Issues

When things go wrong, use these debugging techniques:

import logging
from typing import Dict, Any

class DebuggingAPIClient:
    """Mixin that wraps request() with detailed logging.

    List it before the concrete client class in the MRO so that
    super().request() resolves to the real implementation.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.request_log = []
        
    async def request(self, method: str, url: str, **kwargs) -> APIResponse:
        # Log all request details
        request_info = {
            'timestamp': datetime.now().isoformat(),
            'method': method,
            'url': url,
            'headers': kwargs.get('headers', {}),
            'params': kwargs.get('params', {})
        }
        
        try:
            response = await super().request(method, url, **kwargs)
            
            request_info.update({
                'status_code': response.status_code,
                'response_headers': dict(response.headers),
                'success': True
            })
            
            # Log rate limit information
            if response.rate_limit_remaining is not None:
                logging.info(f"Rate limit: {response.rate_limit_remaining} remaining, "
                           f"resets at {response.rate_limit_reset}")
                           
            return response
            
        except Exception as e:
            request_info.update({
                'error': str(e),
                'success': False
            })
            logging.error(f"Request failed: {request_info}")
            raise
            
        finally:
            self.request_log.append(request_info)
            
    def get_debug_info(self) -> Dict[str, Any]:
        """Get debugging information for troubleshooting"""
        total_requests = len(self.request_log)
        successful_requests = sum(1 for req in self.request_log if req['success'])
        
        return {
            'total_requests': total_requests,
            'successful_requests': successful_requests,
            'error_rate': (total_requests - successful_requests) / total_requests if total_requests > 0 else 0,
            'recent_errors': [req for req in self.request_log[-10:] if not req['success']],
            'rate_limit_hits': [req for req in self.request_log if req.get('status_code') == 429]
        }
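Since `DebuggingAPIClient` calls `super().request()`, it composes with a concrete client via the MRO. This runnable sketch uses a hypothetical `FakeBaseClient` and a simplified `DebugMixin` to illustrate the composition pattern:

```python
import asyncio
from datetime import datetime

class FakeBaseClient:
    """Hypothetical base client so the mixin pattern runs stand-alone."""
    async def request(self, method, url, **kwargs):
        return {"status": 200}

class DebugMixin:
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.request_log = []

    async def request(self, method, url, **kwargs):
        entry = {"timestamp": datetime.now().isoformat(), "method": method, "url": url}
        try:
            response = await super().request(method, url, **kwargs)
            entry["success"] = True
            return response
        except Exception as exc:
            entry.update({"success": False, "error": str(exc)})
            raise
        finally:
            # Runs on both paths, so every request is logged exactly once
            self.request_log.append(entry)

# The mixin must come first so its request() wraps the base client's.
class DebuggingClient(DebugMixin, FakeBaseClient):
    pass

async def main() -> "DebuggingClient":
    client = DebuggingClient()
    await client.request("GET", "/data")
    return client

client = asyncio.run(main())
print(len(client.request_log), client.request_log[0]["success"])  # 1 True
```

The same composition works with the full `DebuggingAPIClient` above: inherit from it and your real client class, in that order.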

Summary & Next Steps

You've now mastered the sophisticated patterns required for production-scale API integration in data pipelines. The key takeaways:

Pagination Mastery: Different APIs require different pagination strategies. Cursor-based pagination provides the most consistency, time-based pagination enables efficient incremental loads, and offset pagination should be used sparingly with duplicate detection.

Rate Limiting Sophistication: Token bucket algorithms provide optimal throughput while respecting limits. Adaptive rate limiting based on API responses prevents hitting limits, and circuit breakers prevent cascading failures.

Resilience Patterns: Authentication token management, exponential backoff, and checkpoint-based recovery ensure your pipelines can handle the inevitable failures and restarts of production systems.

Performance Optimization: Connection pooling, concurrent requests, and streaming processing patterns enable high-throughput data ingestion without overwhelming your systems or the APIs you're consuming.

Data Consistency: Deduplication, checkpointing, and incremental loading patterns ensure exactly-once processing semantics even in the face of failures and retries.

Next Steps

  1. Implement monitoring and alerting around your API clients. Track error rates, response times, and rate limit usage.

  2. Add schema validation to detect API changes early. APIs evolve, and your pipelines need to handle schema drift gracefully.

  3. Explore webhook integration for real-time data ingestion. Many APIs offer webhooks as an alternative to polling.

  4. Implement data quality checks at ingestion time. Validate data completeness, freshness, and accuracy before loading into your warehouse.

  5. Study specific API patterns for the services you integrate with. Each API has its own quirks and optimizations.

The patterns you've learned here form the foundation for any serious data engineering work involving APIs. These aren't theoretical concepts—they're battle-tested solutions to real problems you'll encounter when building production data pipelines at scale.

Learning Path: Data Pipeline Fundamentals
