
You're running a data platform for a SaaS company that's grown from a dozen systems to over 50. Marketing uses HubSpot and Google Analytics, sales lives in Salesforce, customer success operates in Zendesk, and finance tracks everything in NetSuite. Your engineering team has built custom APIs for product analytics, and you've got log data streaming from Kafka. Each system has its own data format, API quirks, and update schedules.
The question isn't whether you need data ingestion—it's how to build a system that's reliable, maintainable, and scales with your growing data ecosystem. This is where understanding the full spectrum of ingestion options becomes critical: when to leverage managed platforms like Fivetran, when to deploy open-source solutions like Airbyte, and when you need to roll custom connectors.
By the end of this lesson, you'll understand how to architect a data ingestion strategy that balances cost, control, and complexity. You'll see how these tools work under the hood, understand their architectural trade-offs, and know when each approach makes sense.
Prerequisites:
You should be comfortable with REST APIs, database connections, and basic data pipeline concepts. Familiarity with Docker, Python, and SQL will help with the hands-on examples. Experience with at least one cloud data warehouse (Snowflake, BigQuery, or Redshift) is recommended.
Data ingestion has evolved far beyond simple ETL scripts. Modern organizations deal with hundreds of data sources, each with unique authentication schemes, rate limits, and data formats. The tools we'll explore represent three distinct architectural philosophies:
Fivetran represents the "managed complexity" approach. You pay premium prices for a service that handles the operational overhead of maintaining connectors, monitoring data freshness, and adapting to API changes. Their value proposition is operational simplicity at scale.
Airbyte embodies the "open-core" model. The core platform is open-source with a standardized connector protocol, but the company monetizes through managed cloud offerings and enterprise features. This gives you control over the platform while benefiting from community-driven connector development.
Custom connectors give you complete control but require you to handle all the complexity: authentication flows, error handling, schema mapping, and operational monitoring. The decision to build custom often comes down to unique requirements that existing connectors can't handle.
The key insight is that most mature data teams use all three approaches. You're not choosing one tool—you're architecting a system that uses the right approach for each data source based on criticality, volume, complexity, and cost.
Fivetran's architecture is built around the concept of "set it and forget it" data replication. When you configure a Fivetran connector, you're not just setting up a data pipeline—you're subscribing to a managed service that handles the operational complexity of keeping that pipeline running.
Every Fivetran connector follows a consistent pattern. The connector maintains a cursor state that tracks what data has been synced, handles incremental updates automatically, and manages schema changes without breaking downstream processes. This works because Fivetran controls both ends of the pipeline: they know the source system's API intimately and they control how data lands in your warehouse.
Let's examine how this works with a Salesforce connector:
-- Fivetran creates tables with standard patterns
SELECT
    _fivetran_synced,
    _fivetran_deleted,
    id,
    name,
    industry,
    created_date
FROM salesforce.account
WHERE _fivetran_synced > '2024-01-01'
The _fivetran_synced and _fivetran_deleted columns are metadata that Fivetran injects into every table. This metadata enables reliable incremental processing and soft deletes—two of the most common sources of bugs in custom-built pipelines.
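To make that concrete, here's a minimal sketch (illustrative Python over hypothetical row dicts, not Fivetran code) of how downstream logic can use these two columns for idempotent incremental merges with soft deletes:

```python
def merge_increment(target: dict, synced_rows: list, last_watermark: str) -> str:
    """Apply rows synced after `last_watermark` to a dict keyed by id,
    soft-deleting rows flagged _fivetran_deleted. Returns the new
    watermark, so re-running with the same input is idempotent."""
    watermark = last_watermark
    for row in synced_rows:
        if row["_fivetran_synced"] <= last_watermark:
            continue  # already applied by a previous run
        if row["_fivetran_deleted"]:
            if row["id"] in target:
                target[row["id"]]["is_deleted"] = True  # soft delete, keep history
        else:
            target[row["id"]] = {**row, "is_deleted": False}
        watermark = max(watermark, row["_fivetran_synced"])
    return watermark
```

Because deletes are soft, downstream models can still see that a record existed; because the watermark filter skips already-applied rows, replaying a batch is safe.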
One of Fivetran's most sophisticated features is automatic schema evolution. When Salesforce adds a new field to the Account object, Fivetran detects this change and automatically adds the corresponding column to your warehouse table. This sounds simple, but the implementation is complex.
Fivetran maintains a schema registry for each connector that maps source fields to destination columns. When schema changes occur, Fivetran can either auto-add columns (the default) or block new columns based on your configuration:
{
  "schema_config": {
    "accounts": {
      "new_custom_field__c": {
        "enabled": false,
        "reason": "blocked_by_user"
      }
    }
  }
}
This schema registry becomes critical when you're dealing with systems like Salesforce where administrators regularly add custom fields. Without proper schema evolution handling, these additions can break downstream processes or create data inconsistencies.
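The core of such a registry can be sketched in a few lines. This is an illustrative model, not Fivetran's implementation; `diff_schema` and its arguments are invented for the example:

```python
def diff_schema(registry: dict, source_fields: dict, blocked: set) -> list:
    """Compare the source's current fields against the registry of known
    fields. New fields are auto-added (and recorded in the registry)
    unless the user has blocked them; returns (column, type) pairs the
    destination table needs."""
    additions = []
    for field, field_type in source_fields.items():
        if field not in registry and field not in blocked:
            additions.append((field, field_type))
            registry[field] = field_type
    return additions
```

Running the diff on every sync is what lets a new Salesforce custom field appear in the warehouse automatically, while a blocked field is simply never proposed as an addition.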
Fivetran's rate limiting implementation reveals the complexity hidden behind their simple interface. For each API connector, Fivetran maintains rate limit pools that are shared across all customers using that connector. This means Fivetran must balance individual customer sync speeds against aggregate API consumption.
For high-volume connectors like Google Analytics or Salesforce, Fivetran implements sophisticated queuing mechanisms. When you request a historical sync of two years of Salesforce data, Fivetran doesn't immediately hammer the Salesforce API. Instead, it breaks the request into time-bounded chunks and queues them across their infrastructure.
This is visible in the connector logs:
2024-01-15 10:30:22 [INFO] Salesforce sync queued: 2024-01-01 to 2024-01-07
2024-01-15 10:30:23 [INFO] Rate limit pool: 45% capacity remaining
2024-01-15 10:35:18 [INFO] Sync batch completed: 15,847 records processed
The rate limit management becomes a competitive moat. Fivetran has spent years optimizing their rate limit algorithms for each API, and these optimizations are not trivial to replicate in custom implementations.
Fivetran's pricing model is based on Monthly Active Rows (MAR), which sounds straightforward but can be tricky to optimize. A "monthly active row" is any row that's been inserted, updated, or deleted in a given month. This means that frequently updated records can generate significant costs.
Consider a real-world scenario with a customer support system:
-- A single support ticket might generate multiple MAR charges
SELECT
ticket_id,
status,
updated_at,
_fivetran_synced
FROM zendesk.tickets
WHERE ticket_id = 12345
ORDER BY _fivetran_synced;
-- Results show the same ticket updated multiple times:
-- 12345 | 'open' | '2024-01-15 09:00' | '2024-01-15 09:05'
-- 12345 | 'pending' | '2024-01-15 11:30' | '2024-01-15 11:35'
-- 12345 | 'solved' | '2024-01-15 14:15' | '2024-01-15 14:20'
MAR is deduplicated by primary key, so this ticket counts as one monthly active row no matter how many times it changes within the month. The cost driver is the number of distinct rows touched: in high-touch systems like support platforms or CRMs, where most rows see at least one update every month, nearly the entire table stays "active," and the charges add up quickly.
The optimization strategy involves understanding your data update patterns and configuring sync frequencies accordingly. For systems with frequent but unimportant updates, you might sync only once daily instead of every hour.
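Fivetran's published definition counts a distinct row once per month however many times it changes. A rough planning sketch under that definition, with a hypothetical flat price per million MAR (real pricing is tiered and negotiated, so treat this as an estimate only):

```python
def estimate_monthly_cost(tables: dict, price_per_million_mar: float) -> float:
    """Estimate monthly MAR cost. `tables` maps table name to
    (total_rows, fraction_of_rows_touched_per_month); per-table MAR is
    rows * fraction, since a row counts once per month if it was
    inserted, updated, or deleted at all."""
    mar = sum(rows * fraction for rows, fraction in tables.values())
    return mar * price_per_million_mar / 1_000_000
```

Note what this implies for tuning: dropping sync frequency reduces API and compute load, but MAR only falls if fewer distinct rows end up synced in the month.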
Airbyte takes a fundamentally different approach to data ingestion. Instead of a managed service, Airbyte provides a platform that orchestrates connectors. This architectural choice has profound implications for how you deploy, monitor, and extend your data ingestion pipeline.
At its core, Airbyte defines a standard protocol for data connectors. Every Airbyte connector is a Docker container that accepts standardized inputs and produces standardized outputs. This protocol-based approach enables the connector ecosystem while maintaining platform consistency.
The Airbyte protocol defines three core operations:
# Discovery: What data can this connector provide?
docker run airbyte/source-postgres:latest discover \
  --config config.json

# Check: Can we connect to the data source?
docker run airbyte/source-postgres:latest check \
  --config config.json

# Read: Extract data according to the catalog
docker run airbyte/source-postgres:latest read \
  --config config.json \
  --catalog catalog.json \
  --state state.json
The genius of this protocol is that it separates concerns. The Airbyte platform handles orchestration, monitoring, and state management, while connectors focus solely on data extraction. This separation enables a rich ecosystem of community-contributed connectors.
Airbyte's catalog system is more sophisticated than it initially appears. When you run discovery on a source, you get back a catalog that describes every available data stream:
{
  "streams": [
    {
      "name": "users",
      "json_schema": {
        "type": "object",
        "properties": {
          "id": {"type": "integer"},
          "email": {"type": "string"},
          "created_at": {"type": "string", "format": "date-time"}
        }
      },
      "supported_sync_modes": ["full_refresh", "incremental"],
      "source_defined_cursor": true,
      "default_cursor_field": ["updated_at"],
      "source_defined_primary_key": [["id"]]
    }
  ]
}
This catalog becomes the contract between source and destination. You can selectively enable streams, choose sync modes, and configure cursor fields. The catalog-driven approach means you have fine-grained control over what data gets extracted.
For high-volume sources, this granular control becomes essential. Instead of syncing an entire database, you can sync only the tables and columns you need:
{
  "streams": [
    {
      "stream": {
        "name": "orders",
        "json_schema": {...}
      },
      "config": {
        "sync_mode": "incremental",
        "cursor_field": ["updated_at"],
        "destination_sync_mode": "append_dedup",
        "selected": true
      }
    }
  ]
}
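Programmatically, building such a configured catalog from a discovered one might look like the following sketch (`select_streams` is an illustrative helper, not an Airbyte API):

```python
def select_streams(catalog: dict, wanted: dict) -> list:
    """Turn a discovered catalog into a configured one: keep only the
    streams named in `wanted` (stream name -> sync mode), and refuse
    a sync mode the source doesn't advertise as supported."""
    configured = []
    for stream in catalog["streams"]:
        mode = wanted.get(stream["name"])
        if mode is None:
            continue  # stream not selected; it won't be extracted at all
        if mode not in stream["supported_sync_modes"]:
            raise ValueError(f"{stream['name']} does not support {mode}")
        configured.append({"stream": stream,
                           "config": {"sync_mode": mode, "selected": True}})
    return configured
```

Validating the sync mode against `supported_sync_modes` at configuration time catches mistakes (like requesting incremental from an append-only export) before the first sync runs.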
Airbyte's state management is one of its most critical features, though it's often underappreciated. The state object tracks what data has been synced for incremental connections:
{
  "type": "STREAM",
  "stream": {
    "stream_descriptor": {
      "name": "orders",
      "namespace": "public"
    },
    "stream_state": {
      "cursor": "2024-01-15T14:30:22Z",
      "version": 1
    }
  }
}
This state gets updated continuously during sync runs. If a sync fails halfway through, Airbyte can resume from the last checkpoint rather than starting over. For large datasets, this checkpointing behavior is the difference between a resilient pipeline and one that fails repeatedly on transient issues.
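The checkpointing behavior can be sketched as a generator that interleaves STATE messages with records. This is a simplified model of the protocol's message flow, not Airbyte platform code:

```python
def sync_with_checkpoints(records, state: dict, checkpoint_every: int = 2):
    """Yield Airbyte-style RECORD messages, interleaving a STATE message
    every `checkpoint_every` records plus one at the end. On failure,
    the platform resumes from the last STATE it persisted, not from
    zero."""
    emitted = 0
    for record in records:
        yield {"type": "RECORD", "record": record}
        state["cursor"] = record["updated_at"]  # advance the cursor
        emitted += 1
        if emitted % checkpoint_every == 0:
            yield {"type": "STATE", "state": dict(state)}
    yield {"type": "STATE", "state": dict(state)}  # final checkpoint
```

In production the checkpoint interval is a trade-off: frequent STATE messages mean less re-work after a crash, but each persisted checkpoint has overhead.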
The state management becomes complex when dealing with multiple streams or complex cursor scenarios. Consider a scenario where you're syncing orders and order_items tables:
{
  "type": "GLOBAL",
  "global": {
    "shared_state": {
      "orders": {"cursor": "2024-01-15T14:30:22Z"},
      "order_items": {"cursor": "2024-01-15T14:28:15Z"}
    }
  }
}
Airbyte must coordinate state across multiple streams while handling dependencies between them. This coordination logic is non-trivial and represents significant value in the platform.
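One way to picture the coordination is assembling a global state from per-stream cursors; the `safe_resume_cursor` field here is illustrative, not part of the Airbyte protocol:

```python
def merge_stream_states(per_stream: dict) -> dict:
    """Assemble a GLOBAL state message from per-stream ISO-8601 cursor
    strings. When streams depend on each other (order_items lags
    orders), the conservative resume point for the pair is the minimum
    cursor across both streams."""
    return {
        "type": "GLOBAL",
        "global": {
            "shared_state": {name: {"cursor": c} for name, c in per_stream.items()},
            # illustrative: resume both streams from the laggard's cursor
            "safe_resume_cursor": min(per_stream.values()),
        },
    }
```

Taking the minimum guarantees no dependent record is skipped after a restart, at the cost of re-reading a small window of the faster stream.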
Building custom Airbyte connectors requires understanding the Connector Development Kit (CDK). The CDK provides Python and TypeScript frameworks that handle protocol compliance while letting you focus on data extraction logic.
Here's a simplified example of a custom HTTP API connector:
from airbyte_cdk.sources.abstract_source import AbstractSource
from airbyte_cdk.sources.streams.http.http import HttpStream


class CustomApiStream(HttpStream):
    url_base = "https://api.example.com/"
    primary_key = "id"
    cursor_field = "updated_at"

    def path(self, **kwargs) -> str:
        return "users"

    def parse_response(self, response, **kwargs):
        data = response.json()
        yield from data.get("results", [])

    def get_updated_state(self, current_stream_state, latest_record):
        current_cursor = current_stream_state.get(self.cursor_field, "")
        latest_cursor = latest_record.get(self.cursor_field, "")
        return {self.cursor_field: max(current_cursor, latest_cursor)}


class SourceCustomApi(AbstractSource):
    def check_connection(self, logger, config):
        try:
            # Test API connectivity
            return True, None
        except Exception as e:
            return False, str(e)

    def streams(self, config):
        return [CustomApiStream()]
This example shows how the CDK abstracts away protocol compliance while giving you hooks for the data extraction logic. The CDK handles state management, error recovery, and protocol formatting automatically.
Airbyte's performance characteristics differ significantly from Fivetran's. Because you control the deployment, you can optimize for your specific workloads. The key performance factors include:
Resource allocation controls how much CPU and memory each connector can use:
# docker-compose.yml
version: '3.8'
services:
  airbyte-worker:
    image: airbyte/worker:latest
    environment:
      - JOB_DEFAULT_ENV_MAP={"JAVA_OPTS":"-Xmx3g","RESOURCE_CPU_REQUEST":"2","RESOURCE_MEMORY_REQUEST":"4Gi"}
Parallelization can be configured per connector. Some connectors support parallel stream processing:
{
  "connection_configuration": {
    "parallelism": 4,
    "batch_size": 10000
  }
}
Destination optimization varies by warehouse. For Snowflake destinations, you can configure staging locations and copy strategies:
{
  "destination_configuration": {
    "loading_method": {
      "method": "S3 Staging",
      "s3_bucket_name": "your-staging-bucket",
      "file_name_pattern": "{timestamp}_{part_number}",
      "encryption": "AES256"
    }
  }
}
These optimizations require understanding your specific workload characteristics and warehouse capabilities.
Sometimes managed solutions and open-source platforms can't handle your requirements. Custom connectors become necessary when you're dealing with complex authentication schemes, tricky incremental sync semantics, or error handling that off-the-shelf connectors don't cover.
Modern APIs implement increasingly complex authentication schemes. OAuth 2.0 with PKCE, JWT tokens with refresh logic, and custom signature schemes all require careful implementation.
Here's a robust authentication handler for a custom connector:
import hmac
import hashlib
import time
from typing import Optional, Tuple

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class ApiAuthenticator:
    def __init__(self, api_key: str, api_secret: str):
        self.api_key = api_key
        self.api_secret = api_secret
        self.access_token: Optional[str] = None
        self.token_expires_at: Optional[float] = None
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def _generate_signature(self, method: str, path: str, body: str = "") -> Tuple[str, str]:
        timestamp = str(int(time.time()))
        message = f"{timestamp}{method}{path}{body}"
        signature = hmac.new(
            self.api_secret.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()
        return timestamp, signature

    def _refresh_token(self) -> None:
        timestamp, signature = self._generate_signature("POST", "/auth/token")
        headers = {
            "API-Key": self.api_key,
            "API-Timestamp": timestamp,
            "API-Signature": signature,
            "Content-Type": "application/json"
        }
        response = self.session.post(
            "https://api.example.com/auth/token",
            headers=headers,
            json={"grant_type": "client_credentials"}
        )
        response.raise_for_status()
        data = response.json()
        self.access_token = data["access_token"]
        self.token_expires_at = time.time() + data["expires_in"] - 300  # refresh 5 min early

    def get_authenticated_headers(self) -> dict:
        if not self.access_token or time.time() >= self.token_expires_at:
            self._refresh_token()
        return {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }
This authenticator handles token refresh, request signing, and retry logic—all common requirements for production API integrations.
Implementing reliable incremental sync is more complex than it appears. You need to handle clock skew, late-arriving data, and API pagination consistently:
from datetime import datetime, timedelta
from typing import Iterator, Dict, Any, Optional
import logging


class IncrementalSyncer:
    def __init__(self, authenticator: ApiAuthenticator, lookback_window: int = 300):
        self.auth = authenticator
        self.lookback_window = lookback_window  # seconds
        self.logger = logging.getLogger(__name__)

    def sync_incremental(
        self,
        endpoint: str,
        cursor_field: str,
        last_cursor_value: Optional[str] = None
    ) -> Iterator[Dict[str, Any]]:
        # Apply lookback window to handle late-arriving data
        if last_cursor_value:
            cursor_datetime = datetime.fromisoformat(last_cursor_value.replace('Z', '+00:00'))
            cursor_datetime = cursor_datetime - timedelta(seconds=self.lookback_window)
            start_cursor = cursor_datetime.isoformat()
        else:
            start_cursor = None

        page_token = None
        max_cursor_seen = last_cursor_value

        while True:
            params = {"limit": 1000}
            if start_cursor:
                params["updated_after"] = start_cursor
            if page_token:
                params["page_token"] = page_token

            response = self.auth.session.get(
                f"https://api.example.com/{endpoint}",
                headers=self.auth.get_authenticated_headers(),
                params=params
            )
            response.raise_for_status()

            data = response.json()
            records = data.get("records", [])
            if not records:
                break

            # Track maximum cursor value seen
            for record in records:
                cursor_value = record.get(cursor_field)
                if cursor_value and (not max_cursor_seen or cursor_value > max_cursor_seen):
                    max_cursor_seen = cursor_value
                yield record

            # Handle pagination
            page_token = data.get("next_page_token")
            if not page_token:
                break

        # Update state with maximum cursor value
        if max_cursor_seen:
            self.update_state(cursor_field, max_cursor_seen)

    def update_state(self, cursor_field: str, cursor_value: str) -> None:
        # Implementation depends on your state storage mechanism
        # This could write to a database, file, or external state store
        self.logger.info(f"Updating cursor {cursor_field} to {cursor_value}")
The lookback window handles the common scenario where API responses are eventually consistent. Without this buffer, you might miss records that were updated after your last sync but before the API reflects those changes.
Production connectors need sophisticated error handling. Different types of errors require different responses:
import time
from enum import Enum
from typing import Optional

import requests


class ErrorType(Enum):
    RATE_LIMIT = "rate_limit"
    TEMPORARY = "temporary"
    PERMANENT = "permanent"
    AUTHENTICATION = "authentication"


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.state = "closed"  # closed, open, half_open

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.last_failure_time < self.timeout:
                raise Exception("Circuit breaker is open")
            else:
                self.state = "half_open"

        try:
            result = func(*args, **kwargs)
            if self.state == "half_open":
                self.state = "closed"
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise e


class ErrorHandler:
    def __init__(self):
        self.circuit_breaker = CircuitBreaker()

    def classify_error(self, response: requests.Response) -> ErrorType:
        if response.status_code == 429:
            return ErrorType.RATE_LIMIT
        elif response.status_code in [401, 403]:
            return ErrorType.AUTHENTICATION
        elif response.status_code >= 500:
            return ErrorType.TEMPORARY
        elif response.status_code >= 400:
            return ErrorType.PERMANENT
        else:
            return ErrorType.TEMPORARY

    def handle_error(self, response: requests.Response) -> None:
        error_type = self.classify_error(response)

        if error_type == ErrorType.RATE_LIMIT:
            retry_after = int(response.headers.get("Retry-After", 60))
            time.sleep(retry_after)
        elif error_type == ErrorType.AUTHENTICATION:
            # Force token refresh
            raise Exception("Authentication failed - check credentials")
        elif error_type == ErrorType.PERMANENT:
            raise Exception(f"Permanent error: {response.status_code} {response.text}")
        else:
            # Temporary error - let circuit breaker handle it
            raise Exception(f"Temporary error: {response.status_code}")
This error handling system distinguishes between different failure modes and responds appropriately to each.
Custom connectors often need to integrate with existing Airbyte or Fivetran deployments. For Airbyte integration, you can package your custom connector as a Docker image that implements the Airbyte protocol:
FROM python:3.9-slim
WORKDIR /airbyte/integration_code
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY source_custom_api ./source_custom_api
COPY main.py ./
COPY spec.json ./
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
The main.py file implements the Airbyte protocol:
import sys
import json

from source_custom_api import SourceCustomApi

if __name__ == "__main__":
    source = SourceCustomApi()
    if len(sys.argv) < 2:
        print("Usage: python main.py <command>")
        sys.exit(1)

    command = sys.argv[1]
    if command == "spec":
        print(json.dumps(source.spec()))
    elif command == "check":
        config = json.loads(sys.stdin.read())
        print(json.dumps(source.check(config)))
    elif command == "discover":
        config = json.loads(sys.stdin.read())
        print(json.dumps(source.discover(config)))
    elif command == "read":
        config = json.loads(sys.stdin.read())
        print(json.dumps(source.read(config)))
    else:
        print(f"Unknown command: {command}")
        sys.exit(1)
For Fivetran integration, you can use Fivetran's connector development framework or leverage their REST API connector for simpler use cases.
Regardless of which ingestion approach you choose, performance optimization requires understanding the bottlenecks in your specific pipeline. Data ingestion performance is typically constrained by one of four factors: source API rate limits, network bandwidth, destination write throughput, or transformation complexity.
The first step in optimization is measurement. Each platform provides different observability tools:
Fivetran provides detailed sync logs and performance metrics through their dashboard. Key metrics to monitor include:
-- Query Fivetran's log tables to identify slow syncs
SELECT
    connector_name,
    table_name,
    sync_start,
    sync_end,
    rows_updated_or_inserted as row_count,
    (EXTRACT(EPOCH FROM sync_end - sync_start) / 60) as duration_minutes,
    (rows_updated_or_inserted / EXTRACT(EPOCH FROM sync_end - sync_start)) as rows_per_second
FROM fivetran_log.sync_log
WHERE sync_start >= CURRENT_DATE - 7
ORDER BY duration_minutes DESC
LIMIT 20;
Airbyte exposes metrics through its API and logs. You can build monitoring dashboards using this data:
import requests
from datetime import datetime, timedelta


def get_sync_performance(connection_id: str, days: int = 7) -> list:
    since = datetime.now() - timedelta(days=days)

    # The Airbyte Configuration API uses POST for list endpoints
    response = requests.post(
        "http://localhost:8001/api/v1/jobs/list",
        json={
            "configTypes": ["sync"],
            "configId": connection_id,
            "createdAtStart": since.isoformat(),
            "pagination": {"pageSize": 100}
        }
    )

    jobs = response.json()["jobs"]
    performance_data = []
    for job in jobs:
        if job["job"]["status"] == "succeeded":
            summary = job["job"]["logsSummary"]
            performance_data.append({
                "job_id": job["job"]["id"],
                "duration": job["job"]["updatedAt"] - job["job"]["createdAt"],
                "bytes_synced": summary.get("bytesSynced", 0),
                "records_emitted": summary.get("recordsEmitted", 0)
            })

    return performance_data
API rate limiting is often the primary constraint in data ingestion. Different APIs have different rate limiting schemes:
Token bucket algorithms allow bursts up to a limit, then throttle requests. These work well with batch processing:
import time

import requests


class TokenBucket:
    def __init__(self, rate: float, burst_size: int):
        self.rate = rate  # tokens per second
        self.burst_size = burst_size
        self.tokens = burst_size
        self.last_update = time.time()

    def consume(self, tokens: int = 1) -> bool:
        now = time.time()
        elapsed = now - self.last_update
        self.tokens = min(self.burst_size, self.tokens + elapsed * self.rate)
        self.last_update = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def wait_time(self, tokens: int = 1) -> float:
        if self.tokens >= tokens:
            return 0
        return (tokens - self.tokens) / self.rate


# Usage in connector (auth_headers is assumed to be defined elsewhere)
rate_limiter = TokenBucket(rate=10, burst_size=100)  # 10 requests/sec, burst to 100


def make_api_request(url: str) -> requests.Response:
    if not rate_limiter.consume():
        wait_time = rate_limiter.wait_time()
        time.sleep(wait_time)
    return requests.get(url, headers=auth_headers)
Fixed window rate limits reset at regular intervals. These require careful timing:
class FixedWindowRateLimiter:
    def __init__(self, requests_per_window: int, window_seconds: int):
        self.requests_per_window = requests_per_window
        self.window_seconds = window_seconds
        self.requests_made = 0
        self.window_start = time.time()

    def wait_if_needed(self) -> None:
        current_time = time.time()

        # Reset window if needed
        if current_time - self.window_start >= self.window_seconds:
            self.requests_made = 0
            self.window_start = current_time

        # Wait if we've hit the limit
        if self.requests_made >= self.requests_per_window:
            sleep_time = self.window_seconds - (current_time - self.window_start)
            if sleep_time > 0:
                time.sleep(sleep_time)
            self.requests_made = 0
            self.window_start = time.time()

        self.requests_made += 1
Warehouse write performance varies significantly based on file sizes, compression, and concurrent connections. Each warehouse has optimal patterns:
Snowflake performs best with larger files (100MB+) and fewer concurrent operations:
-- Optimize Snowflake loading with proper staging
COPY INTO raw_data.events
FROM (
    SELECT
        $1:timestamp::timestamp_ntz as event_time,
        $1:user_id::varchar as user_id,
        $1:event_type::varchar as event_type,
        parse_json($1:properties) as properties,
        metadata$filename as source_file,
        metadata$file_row_number as row_number
    FROM @external_stage/events/
)
FILE_FORMAT = (TYPE = 'JSON', COMPRESSION = 'GZIP')
PATTERN = '.*events_[0-9]{8}_[0-9]{6}\.json\.gz'
ON_ERROR = 'SKIP_FILE';
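Outside of SQL, the "fewer, larger files" idea usually means buffering records before handing them to the stage. A minimal sketch, where `flush_fn` stands in for an upload to S3 or an internal stage:

```python
class BatchBuffer:
    """Accumulate serialized records and flush in large chunks, since
    warehouses like Snowflake load one big file far more efficiently
    than many small ones."""

    def __init__(self, flush_fn, max_bytes: int = 100 * 1024 * 1024):
        self.flush_fn = flush_fn        # called with one newline-delimited chunk
        self.max_bytes = max_bytes      # target chunk size (e.g. ~100MB)
        self.buf = []
        self.size = 0

    def add(self, record: bytes) -> None:
        self.buf.append(record)
        self.size += len(record)
        if self.size >= self.max_bytes:
            self.flush()

    def flush(self) -> None:
        if self.buf:
            self.flush_fn(b"\n".join(self.buf))
            self.buf, self.size = [], 0
```

A real implementation would also flush on a time interval so low-volume streams don't sit in the buffer indefinitely, and compress each chunk before upload.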
BigQuery handles streaming inserts differently than batch loads:
from google.cloud import bigquery
from google.cloud.bigquery import LoadJobConfig, WriteDisposition


def optimized_bigquery_load(
    dataset_id: str,
    table_id: str,
    source_uris: list,
    schema: list
) -> None:
    client = bigquery.Client()
    table_ref = client.dataset(dataset_id).table(table_id)

    job_config = LoadJobConfig(
        schema=schema,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=WriteDisposition.WRITE_APPEND,
        # Optimize for throughput
        max_bad_records=1000,
        ignore_unknown_values=True,
        clustering_fields=["event_date", "user_id"]
    )

    load_job = client.load_table_from_uri(
        source_uris,
        table_ref,
        job_config=job_config
    )
    load_job.result()  # Wait for completion
Production data ingestion requires comprehensive monitoring. Key metrics to track include:
import logging
from typing import Dict, Any

from datadog import initialize, statsd


class IngestionMonitor:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(__name__)
        initialize(statsd_host='localhost', statsd_port=8125)

    def record_sync_metrics(self, sync_result: Dict[str, Any]) -> None:
        tags = [
            f"connector:{sync_result['connector_name']}",
            f"table:{sync_result['table_name']}",
            f"status:{sync_result['status']}"
        ]

        # Record throughput
        if sync_result['status'] == 'success':
            statsd.histogram(
                'ingestion.records_per_second',
                sync_result['records_per_second'],
                tags=tags
            )
            statsd.histogram(
                'ingestion.duration_minutes',
                sync_result['duration_minutes'],
                tags=tags
            )

        # Record errors
        if sync_result['status'] == 'error':
            statsd.increment('ingestion.errors', tags=tags)
            self.logger.error(
                f"Sync failed: {sync_result['error_message']}",
                extra={"connector": sync_result['connector_name']}
            )

    def check_data_freshness(self, table_name: str, max_age_hours: int = 25) -> None:
        # Implementation would query the warehouse for data freshness
        # and alert if data is stale
        pass
Alerting should distinguish between different types of failures: transient API errors that retries will absorb, credential or permission problems that need a human, and data-freshness breaches that affect downstream consumers.
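As an illustration, a routing policy might map the failure type and the current failure streak to a response; all names here are invented for the example:

```python
from enum import Enum


class Severity(Enum):
    PAGE = "page"      # wake someone up
    TICKET = "ticket"  # fix during business hours
    IGNORE = "ignore"  # retries will handle it


def route_alert(failure_type: str, consecutive_failures: int) -> Severity:
    """Illustrative routing policy: auth and freshness problems always
    need a human; transient errors escalate only after repeated
    failures, since a single 503 is usually noise."""
    if failure_type in ("authentication", "data_freshness"):
        return Severity.PAGE
    if failure_type == "temporary":
        return Severity.TICKET if consecutive_failures >= 3 else Severity.IGNORE
    return Severity.TICKET
```

The key design choice is suppressing single transient failures: paging on every retryable error trains the on-call rotation to ignore ingestion alerts entirely.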
Let's build a complete data ingestion pipeline that demonstrates all three approaches. We'll ingest data from a fictional e-commerce API that provides order data, customer data, and real-time events.
First, we'll set up a local environment with Airbyte and a destination warehouse:
# Clone Airbyte
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
# Start Airbyte services
docker-compose up -d
# Verify services are running
curl http://localhost:8000/api/v1/health
Set up a local Postgres instance as our destination:
# Start Postgres
docker run -d \
  --name postgres-destination \
  -e POSTGRES_PASSWORD=password \
  -e POSTGRES_DB=warehouse \
  -p 5432:5432 \
  postgres:13

# Connect and create schemas
psql -h localhost -U postgres -d warehouse << EOF
CREATE SCHEMA raw_data;
CREATE SCHEMA analytics;
GRANT ALL ON SCHEMA raw_data TO postgres;
GRANT ALL ON SCHEMA analytics TO postgres;
EOF
Configure an Airbyte connection for a PostgreSQL source (simulating an operational database):
import requests
import json

# Create source configuration
source_config = {
    "sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",  # Postgres
    "connectionConfiguration": {
        "host": "source-postgres",
        "port": 5432,
        "database": "ecommerce",
        "username": "readonly_user",
        "password": "secure_password",
        "schemas": ["public"],
        "ssl": False
    },
    "name": "Ecommerce DB Source"
}

response = requests.post(
    "http://localhost:8001/api/v1/sources/create",
    json=source_config
)
source_id = response.json()["sourceId"]

# Create destination
destination_config = {
    "destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",  # Postgres
    "connectionConfiguration": {
        "host": "postgres-destination",
        "port": 5432,
        "database": "warehouse",
        "username": "postgres",
        "password": "password",
        "schema": "raw_data"
    },
    "name": "Data Warehouse"
}

response = requests.post(
    "http://localhost:8001/api/v1/destinations/create",
    json=destination_config
)
destination_id = response.json()["destinationId"]

# Run discovery
discovery_result = requests.post(
    "http://localhost:8001/api/v1/sources/discover_schema",
    json={"sourceId": source_id}
)
catalog = discovery_result.json()["catalog"]
Create a custom connector for a REST API that provides real-time order events:
# custom_ecommerce_connector.py
import json
import sys
from datetime import datetime
from typing import Dict, Any, Iterator

import requests


class EcommerceApiConnector:
    def __init__(self, config: Dict[str, Any]):
        # .get() with defaults so spec/check can be invoked with an empty config
        self.api_key = config.get("api_key", "")
        self.base_url = config.get("base_url", "https://api.example-ecommerce.com")
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        })
    def spec(self) -> Dict[str, Any]:
        return {
            "documentationUrl": "https://docs.example-ecommerce.com/api",
            "connectionSpecification": {
                "$schema": "http://json-schema.org/draft-07/schema#",
                "title": "E-commerce API Spec",
                "type": "object",
                "required": ["api_key", "base_url"],
                "properties": {
                    "api_key": {
                        "type": "string",
                        "description": "API key for authentication",
                        "airbyte_secret": True
                    },
                    "base_url": {
                        "type": "string",
                        "description": "Base URL for the API",
                        "default": "https://api.example-ecommerce.com"
                    }
                }
            }
        }

    def check(self, config: Dict[str, Any]) -> Dict[str, Any]:
        try:
            connector = EcommerceApiConnector(config)
            response = connector.session.get(f"{connector.base_url}/health")
            response.raise_for_status()
            return {"status": "SUCCEEDED"}
        except Exception as e:
            return {"status": "FAILED", "message": str(e)}
    def discover(self, config: Dict[str, Any]) -> Dict[str, Any]:
        return {
            "streams": [
                {
                    "name": "orders",
                    "json_schema": {
                        "type": "object",
                        "properties": {
                            "id": {"type": "string"},
                            "customer_id": {"type": "string"},
                            "total_amount": {"type": "number"},
                            "status": {"type": "string"},
                            "created_at": {"type": "string", "format": "date-time"},
                            "updated_at": {"type": "string", "format": "date-time"}
                        }
                    },
                    "supported_sync_modes": ["full_refresh", "incremental"],
                    "source_defined_cursor": True,
                    "default_cursor_field": ["updated_at"],
                    "source_defined_primary_key": [["id"]]
                },
                {
                    "name": "order_events",
                    "json_schema": {
                        "type": "object",
                        "properties": {
                            "event_id": {"type": "string"},
                            "order_id": {"type": "string"},
                            "event_type": {"type": "string"},
                            "timestamp": {"type": "string", "format": "date-time"},
                            "properties": {"type": "object"}
                        }
                    },
                    "supported_sync_modes": ["incremental"],
                    "source_defined_cursor": True,
                    "default_cursor_field": ["timestamp"],
                    "source_defined_primary_key": [["event_id"]]
                }
            ]
        }
def read_orders(self, config: Dict[str, Any], state: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
connector = EcommerceApiConnector(config)
cursor_value = state.get("orders", {}).get("updated_at")
params = {"limit": 1000}
if cursor_value:
params["updated_after"] = cursor_value
page = 1
max_cursor_seen = cursor_value
while True:
params["page"] = page
response = connector.session.get(
f"{connector.base_url}/orders",
params=params
)
response.raise_for_status()
data = response.json()
orders = data.get("orders", [])
if not orders:
break
for order in orders:
updated_at = order["updated_at"]
if not max_cursor_seen or updated_at > max_cursor_seen:
max_cursor_seen = updated_at
yield {
"type": "RECORD",
"record": {
"stream": "orders",
"data": order,
"emitted_at": int(datetime.now().timestamp() * 1000)
}
}
if not data.get("has_more", False):
break
page += 1
# Emit final state
if max_cursor_seen:
yield {
"type": "STATE",
"state": {
"data": {
"orders": {"updated_at": max_cursor_seen}
}
}
}
def main():
command = sys.argv[1] if len(sys.argv) > 1 else None
if command == "spec":
connector = EcommerceApiConnector({})
print(json.dumps(connector.spec()))
elif command == "check":
config = json.loads(sys.stdin.read())
connector = EcommerceApiConnector({})
result = connector.check(config)
print(json.dumps(result))
elif command == "discover":
config = json.loads(sys.stdin.read())
connector = EcommerceApiConnector({})
result = connector.discover(config)
print(json.dumps(result))
elif command == "read":
config_and_catalog = json.loads(sys.stdin.read())
config = config_and_catalog["config"]
state = config_and_catalog.get("state", {})
connector = EcommerceApiConnector(config)
for message in connector.read_orders(config, state):
print(json.dumps(message))
if __name__ == "__main__":
main()
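Whatever runs this connector has to consume the newline-delimited messages it prints. A minimal sketch of that consumer side (the message shapes mirror `read_orders` above; the sample payload is invented):

```python
import json

def consume_messages(lines):
    """Split a connector's newline-delimited output into record payloads
    and the latest state checkpoint (the last STATE message wins)."""
    records, state = [], {}
    for line in lines:
        message = json.loads(line)
        if message["type"] == "RECORD":
            records.append(message["record"]["data"])
        elif message["type"] == "STATE":
            state = message["state"]["data"]
    return records, state

# Sample output in the shape read_orders() emits
output = [
    '{"type": "RECORD", "record": {"stream": "orders", "data": {"id": "o1", "updated_at": "2024-01-02T00:00:00"}, "emitted_at": 0}}',
    '{"type": "STATE", "state": {"data": {"orders": {"updated_at": "2024-01-02T00:00:00"}}}}',
]
records, state = consume_messages(output)
```

Persisting only the final state means a crashed sync is replayed from the last checkpoint, which is safe because the records are upserted by primary key.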
Package this as a Docker container:
FROM python:3.9-slim
WORKDIR /airbyte/integration_code
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY custom_ecommerce_connector.py ./
ENTRYPOINT ["python", "custom_ecommerce_connector.py"]
For real-time data ingestion, implement a streaming connector that pushes events directly to the warehouse:
# streaming_processor.py
import asyncio
import json
from datetime import datetime
import aiohttp
import aiohttp.web  # web is a submodule; it must be imported explicitly
import asyncpg
from typing import Dict, Any
class RealTimeEventProcessor:
def __init__(self, webhook_port: int, db_connection_string: str):
self.webhook_port = webhook_port
self.db_connection_string = db_connection_string
self.db_pool = None
async def init_db_pool(self):
self.db_pool = await asyncpg.create_pool(
self.db_connection_string,
min_size=5,
max_size=20
)
async def process_webhook_event(self, request):
try:
event_data = await request.json()
# Validate event structure
required_fields = ["event_id", "event_type", "timestamp", "order_id"]
if not all(field in event_data for field in required_fields):
return aiohttp.web.Response(status=400, text="Missing required fields")
# Enrich event with processing metadata
enriched_event = {
**event_data,
"processed_at": datetime.utcnow().isoformat(),
"source": "webhook"
}
# Insert into warehouse
await self.insert_event(enriched_event)
return aiohttp.web.Response(status=200, text="Event processed")
except Exception as e:
print(f"Error processing webhook: {e}")
return aiohttp.web.Response(status=500, text="Internal error")
async def insert_event(self, event_data: Dict[str, Any]):
query = """
INSERT INTO raw_data.order_events (
event_id, event_type, order_id, timestamp,
properties, processed_at, source
) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (event_id) DO NOTHING
"""
async with self.db_pool.acquire() as connection:
await connection.execute(
query,
event_data["event_id"],
event_data["event_type"],
event_data["order_id"],
event_data["timestamp"],
json.dumps(event_data.get("properties", {})),
event_data["processed_at"],
event_data["source"]
)
async def start_server(self):
await self.init_db_pool()
app = aiohttp.web.Application()
app.router.add_post('/webhook/events', self.process_webhook_event)
runner = aiohttp.web.AppRunner(app)
await runner.setup()
site = aiohttp.web.TCPSite(runner, 'localhost', self.webhook_port)
await site.start()
print(f"Webhook server started on port {self.webhook_port}")
# Usage
async def main():
processor = RealTimeEventProcessor(
webhook_port=8080,
db_connection_string="postgresql://postgres:password@localhost/warehouse"
)
await processor.start_server()
# Keep server running
while True:
await asyncio.sleep(3600)
if __name__ == "__main__":
asyncio.run(main())
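The validate-then-enrich step in `process_webhook_event` is worth isolating so it can be unit-tested without standing up the aiohttp server. A sketch of that extraction (using timezone-aware timestamps instead of the deprecated `utcnow`):

```python
from datetime import datetime, timezone

REQUIRED_FIELDS = ("event_id", "event_type", "timestamp", "order_id")

def validate_and_enrich(event_data: dict):
    """Pure version of the handler's validate/enrich step: returns
    (enriched_event, []) on success or (None, missing_fields) on failure."""
    missing = [f for f in REQUIRED_FIELDS if f not in event_data]
    if missing:
        return None, missing
    enriched = {
        **event_data,
        "processed_at": datetime.now(timezone.utc).isoformat(),
        "source": "webhook",
    }
    return enriched, []

ok, _ = validate_and_enrich({
    "event_id": "e1", "event_type": "order_created",
    "timestamp": "2024-01-01T00:00:00Z", "order_id": "o1",
})
bad, missing_bad = validate_and_enrich({"event_id": "e1"})
```

Keeping this logic pure also makes it trivial to reuse if you later swap the webhook server for a Kafka consumer.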
Implement comprehensive monitoring for your ingestion pipeline:
# monitoring.py
import asyncio
import asyncpg
from datetime import datetime, timedelta
from typing import Any, Dict, List
import logging
class IngestionMonitor:
def __init__(self, db_connection_string: str):
self.db_connection_string = db_connection_string
self.logger = logging.getLogger(__name__)
async def check_data_freshness(self) -> List[Dict[str, Any]]:
"""Check if data in each table is fresh enough"""
freshness_checks = []
        query = """
            SELECT
                schemaname,
                tablename,
                -- pg_stats exposes column names as attname
                MAX(CASE
                    WHEN attname LIKE '%updated_at%' THEN attname
                    WHEN attname LIKE '%created_at%' THEN attname
                    WHEN attname LIKE '%timestamp%' THEN attname
                    ELSE NULL
                END) as timestamp_column
            FROM pg_stats
            WHERE schemaname = 'raw_data'
            GROUP BY schemaname, tablename
            HAVING MAX(CASE
                WHEN attname LIKE '%updated_at%' THEN attname
                WHEN attname LIKE '%created_at%' THEN attname
                WHEN attname LIKE '%timestamp%' THEN attname
                ELSE NULL
            END) IS NOT NULL
        """
conn = await asyncpg.connect(self.db_connection_string)
try:
tables = await conn.fetch(query)
for table in tables:
schema = table['schemaname']
table_name = table['tablename']
timestamp_col = table['timestamp_column']
freshness_query = f"""
SELECT
'{table_name}' as table_name,
MAX({timestamp_col}) as latest_timestamp,
COUNT(*) as total_rows,
COUNT(*) FILTER (WHERE {timestamp_col} > NOW() - INTERVAL '1 hour') as recent_rows
FROM {schema}.{table_name}
"""
result = await conn.fetchrow(freshness_query)
hours_since_update = None
if result['latest_timestamp']:
hours_since_update = (
datetime.now() - result['latest_timestamp'].replace(tzinfo=None)
).total_seconds() / 3600
freshness_checks.append({
'table': table_name,
'latest_timestamp': result['latest_timestamp'],
'hours_since_update': hours_since_update,
'total_rows': result['total_rows'],
'recent_rows': result['recent_rows'],
                    # 0 hours is fresh; only None (no timestamp) means unknown
                    'is_fresh': hours_since_update is not None and hours_since_update < 25
})
finally:
await conn.close()
return freshness_checks
async def check_volume_anomalies(self) -> List[Dict[str, Any]]:
"""Detect significant changes in daily record volumes"""
volume_checks = []
query = """
WITH daily_counts AS (
SELECT
'orders' as table_name,
DATE(created_at) as date,
COUNT(*) as record_count
FROM raw_data.orders
WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY DATE(created_at)
),
volume_stats AS (
SELECT
table_name,
date,
record_count,
AVG(record_count) OVER (
PARTITION BY table_name
ORDER BY date
ROWS BETWEEN 6 PRECEDING AND 1 PRECEDING
) as avg_volume,
STDDEV(record_count) OVER (
PARTITION BY table_name
ORDER BY date
ROWS BETWEEN 6 PRECEDING AND 1 PRECEDING
) as stddev_volume
FROM daily_counts
)
SELECT
table_name,
date,
record_count,
avg_volume,
stddev_volume,
CASE
WHEN stddev_volume > 0 THEN
ABS(record_count - avg_volume) / stddev_volume
ELSE 0
END as z_score
FROM volume_stats
WHERE date = CURRENT_DATE - INTERVAL '1 day'
"""
conn = await asyncpg.connect(self.db_connection_string)
try:
results = await conn.fetch(query)
for result in results:
is_anomaly = result['z_score'] > 2 # 2 standard deviations
volume_checks.append({
'table': result['table_name'],
'date': result['date'],
'record_count': result['record_count'],
'avg_volume': float(result['avg_volume'] or 0),
'z_score': float(result['z_score']),
'is_anomaly': is_anomaly
})
finally:
await conn.close()
return volume_checks
async def run_monitoring_cycle(self):
"""Run complete monitoring cycle"""
self.logger.info("Starting monitoring cycle")
# Check data freshness
freshness_results = await self.check_data_freshness()
stale_tables = [r for r in freshness_results if not r['is_fresh']]
if stale_tables:
self.logger.warning(f"Stale data detected in {len(stale_tables)} tables")
for table in stale_tables:
self.logger.warning(
f"Table {table['table']} last updated "
f"{table['hours_since_update']:.1f} hours ago"
)
# Check volume anomalies
volume_results = await self.check_volume_anomalies()
anomalies = [r for r in volume_results if r['is_anomaly']]
if anomalies:
self.logger.warning(f"Volume anomalies detected in {len(anomalies)} tables")
for anomaly in anomalies:
self.logger.warning(
f"Table {anomaly['table']} had {anomaly['record_count']} records "
f"(Z-score: {anomaly['z_score']:.2f})"
)
return {
'freshness_results': freshness_results,
'volume_results': volume_results,
'alerts': {
'stale_tables': len(stale_tables),
'volume_anomalies': len(anomalies)
}
}
# Usage
async def main():
monitor = IngestionMonitor("postgresql://postgres:password@localhost/warehouse")
while True:
try:
results = await monitor.run_monitoring_cycle()
print(f"Monitoring complete: {results['alerts']}")
await asyncio.sleep(3600) # Check every hour
except Exception as e:
logging.error(f"Monitoring error: {e}")
await asyncio.sleep(300) # Retry in 5 minutes
if __name__ == "__main__":
asyncio.run(main())
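The z-score rule embedded in the volume SQL above is easy to sanity-check in pure Python, using the same formula: distance from the trailing mean, measured in trailing standard deviations:

```python
from statistics import mean, stdev

def volume_z_score(history, today):
    """Z-score of today's count against a trailing window, matching the
    SQL's AVG/STDDEV OVER (... PRECEDING) logic."""
    avg = mean(history)
    sd = stdev(history)  # sample standard deviation, like SQL's STDDEV
    return abs(today - avg) / sd if sd > 0 else 0.0

history = [100, 102, 98, 101, 99, 100]  # six prior days of record counts
typical = volume_z_score(history, 100)   # an ordinary day scores near 0
spike = volume_z_score(history, 250)     # a spike scores far above the 2.0 threshold
```

Testing the threshold logic outside the warehouse makes it much easier to tune before wiring it into alerting.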
Understanding what goes wrong in data ingestion pipelines is as important as knowing how to build them. Here are the most common issues and their solutions:
The Problem: Source systems add or remove fields without warning, breaking downstream processes.
-- This query worked yesterday but fails today
SELECT
order_id,
customer_id,
total_amount,
shipping_method -- Column was removed from source
FROM raw_data.orders
WHERE created_at > '2024-01-01';
-- Error: column "shipping_method" does not exist
The Solution: Check column existence first, then build the query dynamically. A static CASE expression cannot save you here, because column references are resolved when the query is parsed, so any statement that names a missing column fails before it runs:
-- Confirm the column still exists before constructing the query
SELECT column_name
FROM information_schema.columns
WHERE table_schema = 'raw_data'
  AND table_name = 'orders'
  AND column_name = 'shipping_method';
-- If it is missing, generate the SELECT with a literal fallback instead:
-- SELECT order_id, customer_id, total_amount,
--        'unknown' AS shipping_method
-- FROM raw_data.orders;
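In orchestration code the same defensive idea becomes a small query builder: inspect the available columns first, and only reference the ones that exist. A sketch (table and column names are illustrative):

```python
def build_orders_query(available_columns: set) -> str:
    """Build a SELECT that substitutes a literal for any expected
    column the source dropped, instead of failing at parse time."""
    expected = ["order_id", "customer_id", "total_amount", "shipping_method"]
    select_parts = [
        col if col in available_columns else f"'unknown' AS {col}"
        for col in expected
    ]
    return "SELECT " + ", ".join(select_parts) + " FROM raw_data.orders"

# shipping_method was removed from the source
query = build_orders_query({"order_id", "customer_id", "total_amount"})
```

The `available_columns` set would come from the `information_schema.columns` query shown above.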
For Airbyte connectors, enable schema refresh monitoring:
def monitor_schema_changes(connection_id: str):
    # Compare the current catalog with the previously saved version.
    # get_connection_catalog / load_catalog_from_storage are placeholders
    # for your own Airbyte API retrieval and catalog storage helpers.
    current_catalog = get_connection_catalog(connection_id)
    previous_catalog = load_catalog_from_storage(connection_id)
schema_changes = []
for stream in current_catalog['streams']:
stream_name = stream['name']
current_props = stream['json_schema']['properties']
if stream_name in previous_catalog:
previous_props = previous_catalog[stream_name]['properties']
# Detect new fields
new_fields = set(current_props.keys()) - set(previous_props.keys())
removed_fields = set(previous_props.keys()) - set(current_props.keys())
if new_fields or removed_fields:
schema_changes.append({
'stream': stream_name,
'new_fields': list(new_fields),
'removed_fields': list(removed_fields)
})
return schema_changes
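The set-difference at the core of that check can be exercised directly with two hand-made property maps:

```python
def diff_schemas(previous_props: dict, current_props: dict):
    """New and removed field names between two JSON-schema property maps."""
    new_fields = sorted(set(current_props) - set(previous_props))
    removed_fields = sorted(set(previous_props) - set(current_props))
    return new_fields, removed_fields

previous = {"id": {"type": "string"}, "shipping_method": {"type": "string"}}
current = {"id": {"type": "string"}, "discount_code": {"type": "string"}}
new, removed = diff_schemas(previous, current)
```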
The Problem: Incremental syncs miss data due to clock skew, late-arriving records, or cursor field issues.
# Problematic cursor logic
def sync_incremental_naive(last_cursor_value: str):
# This misses records updated between sync runs
params = {"updated_after": last_cursor_value}
response = requests.get("https://api.example.com/orders", params=params)
for record in response.json()["orders"]:
yield record
The Solution: Implement overlapping windows and idempotent processing:
from datetime import datetime, timedelta

import requests

def sync_incremental_robust(last_cursor_value: str, overlap_minutes: int = 5):
    # Apply overlap window to catch late-arriving data
    if last_cursor_value:
        cursor_dt = datetime.fromisoformat(last_cursor_value)
        adjusted_cursor = cursor_dt - timedelta(minutes=overlap_minutes)
        start_cursor = adjusted_cursor.isoformat()
    else:
        start_cursor = None
    # Track seen record IDs to handle duplicates
    seen_records = set()
    # Only send the cursor parameter when one exists
    params = {"limit": 1000}
    if start_cursor:
        params["updated_after"] = start_cursor
while True:
response = requests.get("https://api.example.com/orders", params=params)
orders = response.json()["orders"]
if not orders:
break
for order in orders:
record_id = order["id"]
# Skip duplicates from overlap window
if record_id in seen_records:
continue
seen_records.add(record_id)
yield order
# Handle pagination
next_cursor = response.json().get("next_cursor")
if not next_cursor:
break
params["cursor"] = next_cursor
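The cursor adjustment itself is plain datetime arithmetic and can be checked in isolation (assuming ISO-8601 cursor strings, as in the example API):

```python
from datetime import datetime, timedelta

def overlap_cursor(last_cursor_value: str, overlap_minutes: int = 5) -> str:
    """Rewind the saved cursor so the next sync re-reads a small window
    and catches records committed just after the previous run started."""
    cursor_dt = datetime.fromisoformat(last_cursor_value)
    return (cursor_dt - timedelta(minutes=overlap_minutes)).isoformat()

print(overlap_cursor("2024-01-15T12:00:00"))  # 2024-01-15T11:55:00
```

Because the overlap re-fetches already-seen rows, it must always be paired with deduplication (the `seen_records` set above, or an idempotent upsert in the warehouse).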
The Problem: Hitting API rate limits without proper backoff causes sync failures and potential IP blocking.
# Problematic approach - no rate limiting
def fetch_all_data():
for page in range(1, 1000): # Will likely hit rate limits
response = requests.get(f"https://api.example.com/data?page={page}")
if response.status_code == 429:
print("Rate limited!") # But continues anyway
continue
yield response.json()
The Solution: Implement proper rate limiting with exponential backoff:
import logging
import random
import time
from typing import Optional

import requests
def fetch_with_rate_limiting(url: str, max_retries: int = 5) -> Optional[requests.Response]:
for attempt in range(max_retries):
response = requests.get(url)
if response.status_code == 200:
return response
elif response.status_code == 429:
# Check for Retry-After header
retry_after = response.headers.get('Retry-After')
if retry_after:
sleep_time = int(retry_after)
else:
# Exponential backoff with jitter
sleep_time = (2 ** attempt) + random.uniform(0, 1)
logging.info(f"Rate limited. Sleeping {sleep_time} seconds")
time.sleep(sleep_time)
elif response.status_code >= 500:
# Server error - retry with backoff
sleep_time = (2 ** attempt) + random.uniform(0, 1)
time.sleep(sleep_time)
else:
# Client error - don't retry
response.raise_for_status()
raise Exception(f"Max retries ({max_retries}) exceeded")
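The backoff schedule can be computed and inspected without touching the network. This sketch reproduces the base-2 exponential plus up-to-one-second jitter used above, seeded for reproducibility:

```python
import random

def backoff_delays(max_retries: int = 5, seed: int = 0):
    """Sleep times the retry loop above would use on consecutive 429/5xx
    responses when no Retry-After header is present."""
    rng = random.Random(seed)  # seeded so the schedule is reproducible
    return [(2 ** attempt) + rng.uniform(0, 1) for attempt in range(max_retries)]

delays = backoff_delays()
# Base delays double each attempt: 1, 2, 4, 8, 16 seconds, plus jitter
```

The jitter matters when many workers are retrying at once: without it, all clients wake up together and hammer the API in synchronized waves.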
The Problem: Loading entire datasets into memory causes out-of-memory errors.
# Memory-intensive approach
def sync_large_table():
response = requests.get("https://api.example.com/all_orders")
all_orders = response.json()["orders"] # Could be millions of records
# Process all records in memory
processed_orders = [transform_order(order) for order in all_orders]
# Bulk insert
insert_orders(processed_orders)
The Solution: Use streaming processing with batching:
def sync_large_table_streaming(batch_size: int = 1000):
batch = []
for order in fetch_orders_streaming(): # Generator function
batch.append(transform_order(order))
if len(batch) >= batch_size:
insert_orders_batch(batch)
batch = []
# Insert remaining records
if batch:
insert_orders_batch(batch)
def fetch_orders_streaming():
"""Generator that yields orders one at a time"""
page = 1
while True:
response = requests.get(
f"https://api.example.com/orders?page={page}&limit=100"
)
orders = response.json()["orders"]
if not orders:
break
for order in orders:
yield order
page += 1
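The batching loop is easy to verify with an in-memory iterable standing in for the API:

```python
def batched(items, batch_size):
    """Yield lists of at most batch_size items, like the sync loop above."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch

batches = list(batched(range(10), batch_size=3))
print([len(b) for b in batches])  # [3, 3, 3, 1]
```

Memory use is now bounded by `batch_size` rather than by the total dataset, which is the whole point of the streaming rewrite.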
The Problem: Opening new database connections for each batch insert creates connection overhead and potential connection exhaustion.
# Inefficient connection handling
def insert_batch_inefficient(records: list):
# New connection for each batch
conn = psycopg2.connect(connection_string)
cursor = conn.cursor()
for record in records:
cursor.execute(insert_query, record)
conn.commit()
conn.close() # Connection overhead on each batch
The Solution: Use connection pooling and prepared statements:
import threading

import psycopg2
import psycopg2.extras
import psycopg2.pool
class ConnectionManager:
_instance = None
_lock = threading.Lock()
def __new__(cls, connection_string: str, min_conn: int = 5, max_conn: int = 20):
if not cls._instance:
with cls._lock:
if not cls._instance:
cls._instance = super().__new__(cls)
cls._instance.pool = psycopg2.pool.ThreadedConnectionPool(
min_conn, max_conn, connection_string
)
return cls._instance
def get_connection(self):
return self.pool.getconn()
def put_connection(self, conn):
self.pool.putconn(conn)
def insert_batch_efficient(records: list):
conn_manager = ConnectionManager(connection_string)
conn = conn_manager.get_connection()
try:
cursor = conn.cursor()
# Use bulk insert for better performance
insert_query = """
INSERT INTO raw_data.orders (id, customer_id, total_amount, created_at)
VALUES %s
ON CONFLICT (id) DO UPDATE SET
total_amount = EXCLUDED.total_amount,
updated_at = CURRENT_TIMESTAMP
"""
psycopg2.extras.execute_values(
cursor, insert_query, records, template=None, page_size=1000
)
conn.commit()
finally:
conn_manager.put_connection(conn)
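The double-checked locking in ConnectionManager can be verified with a stub pool, no database required:

```python
import threading

class StubPoolManager:
    """Same double-checked-locking shape as ConnectionManager above,
    with the psycopg2 pool swapped for a plain dict."""
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, connection_string: str):
        if cls._instance is None:
            with cls._lock:  # only one thread builds the pool
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance.pool = {"dsn": connection_string}
        return cls._instance

a = StubPoolManager("postgresql://localhost/warehouse")
b = StubPoolManager("postgresql://other-host/warehouse")
print(a is b)  # True: the second call returns the existing instance
```

Note the trade-off this demonstrates: once the singleton exists, later connection strings are silently ignored, so the pattern only suits processes that talk to a single warehouse.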
Data ingestion is the foundation of modern data platforms, but the choice between Fivetran, Airbyte, and custom connectors isn't binary—it's architectural. Successful data teams understand that different data sources have different requirements and build hybrid systems that use the right approach for each use case.