Wicked Smart Data
Building a Complete Modern Data Stack from Scratch

Data Engineering · Expert · 26 min read · Apr 24, 2026 (updated Apr 24, 2026)
Table of Contents
  • Prerequisites
  • Architecture Design and Technology Selection
  • Infrastructure Foundation with Terraform
  • Real-Time Data Ingestion with Kafka
  • Batch Data Integration with Airbyte
  • Data Transformation with dbt
  • Real-Time Analytics with Snowflake
  • Data Governance and Quality Monitoring
  • Workflow Orchestration with Airflow

You're standing in the conference room, whiteboard covered in boxes and arrows, as your CEO asks the question that makes every data engineer's pulse quicken: "How do we build a data platform that can handle everything we throw at it for the next five years?" The marketing team wants real-time customer journey analytics. Finance needs daily P&L reports with zero downtime. Product wants to A/B test features with millisecond precision. Engineering wants to ship ML models that actually work in production.

The modern data stack isn't just about choosing the right tools—it's about architecting a system that can evolve from handling gigabytes to petabytes while maintaining reliability, governance, and cost efficiency. This lesson will guide you through building a production-ready data platform from the ground up, making architectural decisions that matter, and implementing patterns that scale.

By the end of this deep dive, you'll have hands-on experience with every layer of a modern data stack and understand the subtle engineering decisions that separate functional systems from exceptional ones.

What you'll learn:

  • Design and implement a cloud-native data architecture that handles streaming and batch workloads
  • Build robust data ingestion pipelines with proper error handling, monitoring, and backpressure management
  • Implement data transformation workflows using dbt with advanced testing, documentation, and deployment strategies
  • Set up real-time analytics with proper partitioning, indexing, and query optimization
  • Establish data governance, lineage, and quality monitoring across the entire pipeline
  • Deploy infrastructure as code with proper secrets management and environment promotion
  • Design cost optimization strategies that scale with data volume

Prerequisites

This is an expert-level lesson requiring solid experience with:

  • SQL and Python programming
  • Cloud platforms (AWS/GCP/Azure fundamentals)
  • Basic understanding of data warehousing concepts
  • Command-line proficiency and Git workflows
  • Docker containerization principles

Architecture Design and Technology Selection

Before writing a single line of code, we need to make foundational architectural decisions. The modern data stack typically follows a layered architecture with these core layers:

Ingestion Layer: Handles data collection from various sources with proper schema evolution and error recovery. We'll use Apache Kafka for streaming data and Airbyte for batch extraction, chosen for their robust connector ecosystems and operational maturity.

Storage Layer: Raw data lands in object storage (S3), with processed data in a cloud data warehouse. We're selecting Snowflake for its separation of compute and storage, automatic scaling, and mature ecosystem integration.

Transformation Layer: dbt handles all data transformations, chosen for its software engineering best practices, testing framework, and deployment capabilities.

Serving Layer: Multiple consumption patterns require different tools. We'll implement Metabase for self-service analytics, Apache Superset for advanced visualization, and direct SQL access for data scientists.

Orchestration Layer: Airflow manages workflow dependencies and monitoring, selected for its flexibility and extensive operator library.

Here's our target architecture with realistic data flows:

# architecture-overview.yaml
ingestion:
  streaming:
    - kafka_cluster: 3_brokers_multi_az
    - topics: 
        user_events: 10_partitions
        transaction_events: 20_partitions
        system_metrics: 5_partitions
  batch:
    - airbyte_connectors:
        salesforce: daily_full_refresh
        postgres_replica: hourly_incremental
        stripe_api: hourly_incremental

storage:
  raw_layer: s3://company-data-lake/raw/
  processed_layer: snowflake.analytics_db
  
transformation:
  tool: dbt_cloud
  models: 200+_tables
  tests: 500+_data_quality_checks
  
serving:
  dashboards: metabase_cloud
  adhoc_queries: snowflake_worksheets
  ml_features: feature_store_api

The critical architectural decision here is embracing the ELT pattern over ETL. Raw data lands in the lake with minimal transformation, then gets processed in the warehouse. This provides maximum flexibility for future use cases and takes advantage of modern compute elasticity.
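To make the ELT split concrete, here's a minimal sketch with in-memory stand-ins for the lake and the warehouse (all names invented for illustration): the load step persists payloads untouched, and the transform step runs later against that raw copy, so new transformations can always be replayed from source.

```python
import json
from datetime import datetime, timezone

# Illustrative stand-ins for the raw S3 layer and the warehouse.
raw_lake: list[str] = []
warehouse: list[dict] = []

def land_raw(event: dict) -> None:
    """EL step: persist the payload byte-for-byte, no transformation."""
    raw_lake.append(json.dumps(event))

def transform() -> None:
    """T step: runs later, in the warehouse, against untouched raw data."""
    warehouse.clear()
    for line in raw_lake:
        event = json.loads(line)
        warehouse.append({
            "user_id": event["user_id"],
            "event_type": event["event_type"].lower().strip(),
            "loaded_at": datetime.now(timezone.utc).isoformat(),
        })

land_raw({"user_id": "u1", "event_type": " Click "})
transform()
print(warehouse[0]["event_type"])  # click
```

Because the raw record is preserved verbatim, a bug in `transform()` costs a re-run, not a re-ingestion.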

Infrastructure Foundation with Terraform

Modern data platforms require infrastructure as code for repeatability and disaster recovery. We'll build our foundation using Terraform with proper state management and secrets handling.

First, let's establish our Terraform structure with environment separation:

# terraform/environments/prod/main.tf
terraform {
  required_version = ">= 1.0"
  backend "s3" {
    bucket         = "company-terraform-state"
    key            = "data-platform/prod/terraform.tfstate"
    region         = "us-west-2"
    encrypt        = true
    dynamodb_table = "terraform-state-lock"
  }
  
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    snowflake = {
      source  = "Snowflake-Labs/snowflake"
      version = "~> 0.87"
    }
  }
}

module "data_platform" {
  source = "../../modules/data-platform"
  
  environment = "prod"
  
  # Kafka configuration
  kafka_instance_type = "kafka.m5.large"
  kafka_broker_count  = 3
  kafka_storage_size  = 1000
  
  # Snowflake configuration  
  snowflake_warehouse_size = "X-LARGE"
  snowflake_auto_suspend   = 60
  snowflake_auto_resume    = true
  
  # S3 configuration
  data_lake_retention_days = 2555  # 7 years
  
  tags = {
    Environment = "prod"
    Platform    = "data"
    Owner       = "data-engineering"
  }
}

The core infrastructure module handles the complex networking and security requirements:

# terraform/modules/data-platform/kafka.tf
resource "aws_msk_cluster" "data_platform" {
  cluster_name           = "${var.environment}-data-kafka"
  kafka_version          = "2.8.1"
  number_of_broker_nodes = var.kafka_broker_count

  broker_node_group_info {
    instance_type   = var.kafka_instance_type
    client_subnets  = aws_subnet.private[*].id
    security_groups = [aws_security_group.kafka.id]

    # AWS provider v5 replaced ebs_volume_size with a storage_info block
    storage_info {
      ebs_storage_info {
        volume_size = var.kafka_storage_size
      }
    }
  }

  configuration_info {
    arn      = aws_msk_configuration.kafka_config.arn
    revision = aws_msk_configuration.kafka_config.latest_revision
  }

  encryption_info {
    encryption_in_transit {
      client_broker = "TLS"
      in_cluster    = true
    }
    encryption_at_rest_kms_key_arn = aws_kms_key.kafka.arn
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = true
        log_group = aws_cloudwatch_log_group.kafka.name
      }
    }
  }

  tags = var.tags
}

resource "aws_msk_configuration" "kafka_config" {
  kafka_versions = ["2.8.1"]
  name           = "${var.environment}-kafka-config"

  server_properties = <<PROPERTIES
auto.create.topics.enable=false
delete.topic.enable=true
default.replication.factor=3
min.insync.replicas=2
num.partitions=10
log.retention.hours=168
log.segment.bytes=104857600
log.retention.check.interval.ms=300000
compression.type=snappy
PROPERTIES
}

Critical infrastructure considerations include:

Security: All data in transit uses TLS encryption. Data at rest uses customer-managed KMS keys with proper key rotation. Network access follows least-privilege with security groups restricting traffic to necessary ports.

High Availability: Kafka brokers span multiple availability zones. Snowflake provides built-in redundancy. S3 offers 99.999999999% durability.

Monitoring: CloudWatch integration captures infrastructure metrics. Custom alarms trigger on broker failures, unusual traffic patterns, and storage thresholds.

Cost Optimization: Auto-suspend Snowflake warehouses after 60 seconds of inactivity. S3 lifecycle policies move older data to cheaper storage classes.
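The auto-suspend savings are easy to estimate, assuming Snowflake's per-second billing with a 60-second minimum each time a warehouse resumes and 16 credits/hour for an X-Large warehouse (the estimator itself is an invented helper, not a Snowflake API):

```python
def warehouse_credits(run_seconds: list[int], credits_per_hour: float = 16.0) -> float:
    """Estimate credits consumed across a series of warehouse resumes.

    Assumes per-second billing with a 60-second minimum per resume;
    16 credits/hour corresponds to an X-Large warehouse.
    """
    billed = sum(max(s, 60) for s in run_seconds)
    return billed / 3600 * credits_per_hour

# Ten 5-second dashboard queries, each waking a suspended warehouse,
# bill the 60-second minimum every time (600 billed seconds), while
# the same work batched into one 50-second resume bills only 60:
bursty = warehouse_credits([5] * 10)
batched = warehouse_credits([50])
print(round(bursty, 2), round(batched, 2))
```

This is why the 60-second auto-suspend setting pairs well with batching query workloads: frequent tiny resumes pay the minimum over and over.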

Real-Time Data Ingestion with Kafka

Streaming data ingestion requires careful consideration of throughput, latency, and fault tolerance. Our Kafka implementation handles multiple data sources with different characteristics.

Let's implement a robust producer for user event data:

# kafka_producers/user_events.py
import json
import logging
from typing import Dict, Any, Optional
from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError
from datetime import datetime
import hashlib

class UserEventProducer:
    def __init__(self, bootstrap_servers: str, 
                 security_protocol: str = "SSL",
                 compression_type: str = "snappy"):
        self.logger = logging.getLogger(__name__)
        
        producer_config = {
            'bootstrap_servers': bootstrap_servers,
            'security_protocol': security_protocol,
            'compression_type': compression_type,
            'acks': 'all',  # Wait for all replicas
            'retries': 10,
            'retry_backoff_ms': 300,
            'batch_size': 16384,
            'linger_ms': 5,  # Small batching delay
            'buffer_memory': 33554432,
            'max_in_flight_requests_per_connection': 1,  # preserve ordering across retries (kafka-python has no idempotent producer)
            'value_serializer': lambda x: json.dumps(x).encode('utf-8'),
            'key_serializer': lambda x: x.encode('utf-8') if x else None
        }
        
        self.producer = KafkaProducer(**producer_config)
        self.topic = "user_events"
        
    def produce_event(self, user_id: str, event_type: str, 
                     event_data: Dict[str, Any]) -> bool:
        """
        Produces a user event with proper partitioning and error handling
        """
        try:
            # Create standardized event payload
            event_payload = {
                'user_id': user_id,
                'event_type': event_type,
                'event_data': event_data,
                'timestamp': datetime.utcnow().isoformat(),
                'schema_version': '1.0'
            }
            
            # Partition by user_id hash for even distribution
            # while maintaining user session ordering
            partition_key = self._get_partition_key(user_id)
            
            future = self.producer.send(
                topic=self.topic,
                key=partition_key,
                value=event_payload,
                partition=None  # Let Kafka choose based on key
            )
            
            # Non-blocking send with callback
            future.add_callback(self._on_send_success)
            future.add_errback(self._on_send_error)
            
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to produce event: {e}")
            return False
    
    def _get_partition_key(self, user_id: str) -> str:
        """Generate consistent partition key from user_id"""
        return hashlib.md5(user_id.encode()).hexdigest()[:8]
    
    def _on_send_success(self, record_metadata):
        self.logger.debug(
            f"Message sent to {record_metadata.topic} "
            f"partition {record_metadata.partition} "
            f"offset {record_metadata.offset}"
        )
    
    def _on_send_error(self, exception):
        self.logger.error(f"Failed to send message: {exception}")
        # Implement dead letter queue logic here
        
    def flush_and_close(self):
        """Ensure all messages are sent before closing"""
        self.producer.flush(timeout=30)
        self.producer.close(timeout=30)

For high-volume transaction data, we need a more sophisticated consumer with proper offset management and backpressure handling:

# kafka_consumers/transaction_processor.py
from kafka import KafkaConsumer, TopicPartition
from kafka.errors import CommitFailedError
import json
import logging
from typing import Dict, List
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class TransactionProcessor:
    def __init__(self, bootstrap_servers: str, consumer_group: str = "transaction_processor"):
        self.logger = logging.getLogger(__name__)
        self.consumer_group = consumer_group
        
        consumer_config = {
            'bootstrap_servers': bootstrap_servers,
            'group_id': consumer_group,
            'security_protocol': 'SSL',
            'auto_offset_reset': 'earliest',
            'enable_auto_commit': False,  # Manual offset management
            'max_poll_records': 500,  # Batch processing
            'session_timeout_ms': 30000,
            'heartbeat_interval_ms': 10000,
            'value_deserializer': lambda x: json.loads(x.decode('utf-8')),
            'consumer_timeout_ms': 10000
        }
        
        self.consumer = KafkaConsumer('transaction_events', **consumer_config)
        self.executor = ThreadPoolExecutor(max_workers=4)
        self.processing_times = []
        
    def process_batch(self, messages: List) -> bool:
        """Process a batch of transaction messages with parallelization"""
        start_time = time.time()
        
        try:
            # Group messages by partition for ordered processing
            partition_groups = {}
            for message in messages:
                partition = message.partition
                if partition not in partition_groups:
                    partition_groups[partition] = []
                partition_groups[partition].append(message)
            
            # Process partitions in parallel
            futures = []
            for partition, partition_messages in partition_groups.items():
                future = self.executor.submit(
                    self._process_partition_messages, 
                    partition_messages
                )
                futures.append(future)
            
            # Wait for all partitions to complete
            success_count = 0
            for future in as_completed(futures):
                if future.result():
                    success_count += 1
            
            processing_time = time.time() - start_time
            self.processing_times.append(processing_time)
            
            # Log performance metrics
            if len(self.processing_times) >= 100:
                avg_time = sum(self.processing_times) / len(self.processing_times)
                self.logger.info(f"Average batch processing time: {avg_time:.2f}s")
                self.processing_times = []
            
            return success_count == len(partition_groups)
            
        except Exception as e:
            self.logger.error(f"Batch processing failed: {e}")
            return False
    
    def _process_partition_messages(self, messages: List) -> bool:
        """Process messages from a single partition in order"""
        try:
            for message in messages:
                transaction_data = message.value
                
                # Validate message schema
                if not self._validate_transaction_schema(transaction_data):
                    self.logger.error(f"Invalid schema: {message.offset}")
                    continue
                
                # Process individual transaction
                success = self._process_transaction(transaction_data)
                if not success:
                    self.logger.error(f"Processing failed: {message.offset}")
                    return False
                    
            return True
            
        except Exception as e:
            self.logger.error(f"Partition processing failed: {e}")
            return False
    
    def _validate_transaction_schema(self, data: Dict) -> bool:
        """Validate transaction data against expected schema"""
        required_fields = ['transaction_id', 'user_id', 'amount', 'currency', 'timestamp']
        return all(field in data for field in required_fields)
    
    def _process_transaction(self, transaction: Dict) -> bool:
        """Process individual transaction with fraud detection and enrichment"""
        try:
            # Fraud detection logic
            if self._detect_fraud(transaction):
                self.logger.warning(f"Fraudulent transaction: {transaction['transaction_id']}")
                # Send to fraud queue for manual review
                return True  # Don't fail batch for fraud detection
            
            # Enrich transaction data
            enriched_transaction = self._enrich_transaction(transaction)
            
            # Write to data lake
            success = self._write_to_data_lake(enriched_transaction)
            
            return success
            
        except Exception as e:
            self.logger.error(f"Transaction processing error: {e}")
            return False
    
    def run(self):
        """Main consumer loop with proper error handling"""
        self.logger.info("Starting transaction processor")
        
        try:
            while True:
                message_batch = self.consumer.poll(timeout_ms=10000)
                
                if not message_batch:
                    continue
                
                # Flatten messages from all partitions
                all_messages = []
                for partition_messages in message_batch.values():
                    all_messages.extend(partition_messages)
                
                if not all_messages:
                    continue
                
                self.logger.info(f"Processing batch of {len(all_messages)} messages")
                
                success = self.process_batch(all_messages)
                
                if success:
                    try:
                        self.consumer.commit()
                        self.logger.debug("Offset committed successfully")
                    except CommitFailedError as e:
                        self.logger.error(f"Offset commit failed: {e}")
                        # Don't break - consumer will reprocess messages
                else:
                    self.logger.error("Batch processing failed, not committing offsets")
                    
        except KeyboardInterrupt:
            self.logger.info("Shutdown signal received")
        except Exception as e:
            self.logger.error(f"Consumer error: {e}")
        finally:
            self.consumer.close()
            self.executor.shutdown(wait=True)

Batch Data Integration with Airbyte

While Kafka handles streaming data, batch sources require different tooling. Airbyte provides a robust framework for extract-and-load operations with proper error handling and incremental sync capabilities.

Let's configure Airbyte connectors for our key data sources:

# airbyte_configs/salesforce_connection.yaml
apiVersion: airbyte.com/v1alpha1
kind: SourceDefinition
metadata:
  name: salesforce-prod
spec:
  sourceDefinitionId: "b117307c-14b6-41aa-9422-947e34922962"
  dockerRepository: "airbyte/source-salesforce"
  dockerImageTag: "2.0.12"
  documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
  connectionSpecification:
    type: object
    properties:
      client_id:
        type: string
        description: "Salesforce client ID"
        airbyte_secret: true
      client_secret:
        type: string
        description: "Salesforce client secret"  
        airbyte_secret: true
      refresh_token:
        type: string
        description: "Salesforce refresh token"
        airbyte_secret: true
      domain:
        type: string
        description: "Salesforce domain"
        default: "login"
      is_sandbox:
        type: boolean
        description: "Whether to use Salesforce sandbox"
        default: false
      start_date:
        type: string
        description: "Start date for incremental sync"
        format: date-time
        pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$"
---
apiVersion: airbyte.com/v1alpha1  
kind: Connection
metadata:
  name: salesforce-to-s3
spec:
  sourceId: "salesforce-prod"
  destinationId: "s3-data-lake"
  syncCatalog:
    streams:
      - stream:
          name: "Account"
          jsonSchema:
            type: object
            properties:
              Id: { type: string }
              Name: { type: string }
              Industry: { type: string }
              AnnualRevenue: { type: number }
              CreatedDate: { type: string, format: date-time }
              LastModifiedDate: { type: string, format: date-time }
        config:
          syncMode: "incremental"
          cursorField: ["LastModifiedDate"]
          destinationSyncMode: "append_dedup"
          primaryKey: [["Id"]]
      - stream:
          name: "Opportunity"  
        config:
          syncMode: "incremental"
          cursorField: ["LastModifiedDate"]
          destinationSyncMode: "append_dedup"
          primaryKey: [["Id"]]
  schedule:
    timeUnit: "hours"
    units: 6
  prefix: "salesforce"
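The `append_dedup` destination sync mode is worth unpacking: every sync appends new row versions, and the destination keeps only the latest version per primary key, ordered by the cursor field. A minimal sketch of those semantics (the helper and sample rows are invented for illustration):

```python
def dedupe_append(records: list[dict], primary_key: str = "Id",
                  cursor_field: str = "LastModifiedDate") -> list[dict]:
    """Sketch of what destinationSyncMode: append_dedup implies: of all
    appended versions of a row, keep the one with the greatest cursor
    value. ISO-8601 timestamps compare correctly as strings."""
    latest: dict[str, dict] = {}
    for rec in records:
        key = rec[primary_key]
        if key not in latest or rec[cursor_field] > latest[key][cursor_field]:
            latest[key] = rec
    return list(latest.values())

rows = [
    {"Id": "001", "Name": "Acme", "LastModifiedDate": "2026-04-01T00:00:00Z"},
    {"Id": "001", "Name": "Acme Corp", "LastModifiedDate": "2026-04-20T00:00:00Z"},
    {"Id": "002", "Name": "Globex", "LastModifiedDate": "2026-04-10T00:00:00Z"},
]
print(dedupe_append(rows))
```

This is why declaring the correct `primaryKey` and `cursorField` in the connection matters: with the wrong cursor, an older row version can shadow a newer one.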

For database sources, we need careful handling of incremental extraction:

# custom_connectors/postgres_extractor.py
import psycopg2
import pandas as pd
import boto3
import logging
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
import json

class PostgresExtractor:
    def __init__(self, connection_params: Dict[str, Any], 
                 s3_bucket: str, s3_prefix: str):
        self.connection_params = connection_params
        self.s3_bucket = s3_bucket
        self.s3_prefix = s3_prefix
        self.s3_client = boto3.client('s3')
        self.logger = logging.getLogger(__name__)
        
    def extract_incremental(self, table_name: str, 
                          cursor_field: str,
                          last_cursor_value: Optional[str] = None,
                          chunk_size: int = 10000) -> str:
        """
        Extract data incrementally with proper cursor management
        """
        try:
            conn = psycopg2.connect(**self.connection_params)
            
            # Get table metadata
            table_info = self._get_table_info(conn, table_name)
            
            # Determine extraction query. Identifiers cannot be bound as
            # query parameters, so table_name and cursor_field must come
            # from trusted configuration, never from user input.
            if last_cursor_value:
                query = f"""
                SELECT * FROM {table_name} 
                WHERE {cursor_field} > %s 
                ORDER BY {cursor_field}
                """
                params = (last_cursor_value,)
            else:
                # Initial full extraction
                query = f"""
                SELECT * FROM {table_name} 
                ORDER BY {cursor_field}
                """
                params = ()
            
            self.logger.info(f"Extracting {table_name} with cursor: {last_cursor_value}")
            
            # Process in chunks to handle large tables
            chunk_number = 0
            total_rows = 0
            max_cursor_value = last_cursor_value
            
            with conn.cursor(name=f'{table_name}_cursor') as cursor:
                cursor.execute(query, params)
                
                while True:
                    rows = cursor.fetchmany(chunk_size)
                    if not rows:
                        break
                    
                    # Convert to DataFrame for easier manipulation
                    columns = [desc[0] for desc in cursor.description]
                    df = pd.DataFrame(rows, columns=columns)
                    
                    # Track maximum cursor value for next extraction,
                    # normalized to string so comparisons are type-consistent
                    if cursor_field in df.columns:
                        chunk_max_cursor = str(df[cursor_field].max())
                        if max_cursor_value is None or chunk_max_cursor > max_cursor_value:
                            max_cursor_value = chunk_max_cursor
                    
                    # Write chunk to S3
                    self._write_chunk_to_s3(table_name, df, chunk_number)
                    
                    chunk_number += 1
                    total_rows += len(df)
                    
                    self.logger.info(f"Processed chunk {chunk_number}: {len(df)} rows")
            
            # Update cursor state
            self._update_cursor_state(table_name, max_cursor_value)
            
            self.logger.info(f"Extraction complete: {total_rows} rows, cursor: {max_cursor_value}")
            return max_cursor_value
            
        except Exception as e:
            self.logger.error(f"Extraction failed for {table_name}: {e}")
            raise
        finally:
            if 'conn' in locals():
                conn.close()
    
    def _get_table_info(self, conn, table_name: str) -> Dict:
        """Get table metadata for validation and optimization"""
        with conn.cursor() as cursor:
            cursor.execute("""
                SELECT column_name, data_type, is_nullable
                FROM information_schema.columns 
                WHERE table_name = %s
                ORDER BY ordinal_position
            """, (table_name,))
            
            columns = cursor.fetchall()
            return {
                'columns': [{'name': col[0], 'type': col[1], 'nullable': col[2]} 
                           for col in columns],
                'column_count': len(columns)
            }
    
    def _write_chunk_to_s3(self, table_name: str, df: pd.DataFrame, chunk_number: int):
        """Write DataFrame chunk to S3 in Parquet format"""
        timestamp = datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%S")
        s3_key = f"{self.s3_prefix}/{table_name}/year={timestamp[:4]}/month={timestamp[5:7]}/day={timestamp[8:10]}/{table_name}_{timestamp}_{chunk_number:06d}.parquet"
        
        try:
            # Convert DataFrame to Parquet bytes (path=None returns bytes)
            parquet_buffer = df.to_parquet(path=None, index=False, engine='pyarrow')
            
            # Upload to S3
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=s3_key,
                Body=parquet_buffer,
                ContentType='application/octet-stream',
                Metadata={
                    'table_name': table_name,
                    'chunk_number': str(chunk_number),
                    'row_count': str(len(df)),
                    'extraction_timestamp': timestamp
                }
            )
            
            self.logger.debug(f"Chunk written to s3://{self.s3_bucket}/{s3_key}")
            
        except Exception as e:
            self.logger.error(f"Failed to write chunk to S3: {e}")
            raise
    
    def _update_cursor_state(self, table_name: str, cursor_value: str):
        """Update cursor state in S3 for next incremental extraction"""
        state_key = f"{self.s3_prefix}/_airbyte_state/{table_name}_state.json"
        
        state_data = {
            'cursor_field': 'updated_at',  # Adjust based on your schema
            'cursor_value': cursor_value,
            'last_updated': datetime.utcnow().isoformat()
        }
        
        try:
            self.s3_client.put_object(
                Bucket=self.s3_bucket,
                Key=state_key,
                Body=json.dumps(state_data),
                ContentType='application/json'
            )
        except Exception as e:
            self.logger.error(f"Failed to update cursor state: {e}")
            raise
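One detail the extractor relies on: cursor values are compared as strings. That is safe for ISO-8601 timestamps because the format is fixed-width and zero-padded, so lexicographic order matches chronological order:

```python
from datetime import datetime

# Lexicographic sort of ISO-8601 strings equals chronological sort,
# which is why string comparison of cursor values is sound here.
stamps = ["2026-04-24T09:05:00Z", "2026-04-24T10:00:00Z", "2025-12-31T23:59:59Z"]
as_strings = sorted(stamps)
as_datetimes = sorted(stamps, key=lambda s: datetime.strptime(s, "%Y-%m-%dT%H:%M:%SZ"))
assert as_strings == as_datetimes
print(as_strings[-1])  # 2026-04-24T10:00:00Z
```

The guarantee breaks for non-padded or mixed formats (for example, epoch integers stored as strings of varying length), so normalize cursor columns before adopting string comparison.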

Data Transformation with dbt

dbt transforms raw data into analytics-ready models using SQL and software engineering best practices. Our implementation includes comprehensive testing, documentation, and deployment strategies.

First, let's establish our dbt project structure with proper environment configuration:

# dbt_project.yml
name: 'company_analytics'
version: '1.0.0'
config-version: 2

profile: 'company_analytics'

model-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  company_analytics:
    staging:
      +materialized: view
      +docs:
        node_color: "#F7A343"
    intermediate:
      +materialized: ephemeral
      +docs:
        node_color: "#B565A7"  
    marts:
      +materialized: table
      +docs:
        node_color: "#009639"
      finance:
        +materialized: table
        +post-hook: "grant select on {{ this }} to role analyst"
      marketing:
        +materialized: incremental
        +unique_key: user_id
        +on_schema_change: "append_new_columns"

vars:
  # dbt_utils configurations
  surrogate_key_treat_nulls_as_empty_strings: true
  
  # Date configurations  
  start_date: '2020-01-01'
  
  # Feature flags
  enable_customer_360: true
  enable_real_time_features: false

snapshots:
  company_analytics:
    +target_schema: snapshots
    +strategy: timestamp
    +updated_at: updated_at

Our staging models clean and standardize raw data:

-- models/staging/salesforce/stg_salesforce__accounts.sql
{{ config(
    materialized='view',
    docs={'node_color': '#F7A343'}
) }}

with source as (
    select * from {{ source('salesforce', 'account') }}
),

cleaned as (
    select
        id as account_id,
        name as account_name,
        type as account_type,
        industry,
        
        -- Clean and standardize revenue data
        case 
            when annual_revenue is not null and annual_revenue > 0 
            then annual_revenue 
            else null 
        end as annual_revenue,
        
        -- Standardize country codes
        {{ standardize_country_code('billing_country') }} as billing_country_code,
        billing_state as billing_state,
        billing_city as billing_city,
        billing_postal_code,
        
        -- Parse and validate dates
        {{ safe_cast_timestamp('created_date') }} as created_at,
        {{ safe_cast_timestamp('last_modified_date') }} as updated_at,
        
        -- Data quality flags
        case when name is null or name = '' then true else false end as is_missing_name,
        case when industry is null or industry = '' then true else false end as is_missing_industry,
        
        -- Metadata
        _airbyte_extracted_at,
        _airbyte_raw_id
        
    from source
),

final as (
    select 
        *,
        -- Generate surrogate key for downstream joins
        {{ dbt_utils.generate_surrogate_key(['account_id']) }} as account_sk
    from cleaned
)

select * from final

Intermediate models handle complex business logic:

-- models/intermediate/int_customer_lifetime_value.sql
{{ config(
    materialized='ephemeral'
) }}

with customer_orders as (
    select
        customer_id,
        order_date,
        order_total,
        -- Calculate days between orders for frequency analysis
        lag(order_date) over (partition by customer_id order by order_date) as previous_order_date,
        row_number() over (partition by customer_id order by order_date) as order_sequence
    from {{ ref('stg_orders__orders') }}
    where order_status = 'completed'
),

customer_metrics as (
    select
        customer_id,
        
        -- Monetary metrics
        sum(order_total) as total_revenue,
        avg(order_total) as avg_order_value,
        count(*) as total_orders,
        
        -- Frequency metrics  
        min(order_date) as first_order_date,
        max(order_date) as last_order_date,
        
        -- Calculate average days between orders (excluding first order)
        avg(
            case 
                when previous_order_date is not null 
                then order_date - previous_order_date 
            end
        ) as avg_days_between_orders,
        
        -- Recency in days from current date
        {{ dbt.current_timestamp() }}::date - max(order_date) as days_since_last_order
        
    from customer_orders
    group by customer_id
),

clv_calculation as (
    select
        *,
        
        -- Simple CLV calculation: (AOV * Purchase Frequency * Gross Margin) / Churn Rate
        -- Assuming 20% gross margin and estimating churn rate from recency
        case 
            when avg_days_between_orders > 0 then
                (avg_order_value * (365.0 / avg_days_between_orders) * 0.20) / 
                greatest(0.01, least(1.0, days_since_last_order / 365.0))
            else 
                avg_order_value * 0.20  -- Single purchase customers
        end as estimated_clv,
        
        -- Customer segmentation based on RFM
        case
            when days_since_last_order <= 30 and total_orders >= 5 and total_revenue >= 1000 then 'VIP'
            when days_since_last_order <= 90 and total_orders >= 3 then 'Active'  
            when days_since_last_order <= 180 and total_orders >= 2 then 'At Risk'
            when days_since_last_order > 180 then 'Churned'
            else 'New'
        end as customer_segment
        
    from customer_metrics
)

select * from clv_calculation
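The CLV formula is easy to sanity-check outside the warehouse before trusting it in a model. A minimal Python sketch of the same arithmetic, using the model's assumed 20% gross margin and hypothetical customer numbers:

```python
from typing import Optional

GROSS_MARGIN = 0.20  # assumed 20% margin, matching the model

def estimated_clv(avg_order_value: float,
                  avg_days_between_orders: Optional[float],
                  days_since_last_order: int) -> float:
    """Mirror of the SQL: (AOV * yearly purchase frequency * margin) / churn proxy."""
    if not avg_days_between_orders or avg_days_between_orders <= 0:
        # Single-purchase customers: margin on one order
        return avg_order_value * GROSS_MARGIN
    purchase_frequency = 365.0 / avg_days_between_orders
    # Churn proxy: recency scaled to a year, clamped to [0.01, 1.0]
    churn_proxy = max(0.01, min(1.0, days_since_last_order / 365.0))
    return (avg_order_value * purchase_frequency * GROSS_MARGIN) / churn_proxy

# A customer ordering $50 every 30 days, last seen 30 days ago
clv = estimated_clv(50.0, 30.0, 30)
```

Note how strongly the recency-based churn proxy dominates: a recently active customer's annual margin gets divided by a small number, which is deliberate but worth validating against real retention data.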

Our mart models serve specific business use cases with comprehensive testing:

-- models/marts/finance/fct_monthly_revenue.sql
{{ config(
    materialized='incremental',
    unique_key='revenue_month',
    on_schema_change='append_new_columns',
    post_hook="grant select on {{ this }} to role finance_analyst"
) }}

with revenue_base as (
    select
        date_trunc('month', order_date) as revenue_month,
        
        -- Revenue metrics
        sum(case when order_status = 'completed' then order_total else 0 end) as gross_revenue,
        sum(case when order_status = 'refunded' then order_total else 0 end) as refunded_revenue,
        sum(case when order_status = 'completed' then order_total 
                 when order_status = 'refunded' then -order_total 
                 else 0 end) as net_revenue,
        
        -- Order metrics
        count(case when order_status = 'completed' then 1 end) as completed_orders,
        count(case when order_status = 'refunded' then 1 end) as refunded_orders,
        count(distinct customer_id) as unique_customers,
        
        -- Customer acquisition
        count(distinct case when customer_order_sequence = 1 then customer_id end) as new_customers,
        
        -- Product metrics
        sum(case when order_status = 'completed' then total_quantity else 0 end) as units_sold
        
    from {{ ref('fct_orders') }}
    
    {% if is_incremental() %}
        -- Only process new/updated data
        where date_trunc('month', order_date) >= (
            select max(revenue_month) 
            from {{ this }}
        )
    {% endif %}
    
    group by revenue_month
),

revenue_with_growth as (
    select
        *,
        
        -- Month-over-month growth calculations
        lag(net_revenue) over (order by revenue_month) as previous_month_revenue,
        
        case 
            when lag(net_revenue) over (order by revenue_month) > 0 then
                round(
                    100.0 * (net_revenue - lag(net_revenue) over (order by revenue_month)) / 
                    lag(net_revenue) over (order by revenue_month), 
                    2
                )
            else null
        end as revenue_growth_pct,
        
        -- Calculate average order value
        case 
            when completed_orders > 0 then gross_revenue / completed_orders 
            else 0 
        end as avg_order_value,
        
        -- Customer metrics
        case 
            when unique_customers > 0 then net_revenue / unique_customers 
            else 0 
        end as revenue_per_customer
        
    from revenue_base
)

select * from revenue_with_growth
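The month-over-month growth logic reduces to one guarded division; a Python sketch of the same null-safe behavior (function name is illustrative):

```python
from typing import Optional

def revenue_growth_pct(net_revenue: float,
                       previous_month_revenue: Optional[float]) -> Optional[float]:
    # Mirrors the SQL case expression: None when there is no positive prior month
    if previous_month_revenue is None or previous_month_revenue <= 0:
        return None
    return round(100.0 * (net_revenue - previous_month_revenue)
                 / previous_month_revenue, 2)

revenue_growth_pct(120_000.0, 100_000.0)  # 20.0
```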

Comprehensive testing ensures data quality:

-- tests/assert_revenue_consistency.sql
-- Test that monthly revenue aggregation matches daily revenue sum

with monthly_totals as (
    select
        date_trunc('month', revenue_date) as revenue_month,
        sum(net_revenue) as daily_sum_revenue
    from {{ ref('fct_daily_revenue') }}
    group by revenue_month
),

monthly_table as (
    select
        revenue_month,
        net_revenue as monthly_net_revenue
    from {{ ref('fct_monthly_revenue') }}
)

select
    m.revenue_month,
    m.daily_sum_revenue,
    t.monthly_net_revenue,
    abs(m.daily_sum_revenue - t.monthly_net_revenue) as revenue_difference
from monthly_totals m
join monthly_table t using (revenue_month)
where abs(m.daily_sum_revenue - t.monthly_net_revenue) > 0.01

Advanced macros handle complex transformation logic:

-- macros/standardize_country_code.sql
{% macro standardize_country_code(column_name) %}
    case
        when upper({{ column_name }}) in ('US', 'USA', 'UNITED STATES', 'UNITED STATES OF AMERICA') then 'US'
        when upper({{ column_name }}) in ('UK', 'UNITED KINGDOM', 'GB', 'GREAT BRITAIN') then 'GB'  
        when upper({{ column_name }}) in ('CA', 'CANADA') then 'CA'
        when upper({{ column_name }}) in ('AU', 'AUSTRALIA') then 'AU'
        when upper({{ column_name }}) in ('DE', 'GERMANY', 'DEUTSCHLAND') then 'DE'
        when upper({{ column_name }}) in ('FR', 'FRANCE') then 'FR'
        when {{ column_name }} is not null and length({{ column_name }}) = 2 then upper({{ column_name }})
        else null
    end
{% endmacro %}
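Mapping macros like this benefit from a unit-testable mirror outside the warehouse. A hypothetical Python version of the same fallthrough logic, useful for testing new aliases before editing the macro:

```python
from typing import Optional

# Same alias groups as the dbt macro
COUNTRY_ALIASES = {
    'US': {'US', 'USA', 'UNITED STATES', 'UNITED STATES OF AMERICA'},
    'GB': {'UK', 'UNITED KINGDOM', 'GB', 'GREAT BRITAIN'},
    'CA': {'CA', 'CANADA'},
    'AU': {'AU', 'AUSTRALIA'},
    'DE': {'DE', 'GERMANY', 'DEUTSCHLAND'},
    'FR': {'FR', 'FRANCE'},
}

def standardize_country_code(value: Optional[str]) -> Optional[str]:
    # Known aliases first, then any 2-letter code passes through, else None
    if value is None:
        return None
    upper = value.upper()
    for code, aliases in COUNTRY_ALIASES.items():
        if upper in aliases:
            return code
    return upper if len(value) == 2 else None

standardize_country_code('United Kingdom')  # 'GB'
```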

Real-Time Analytics with Snowflake

Snowflake serves as our analytical engine, handling both batch and streaming workloads. Proper warehouse sizing, clustering, and query optimization are critical for performance and cost management.

Let's configure our Snowflake environment with multiple warehouses for different workloads:

-- snowflake_setup/warehouses.sql

-- ETL warehouse for dbt transformations
CREATE OR REPLACE WAREHOUSE DBT_TRANSFORM_WH WITH
  WAREHOUSE_SIZE = 'X-LARGE'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 4
  SCALING_POLICY = 'STANDARD'
  COMMENT = 'Warehouse for dbt transformations and heavy ETL jobs';

-- Analytics warehouse for dashboard queries  
CREATE OR REPLACE WAREHOUSE ANALYTICS_WH WITH
  WAREHOUSE_SIZE = 'LARGE'
  AUTO_SUSPEND = 300  -- 5 minutes for dashboard caching
  AUTO_RESUME = TRUE
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 8
  SCALING_POLICY = 'ECONOMY'  -- Cost-optimized scaling
  COMMENT = 'Warehouse for dashboard and analytical queries';

-- Data science warehouse for ML workloads
CREATE OR REPLACE WAREHOUSE DATA_SCIENCE_WH WITH
  WAREHOUSE_SIZE = '2X-LARGE'
  AUTO_SUSPEND = 180
  AUTO_RESUME = TRUE  
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 2
  SCALING_POLICY = 'STANDARD'
  COMMENT = 'Warehouse for ML training and feature engineering';
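These size choices translate directly into credits: Snowflake bills roughly per second against a credits-per-hour rate that doubles with each warehouse size (1 credit/hour at X-Small). A back-of-envelope estimator, where the per-credit price is an assumption that varies by edition and region:

```python
# Credits per hour by warehouse size (doubles with each size step)
CREDITS_PER_HOUR = {
    'XSMALL': 1, 'SMALL': 2, 'MEDIUM': 4, 'LARGE': 8,
    'XLARGE': 16, '2XLARGE': 32, '3XLARGE': 64, '4XLARGE': 128,
}

def estimate_daily_cost(size: str, active_hours: float, clusters: int = 1,
                        price_per_credit: float = 3.00) -> float:
    """Rough daily spend; $3/credit is an assumed list price, not a quote."""
    credits = CREDITS_PER_HOUR[size] * active_hours * clusters
    return round(credits * price_per_credit, 2)

# DBT_TRANSFORM_WH: X-Large running ~4 active hours/day on one cluster
estimate_daily_cost('XLARGE', 4)  # 192.0
```

This is why the aggressive AUTO_SUSPEND settings above matter: an X-Large left running around the clock costs six times the four-hour estimate.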

Database structure with proper clustering and data retention:

-- snowflake_setup/databases.sql

CREATE OR REPLACE DATABASE RAW_DATA
  DATA_RETENTION_TIME_IN_DAYS = 90
  COMMENT = 'Raw data from ingestion pipelines';

CREATE OR REPLACE DATABASE ANALYTICS
  DATA_RETENTION_TIME_IN_DAYS = 365
  COMMENT = 'Transformed data for analytics and reporting';

-- Create schemas with appropriate access controls
USE DATABASE RAW_DATA;

CREATE OR REPLACE SCHEMA KAFKA_STREAMS
  COMMENT = 'Real-time data from Kafka topics';
  
CREATE OR REPLACE SCHEMA BATCH_EXTRACTS  
  COMMENT = 'Batch data from Airbyte and custom extractors';

USE DATABASE ANALYTICS;

CREATE OR REPLACE SCHEMA STAGING
  COMMENT = 'Cleaned and standardized source data';
  
CREATE OR REPLACE SCHEMA MARTS
  COMMENT = 'Business-ready analytical models';
  
CREATE OR REPLACE SCHEMA SNAPSHOTS
  COMMENT = 'Historical snapshots for slowly changing dimensions';

For high-performance analytical queries, we need properly clustered tables:

-- snowflake_optimization/clustered_tables.sql

-- Customer events table with multi-column clustering
CREATE OR REPLACE TABLE analytics.marts.fct_customer_events (
    event_id STRING,
    user_id STRING,
    event_type STRING, 
    event_timestamp TIMESTAMP_NTZ,
    session_id STRING,
    page_url STRING,
    referrer STRING,
    device_type STRING,
    country_code STRING,
    event_properties VARIANT,
    created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
)
CLUSTER BY (DATE(event_timestamp), user_id, event_type)
COMMENT = 'Customer behavioral events with optimized clustering for time-series and user analysis';

-- Large transaction table with automatic clustering
CREATE OR REPLACE TABLE analytics.marts.fct_transactions (
    transaction_id STRING PRIMARY KEY,
    user_id STRING,
    transaction_date DATE,
    transaction_timestamp TIMESTAMP_NTZ,
    amount DECIMAL(10,2),
    currency STRING,
    payment_method STRING,
    merchant_category STRING,
    status STRING,
    created_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
)
CLUSTER BY (transaction_date, user_id)
COMMENT = 'Financial transactions with date and user clustering';

-- Enable automatic clustering for maintenance-free optimization  
ALTER TABLE analytics.marts.fct_transactions RESUME RECLUSTER;
ALTER TABLE analytics.marts.fct_customer_events RESUME RECLUSTER;

Materialized views provide real-time aggregations:

-- snowflake_optimization/materialized_views.sql

-- Real-time revenue dashboard view
-- Note: Snowflake materialized views disallow non-deterministic functions
-- (e.g., CURRENT_DATE) and COUNT(DISTINCT ...), so we use
-- APPROX_COUNT_DISTINCT and leave time-range filtering to the queries
-- that read the view.
CREATE OR REPLACE MATERIALIZED VIEW analytics.marts.mv_real_time_revenue AS
SELECT
    DATE(transaction_timestamp) as transaction_date,
    HOUR(transaction_timestamp) as transaction_hour,
    COUNT(*) as transaction_count,
    SUM(CASE WHEN status = 'completed' THEN amount ELSE 0 END) as completed_revenue,
    SUM(CASE WHEN status = 'failed' THEN amount ELSE 0 END) as failed_revenue,
    APPROX_COUNT_DISTINCT(user_id) as unique_customers,
    AVG(CASE WHEN status = 'completed' THEN amount END) as avg_transaction_value
FROM analytics.marts.fct_transactions
GROUP BY transaction_date, transaction_hour;

-- Customer activity summary for real-time personalization
-- ARRAY_AGG is also unsupported in materialized views, so event types are
-- tracked as per-type counts instead.
CREATE OR REPLACE MATERIALIZED VIEW analytics.marts.mv_customer_activity AS
SELECT
    user_id,
    DATE(event_timestamp) as activity_date,
    COUNT(*) as total_events,
    APPROX_COUNT_DISTINCT(session_id) as session_count,
    MAX(event_timestamp) as last_activity_timestamp,
    COUNT(CASE WHEN event_type = 'purchase' THEN 1 END) as purchase_events,
    COUNT(CASE WHEN event_type = 'page_view' THEN 1 END) as page_views
FROM analytics.marts.fct_customer_events
GROUP BY user_id, activity_date;

Advanced query optimization with search optimization service:

-- snowflake_optimization/search_optimization.sql

-- Enable search optimization for frequently filtered columns
ALTER TABLE analytics.marts.fct_customer_events 
ADD SEARCH OPTIMIZATION ON EQUALITY(user_id, event_type, country_code);

ALTER TABLE analytics.marts.fct_transactions
ADD SEARCH OPTIMIZATION ON EQUALITY(user_id, merchant_category, payment_method);

-- Create secure views for sensitive data access
CREATE OR REPLACE SECURE VIEW analytics.marts.vw_customer_pii AS
SELECT
    user_id,
    CASE 
        WHEN CURRENT_ROLE() IN ('ADMIN', 'DATA_SCIENTIST') THEN email
        ELSE REGEXP_REPLACE(email, '(.{2}).*@', '\\1***@')
    END as email,
    CASE
        WHEN CURRENT_ROLE() IN ('ADMIN', 'DATA_SCIENTIST') THEN full_name  
        ELSE REGEXP_REPLACE(full_name, '(.{2}).*', '\\1***')
    END as full_name,
    created_at,
    last_login_at
FROM analytics.staging.stg_users__users;
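The masking expressions in the secure view are ordinary regex replacements, so their behavior is easy to verify locally before granting access. A Python check of the same patterns:

```python
import re

def mask_email(email: str) -> str:
    # Same pattern as the secure view: keep the first two characters,
    # mask everything else up to the '@'
    return re.sub(r'(.{2}).*@', r'\1***@', email)

def mask_name(name: str) -> str:
    # Keep the first two characters, mask the rest
    return re.sub(r'(.{2}).*', r'\1***', name)

mask_email('alice@example.com')  # 'al***@example.com'
mask_name('Alice Smith')         # 'Al***'
```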

Data Governance and Quality Monitoring

Comprehensive data governance ensures reliability, compliance, and trustworthiness across the platform. We'll implement automated quality monitoring, lineage tracking, and access controls.

First, let's establish data quality monitoring with Great Expectations:

# data_quality/expectations_suite.py
import great_expectations as gx
from great_expectations.checkpoint import Checkpoint
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.data_context import DataContext
import logging

class DataQualityMonitor:
    def __init__(self, data_context_path: str):
        self.context = DataContext(data_context_path)
        self.logger = logging.getLogger(__name__)
        
    def create_customer_events_suite(self) -> str:
        """Create comprehensive expectations for customer events data"""
        
        suite_name = "customer_events_quality_suite"
        
        # Create the expectation suite, reusing an existing one if present
        try:
            suite = self.context.get_expectation_suite(suite_name)
        except Exception:  # suite does not exist yet
            suite = self.context.create_expectation_suite(suite_name)
        
        # Core data expectations
        expectations = [
            # Completeness checks
            ExpectationConfiguration(
                expectation_type="expect_column_to_exist",
                kwargs={"column": "event_id"}
            ),
            ExpectationConfiguration(
                expectation_type="expect_column_to_exist", 
                kwargs={"column": "user_id"}
            ),
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_not_be_null",
                kwargs={"column": "event_id"}
            ),
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_not_be_null",
                kwargs={"column": "user_id"}
            ),
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_not_be_null",
                kwargs={"column": "event_timestamp"}
            ),
            
            # Format and type checks
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_match_regex",
                kwargs={
                    "column": "event_id",
                    "regex": r"^[a-f0-9-]{36}$"  # UUID format
                }
            ),
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_be_in_set",
                kwargs={
                    "column": "event_type",
                    "value_set": [
                        "page_view", "button_click", "form_submit",
                        "purchase", "cart_add", "cart_remove", "login", "logout"
                    ]
                }
            ),
            
            # Business logic checks  
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_be_between",
                kwargs={
                    "column": "event_timestamp",
                    "min_value": "2020-01-01T00:00:00Z",
                    "max_value": None,  # No future limit
                    "parse_strings_as_datetimes": True
                }
            ),
            
            # Statistical checks
            ExpectationConfiguration(
                expectation_type="expect_column_unique_value_count_to_be_between",
                kwargs={
                    "column": "user_id",
                    "min_value": 1000,  # Expect at least 1000 unique users daily
                    "max_value": None
                }
            ),
            
            # Custom business rule: Session integrity
            ExpectationConfiguration(
                expectation_type="expect_multicolumn_sum_to_equal",
                kwargs={
                    "column_list": ["session_start_events", "session_end_events"],
                    "sum_total": 0,
                    "ignore_row_if": "any_value_is_missing"
                }
            )
        ]
        
        # Add all expectations to the suite
        for expectation in expectations:
            suite.add_expectation(expectation)
        
        # Save suite
        self.context.save_expectation_suite(suite)
        
        return suite_name
    
    def create_revenue_data_suite(self) -> str:
        """Create financial data quality expectations with strict validation"""
        
        suite_name = "revenue_data_quality_suite"
        
        try:
            suite = self.context.get_expectation_suite(suite_name)
        except Exception:  # suite does not exist yet
            suite = self.context.create_expectation_suite(suite_name)
        
        # Financial data requires stricter validation
        financial_expectations = [
            # Revenue amount validation
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_be_between",
                kwargs={
                    "column": "transaction_amount",
                    "min_value": 0.01,  # No zero or negative transactions
                    "max_value": 1000000.00,  # Reasonable upper limit
                    "mostly": 0.999  # Allow for 0.1% outliers
                }
            ),
            
            # Currency code validation
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_be_in_set",
                kwargs={
                    "column": "currency_code",
                    "value_set": ["USD", "EUR", "GBP", "CAD", "AUD"]
                }
            ),
            
            # Transaction ID uniqueness
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_be_unique",
                kwargs={"column": "transaction_id"}
            ),
            
            # Status validation
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_be_in_set",
                kwargs={
                    "column": "transaction_status",
                    "value_set": ["pending", "completed", "failed", "refunded"]
                }
            ),
            
            # Date consistency check
            ExpectationConfiguration(
                expectation_type="expect_column_pair_values_a_to_be_greater_than_b",
                kwargs={
                    "column_A": "updated_at",
                    "column_B": "created_at",
                    "or_equal": True,
                    "ignore_row_if": "either_value_is_missing"
                }
            )
        ]
        
        for expectation in financial_expectations:
            suite.add_expectation(expectation)
        
        self.context.save_expectation_suite(suite)
        return suite_name
    
    def run_validation_checkpoint(self, suite_name: str, 
                                 datasource_name: str, 
                                 data_asset_name: str) -> bool:
        """Execute validation checkpoint and return success status"""
        
        checkpoint_config = {
            "name": f"{suite_name}_checkpoint",
            "config_version": 1.0,
            "template_name": None,
            "module_name": "great_expectations.checkpoint",
            "class_name": "Checkpoint",
            "run_name_template": "%Y%m%d-%H%M%S-" + suite_name,
            "expectation_suite_name": suite_name,
            "batch_request": {
                "datasource_name": datasource_name,
                "data_connector_name": "default_inferred_data_connector_name",
                "data_asset_name": data_asset_name
            },
            "action_list": [
                {
                    "name": "store_validation_result",
                    "action": {
                        "class_name": "StoreValidationResultAction"
                    }
                },
                {
                    "name": "store_evaluation_params",
                    "action": {
                        "class_name": "StoreEvaluationParametersAction"
                    }
                },
                {
                    "name": "update_data_docs",
                    "action": {
                        "class_name": "UpdateDataDocsAction",
                        "site_names": []
                    }
                }
            ]
        }
        
        try:
            checkpoint = Checkpoint(data_context=self.context, **checkpoint_config)
            results = checkpoint.run()
            
            success = results["success"]
            
            if not success:
                self.logger.error(f"Data validation failed for {suite_name}")
                # Send alert to monitoring system
                self._send_validation_alert(suite_name, results)
            else:
                self.logger.info(f"Data validation passed for {suite_name}")
                
            return success
            
        except Exception as e:
            self.logger.error(f"Checkpoint execution failed: {e}")
            return False
    
    def _send_validation_alert(self, suite_name: str, results: dict):
        """Send data quality alert to monitoring systems"""
        # Implementation would integrate with Slack, PagerDuty, etc.
        pass
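The suites above require a running Great Expectations context, but the core checks are simple predicates, which makes them easy to prototype. A dependency-free sketch of two of them (UUID format and allowed event types) for quick local validation; the function and check names here are illustrative, not part of the GE API:

```python
import re

# Same permissive pattern and value set as the expectation suite
UUID_RE = re.compile(r'^[a-f0-9-]{36}$')
ALLOWED_EVENT_TYPES = {
    'page_view', 'button_click', 'form_submit',
    'purchase', 'cart_add', 'cart_remove', 'login', 'logout',
}

def validate_event(event: dict) -> list:
    """Return the names of failed checks for one event record."""
    failures = []
    if not event.get('event_id') or not UUID_RE.match(event['event_id']):
        failures.append('event_id_format')
    if not event.get('user_id'):
        failures.append('user_id_not_null')
    if event.get('event_type') not in ALLOWED_EVENT_TYPES:
        failures.append('event_type_in_set')
    return failures

validate_event({'event_id': 'not-a-uuid', 'user_id': 'u1', 'event_type': 'login'})
# ['event_id_format']
```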

Data lineage tracking with custom metadata collection:

# data_governance/lineage_tracker.py
import json
import requests
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
import networkx as nx

@dataclass
class DataLineageNode:
    node_id: str
    node_type: str  # 'source', 'transformation', 'destination'
    name: str
    schema_name: Optional[str] = None
    table_name: Optional[str] = None
    transformation_logic: Optional[str] = None
    created_at: Optional[datetime] = None

@dataclass
class DataLineageEdge:
    source_node_id: str
    target_node_id: str
    edge_type: str  # 'reads_from', 'writes_to', 'transforms'
    created_at: Optional[datetime] = None

class DataLineageTracker:
    def __init__(self, metadata_store_url: str):
        self.metadata_store_url = metadata_store_url
        self.graph = nx.DiGraph()
        
    def track_dbt_lineage(self, manifest_path: str, run_results_path: str):
        """Extract lineage information from dbt artifacts"""
        
        with open(manifest_path, 'r') as f:
            manifest = json.load(f)
            
        with open(run_results_path, 'r') as f:
            run_results = json.load(f)
        
        # Process dbt models
        for model_id, model_data in manifest.get('nodes', {}).items():
            if model_data['resource_type'] != 'model':
                continue
                
            # Create node for the model
            model_node = DataLineageNode(
                node_id=model_id,
                node_type='transformation',
                name=model_data['name'],
                schema_name=model_data['schema'],
                table_name=model_data['name'],
                transformation_logic=model_data.get('raw_code', ''),
                created_at=datetime.utcnow()
            )
            
            self._add_lineage_node(model_node)
            
            # Track dependencies
            for dep in model_data.get('depends_on', {}).get('nodes', []):
                edge = DataLineageEdge(
                    source_node_id=dep,
                    target_node_id=model_id,
                    edge_type='transforms',
                    created_at=datetime.utcnow()
                )
                self._add_lineage_edge(edge)
        
        # Process sources
        for source_id, source_data in manifest.get('sources', {}).items():
            source_node = DataLineageNode(
                node_id=source_id,
                node_type='source',
                name=source_data['name'],
                schema_name=source_data['schema'],
                table_name=source_data['name'],
                created_at=datetime.utcnow()
            )
            
            self._add_lineage_node(source_node)
    
    def track_airflow_lineage(self, dag_id: str, task_metadata: Dict):
        """Track lineage from Airflow DAG execution"""
        
        for task_id, task_info in task_metadata.items():
            # Create transformation node for each task
            task_node = DataLineageNode(
                node_id=f"{dag_id}.{task_id}",
                node_type='transformation',
                name=task_id,
                transformation_logic=task_info.get('operator_class', ''),
                created_at=datetime.utcnow()
            )
            
            self._add_lineage_node(task_node)
            
            # Track input and output datasets
            for input_dataset in task_info.get('inputs', []):
                input_node = DataLineageNode(
                    node_id=input_dataset,
                    node_type='source' if 'source' in input_dataset else 'transformation',
                    name=input_dataset.split('.')[-1],
                    created_at=datetime.utcnow()
                )
                
                self._add_lineage_node(input_node)
                
                edge = DataLineageEdge(
                    source_node_id=input_dataset,
                    target_node_id=f"{dag_id}.{task_id}",
                    edge_type='reads_from',
                    created_at=datetime.utcnow()
                )
                
                self._add_lineage_edge(edge)
    
    def get_upstream_dependencies(self, node_id: str, max_depth: int = 5) -> List[str]:
        """Get all upstream dependencies for a given node"""
        
        if node_id not in self.graph:
            return []
        
        upstream_nodes = []
        visited = set()
        
        def traverse_upstream(current_node, depth):
            if depth >= max_depth or current_node in visited:
                return
                
            visited.add(current_node)
            
            for predecessor in self.graph.predecessors(current_node):
                upstream_nodes.append(predecessor)
                traverse_upstream(predecessor, depth + 1)
        
        traverse_upstream(node_id, 0)
        return list(set(upstream_nodes))
    
    def get_downstream_impact(self, node_id: str, max_depth: int = 5) -> List[str]:
        """Get all downstream nodes that would be impacted by changes"""
        
        if node_id not in self.graph:
            return []
        
        downstream_nodes = []
        visited = set()
        
        def traverse_downstream(current_node, depth):
            if depth >= max_depth or current_node in visited:
                return
                
            visited.add(current_node)
            
            for successor in self.graph.successors(current_node):
                downstream_nodes.append(successor)
                traverse_downstream(successor, depth + 1)
        
        traverse_downstream(node_id, 0)
        return list(set(downstream_nodes))
    
    def _add_lineage_node(self, node: DataLineageNode):
        """Add or update lineage node in graph and metadata store"""
        
        self.graph.add_node(node.node_id, **{
            'name': node.name,
            'node_type': node.node_type,
            'schema_name': node.schema_name,
            'table_name': node.table_name,
            'created_at': node.created_at.isoformat() if node.created_at else None
        })
        
        # Store in metadata system
        self._persist_to_metadata_store('nodes', node.__dict__)
    
    def _add_lineage_edge(self, edge: DataLineageEdge):
        """Add lineage edge to graph and metadata store"""
        
        self.graph.add_edge(
            edge.source_node_id, 
            edge.target_node_id,
            edge_type=edge.edge_type,
            created_at=edge.created_at.isoformat() if edge.created_at else None
        )
        
        # Store in metadata system
        self._persist_to_metadata_store('edges', edge.__dict__)
    
    def _persist_to_metadata_store(self, entity_type: str, data: Dict):
        """Persist lineage data to external metadata store"""
        
        try:
            response = requests.post(
                f"{self.metadata_store_url}/api/v1/{entity_type}",
                # Serialize manually so datetime fields fall back to str()
                # instead of raising a JSON encoding error
                data=json.dumps(data, default=str),
                headers={'Content-Type': 'application/json'},
                timeout=30
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Failed to persist {entity_type} to metadata store: {e}")
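The depth-limited traversal generalizes beyond networkx. The same upstream walk over a plain adjacency mapping (edges keyed as downstream node -> list of direct upstream sources), as a minimal sketch:

```python
def upstream_dependencies(edges: dict, node_id: str, max_depth: int = 5) -> set:
    """Collect all ancestors of node_id, following at most max_depth hops."""
    upstream = set()

    def walk(current, depth):
        if depth >= max_depth:
            return
        for parent in edges.get(current, []):
            if parent not in upstream:  # avoid revisiting shared ancestors
                upstream.add(parent)
                walk(parent, depth + 1)

    walk(node_id, 0)
    return upstream

# source -> staging -> mart lineage, keyed by downstream node
lineage = {
    'mart.fct_orders': ['stg.orders'],
    'stg.orders': ['raw.orders'],
}
upstream_dependencies(lineage, 'mart.fct_orders')  # {'stg.orders', 'raw.orders'}
```

Reversing the mapping gives the downstream-impact query for free, which is exactly the question an on-call engineer asks before changing a staging model.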

Workflow Orchestration with Airflow

Apache Airflow orchestrates our data pipelines with proper error handling, monitoring, and dependency management. Our implementation includes custom operators and comprehensive observability.

First, let's establish our Airflow configuration with production-ready settings:

# airflow/dags/config/dag_config.py
from datetime import datetime, timedelta
from airflow.models import Variable
import os

class DAGConfig:
    """Centralized configuration for all data platform DAGs"""
    
    # Default DAG arguments
    DEFAULT_ARGS = {
        'owner': 'data-engineering',
        'depends_on_past': False,
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'email': ['data-alerts@company.com'],
        'sla': timedelta(hours=2)
    }
    
    # Environment-specific configurations
    ENVIRONMENT = Variable.get("environment", default_var="dev")
    
    SNOWFLAKE_CONN_ID = f"snowflake_{ENVIRONMENT}"
    KAFKA_CONN_ID = f"kafka_{ENVIRONMENT}" 
    S3_CONN_ID = f"s3_{ENVIRONMENT}"
    
    # Data quality settings
    DATA_QUALITY_FAILURE_THRESHOLD = 0.95  # 95% tests must pass
    
    # Resource limits
    KUBERNETES_RESOURCE_LIMITS = {
        'memory': '4Gi',
        'cpu': '2000m'
    }
    
    KUBERNETES_RESOURCE_REQUESTS = {
        'memory': '2Gi', 
        'cpu': '1000m'
    }
    
    # Monitoring and alerting
    SLACK_CONN_ID = 'slack_data_alerts'
    PAGERDUTY_CONN_ID = 'pagerduty_critical'
    
    @staticmethod
    def get_warehouse_config(workload_type: str) -> dict:
        """Get Snowflake warehouse configuration based on workload type"""
        
        warehouse_configs = {
            'etl': {
                'warehouse': 'DBT_TRANSFORM_WH',
                'database': 'ANALYTICS',
