Wicked Smart Data
Data Governance: Catalogs, Lineage, and Access Controls

Data Engineering · 🔥 Expert · 28 min read · Apr 19, 2026 · Updated Apr 19, 2026
Table of Contents
  • Prerequisites
  • The Governance Triangle: Catalogs, Lineage, and Access Controls
  • Architecting Enterprise Data Catalogs
  • Beyond Simple Metadata Storage
  • Advanced Catalog Search and Discovery
  • Advanced Data Lineage Architecture
  • Beyond Simple Column-Level Tracking
  • Real-Time Lineage Updates
  • Fine-Grained Access Control Architecture
  • Beyond Role-Based Access Control
  • Integration with Query Engines

Data Governance: Building Production-Grade Catalogs, Lineage, and Access Controls

Picture this: your data engineering team just discovered that a critical ML model has been using corrupted customer data for the past six months. The corruption happened during a schema migration, but nobody knew which downstream systems were affected because your data lineage tracking was limited to a few Confluence pages. Meanwhile, your compliance team is asking for an audit trail of who accessed sensitive PII data, but your access controls are scattered across a dozen different systems with no centralized logging.

This scenario plays out daily in organizations that treat data governance as an afterthought. As data systems grow in complexity—with hundreds of datasets, dozens of transformation pipelines, and multiple teams accessing data—the lack of systematic governance becomes a critical bottleneck. You need more than spreadsheets and good intentions; you need production-grade systems that automatically track, control, and document your data ecosystem.

By the end of this lesson, you'll know how to architect and implement a comprehensive data governance framework that scales with your organization. You'll understand not just the tools, but the underlying principles and trade-offs that determine whether your governance system becomes a helpful guide or an ignored bureaucracy.

What you'll learn:

  • How to design and implement enterprise-grade data catalogs that developers actually use
  • Advanced data lineage tracking techniques, including automated discovery and impact analysis
  • Fine-grained access control patterns that balance security with developer productivity
  • Integration strategies for governance across heterogeneous data stacks
  • Performance optimization techniques for governance systems at scale
  • Common failure modes and how to design resilient governance architectures

Prerequisites

This lesson assumes you have experience with:

  • SQL and data modeling concepts
  • Basic understanding of modern data stack components (data warehouses, ETL/ELT tools, orchestrators)
  • Familiarity with at least one cloud platform (AWS, GCP, or Azure)
  • Experience with API design and distributed systems concepts
  • Understanding of basic security principles (authentication, authorization, encryption)

The Governance Triangle: Catalogs, Lineage, and Access Controls

Modern data governance rests on three interconnected pillars that must work together to be effective. Understanding their relationships is crucial before diving into implementation details.

Data Catalogs serve as the central nervous system, maintaining metadata about every asset in your data ecosystem. They answer "what data exists?" and "how do I use it?" But catalogs are only as good as their metadata quality and discoverability.

Data Lineage provides the circulatory system, tracking how data flows through transformations and dependencies. It answers "where does this data come from?" and "what will break if I change this?" Lineage becomes critical for impact analysis, debugging, and compliance.

Access Controls act as the immune system, determining who can access what data under which circumstances. They answer "who can see this?" and "how do we audit access?" Access controls must be granular enough for security while remaining manageable for operations.

The key insight is that these systems must share metadata and coordinate policies. A catalog that doesn't understand lineage relationships can't provide meaningful impact warnings. Access controls that don't integrate with catalogs become impossible to manage at scale. Lineage tracking that ignores access patterns misses crucial governance signals.
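This coordination requirement can be made concrete with a single asset record that all three systems read and write. The sketch below is illustrative — the field names and the `GovernedAsset`/`can_read` helpers are not from any particular catalog product — but it shows how a catalog tag (`contains_pii`), lineage edges, and an access policy can live on one record, so the access decision can consult catalog metadata:

```python
from dataclasses import dataclass, field
from typing import List, Set

@dataclass
class GovernedAsset:
    """One asset record shared by catalog, lineage, and access control."""
    asset_id: str
    description: str                                      # catalog: what this data is
    tags: Set[str] = field(default_factory=set)           # catalog + access: e.g. "contains_pii"
    upstream: List[str] = field(default_factory=list)     # lineage: where it comes from
    downstream: List[str] = field(default_factory=list)   # lineage: what depends on it
    allowed_roles: Set[str] = field(default_factory=set)  # access: who may read it

def can_read(asset: GovernedAsset, roles: Set[str]) -> bool:
    """Access decision that consults catalog tags, not just role lists."""
    if "contains_pii" in asset.tags and "pii_reader" not in roles:
        return False
    return bool(asset.allowed_roles & roles)

orders = GovernedAsset(
    asset_id="warehouse.sales.orders",
    description="Customer orders, one row per order line",
    tags={"contains_pii"},
    upstream=["kafka.orders_raw"],
    allowed_roles={"analyst", "pii_reader"},
)
print(can_read(orders, {"analyst"}))                # False: PII tag, no pii_reader role
print(can_read(orders, {"analyst", "pii_reader"}))  # True
```

Because the PII tag gates access here, retagging an asset in the catalog immediately tightens its access policy — the coordination the paragraph above describes.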

Architecting Enterprise Data Catalogs

Beyond Simple Metadata Storage

Most organizations start with basic catalog tools that store table schemas and descriptions. This approach fails at scale because it treats metadata as static documentation rather than living system knowledge. Production catalogs need to handle dynamic metadata, automated discovery, and complex relationships between assets.

Let's examine a realistic catalog architecture for a mid-size e-commerce company with multiple data sources:

# catalog-architecture.yaml
catalog_components:
  metadata_store:
    primary: PostgreSQL cluster
    search_index: Elasticsearch
    cache: Redis
    blob_storage: S3 (for large schemas/samples)
  
  discovery_agents:
    - name: warehouse_scanner
      targets: [Snowflake, BigQuery]
      schedule: "*/30 * * * *"  # Every 30 minutes
    - name: kafka_scanner
      targets: [Kafka topics]
      schedule: "*/5 * * * *"   # Every 5 minutes
    - name: api_scanner
      targets: [REST APIs, GraphQL]
      schedule: "0 */6 * * *"   # Every 6 hours
  
  quality_monitors:
    - schema_drift_detection
    - data_freshness_tracking
    - usage_analytics
    - quality_score_calculation

The discovery agents continuously scan your data infrastructure, automatically updating metadata when schemas change, new tables appear, or data quality issues emerge. This eliminates the manual maintenance burden that causes most catalogs to become stale.

Here's how to implement a production-grade discovery agent for data warehouses:

# warehouse_discovery_agent.py
import asyncio
import logging
from typing import Dict, List, Optional, Set
from dataclasses import dataclass
from datetime import datetime
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.sql import text

logger = logging.getLogger(__name__)

@dataclass
class TableMetadata:
    database: str
    schema: str
    table: str
    columns: List[Dict]
    row_count: Optional[int]
    size_bytes: Optional[int]
    last_modified: Optional[datetime]
    quality_score: Optional[float]
    tags: Set[str]

class WarehouseDiscoveryAgent:
    def __init__(self, connection_string: str, catalog_api_url: str):
        # Async engine so discovery queries don't block the event loop
        self.engine = create_async_engine(connection_string)
        # Client for the catalog's ingestion API (implementation not shown)
        self.catalog_api = CatalogAPI(catalog_api_url)
        self.last_scan = {}  # Track per-table scan timestamps
        
    async def discover_and_sync(self):
        """Main discovery loop with incremental updates"""
        try:
            # Get list of tables with modification times
            current_tables = await self._get_table_list()
            
            # Identify what needs updating
            tables_to_scan = self._identify_changes(current_tables)
            
            # Process in batches to avoid overwhelming the warehouse
            batch_size = 20
            for i in range(0, len(tables_to_scan), batch_size):
                batch = tables_to_scan[i:i + batch_size]
                await self._process_batch(batch)
                
            # Clean up deleted tables
            await self._cleanup_deleted_tables(current_tables)
            
        except Exception as e:
            logger.error(f"Discovery failed: {e}")
            # Critical: Don't let discovery failures break the catalog
            await self._record_discovery_failure(e)
    
    async def _get_table_list(self) -> Dict[str, datetime]:
        """Get all tables with their last modification times"""
        query = text("""
        SELECT 
            table_catalog,
            table_schema,
            table_name,
            GREATEST(
                COALESCE(last_altered, '1900-01-01'::timestamp),
                COALESCE(last_ddl, '1900-01-01'::timestamp)
            ) as last_modified
        FROM information_schema.tables 
        WHERE table_type = 'BASE TABLE'
        AND table_schema NOT IN ('INFORMATION_SCHEMA', 'ACCOUNT_USAGE')
        """)
        
        result = {}
        async with self.engine.connect() as conn:
            rows = await conn.execute(query)
            for row in rows:
                table_key = f"{row.table_catalog}.{row.table_schema}.{row.table_name}"
                result[table_key] = row.last_modified
                
        return result
    
    def _identify_changes(self, current_tables: Dict[str, datetime]) -> List[str]:
        """Identify which tables need metadata refresh"""
        tables_to_scan = []
        
        for table_key, last_modified in current_tables.items():
            # Scan if table is new or modified since last scan
            if (table_key not in self.last_scan or 
                last_modified > self.last_scan[table_key]):
                tables_to_scan.append(table_key)
                
        return tables_to_scan
    
    async def _extract_table_metadata(self, table_key: str) -> TableMetadata:
        """Extract comprehensive metadata for a single table"""
        db, schema, table = table_key.split('.')
        
        # Column metadata from information_schema, using bound parameters.
        # Per-column profiling (null counts, distinct counts) requires
        # warehouse-specific queries and is omitted here for portability.
        columns_query = text("""
        SELECT 
            column_name,
            data_type,
            is_nullable,
            column_default,
            comment
        FROM information_schema.columns 
        WHERE table_catalog = :db 
        AND table_schema = :schema 
        AND table_name = :table
        ORDER BY ordinal_position
        """)
        
        # Table-level statistics. Identifiers cannot be bound as parameters,
        # so only interpolate names that came from the warehouse's own
        # information_schema scan (never user input).
        stats_query = text(f"""
        SELECT COUNT(*) as row_count
        FROM {db}.{schema}.{table}
        """)
        
        async with self.engine.connect() as conn:
            # Run sequentially: a single connection cannot execute
            # two statements concurrently
            columns_result = await conn.execute(
                columns_query, {'db': db, 'schema': schema, 'table': table}
            )
            columns = [dict(row._mapping) for row in columns_result]
            
            stats_result = await conn.execute(stats_query)
            stats = dict(stats_result.one()._mapping)
            
            # Calculate quality score based on metadata completeness
            quality_score = self._calculate_quality_score(columns, stats)
            
            # Infer semantic tags from column names and types
            tags = self._infer_semantic_tags(columns, table)
            
            return TableMetadata(
                database=db,
                schema=schema,
                table=table,
                columns=columns,
                row_count=stats['row_count'],
                # Size in bytes comes from warehouse-specific views
                # (e.g. storage metrics tables), not information_schema
                size_bytes=stats.get('size_bytes'),
                last_modified=datetime.now(),
                quality_score=quality_score,
                tags=tags
            )
    
    def _calculate_quality_score(self, columns: List[Dict], stats: Dict) -> float:
        """Calculate data quality score based on multiple factors"""
        score_components = []
        
        # Schema completeness (do columns have descriptions?)
        documented_columns = sum(1 for col in columns if col.get('comment'))
        schema_score = documented_columns / len(columns) if columns else 0
        score_components.append(('schema_documentation', schema_score, 0.3))
        
        # Data freshness (based on last modification)
        # Implementation depends on your business requirements
        freshness_score = 1.0  # Simplified for example
        score_components.append(('freshness', freshness_score, 0.2))
        
        # Data completeness (null ratios, when profiling has populated
        # per-column null_count; defaults to fully complete otherwise)
        if columns:
            avg_null_ratio = sum(col.get('null_count', 0) for col in columns) / (
                len(columns) * max(1, stats.get('row_count', 1))
            )
            completeness_score = max(0, 1 - avg_null_ratio)
            score_components.append(('completeness', completeness_score, 0.3))
        
        # Usage score (would integrate with query logs)
        usage_score = 0.8  # Placeholder - implement based on query frequency
        score_components.append(('usage', usage_score, 0.2))
        
        # Weighted average
        total_score = sum(score * weight for _, score, weight in score_components)
        return round(total_score, 2)
    
    def _infer_semantic_tags(self, columns: List[Dict], table_name: str) -> Set[str]:
        """Automatically infer semantic tags for governance"""
        tags = set()
        
        # Table-level tags from naming conventions
        if 'user' in table_name.lower() or 'customer' in table_name.lower():
            tags.add('pii_risk')
        if 'transaction' in table_name.lower() or 'payment' in table_name.lower():
            tags.add('financial')
        if 'log' in table_name.lower() or 'event' in table_name.lower():
            tags.add('behavioral')
            
        # Column-level tags
        pii_patterns = ['email', 'phone', 'ssn', 'address', 'name']
        sensitive_patterns = ['salary', 'income', 'credit', 'password']
        
        column_names = [col['column_name'].lower() for col in columns]
        
        if any(pattern in name for name in column_names for pattern in pii_patterns):
            tags.add('contains_pii')
        if any(pattern in name for name in column_names for pattern in sensitive_patterns):
            tags.add('sensitive')
            
        return tags

This discovery agent demonstrates several critical patterns for production catalogs:

Incremental Discovery: Instead of scanning everything every time, it tracks modification timestamps and only processes changed tables. This reduces warehouse load and speeds up discovery.

Quality Scoring: Automatically calculated quality scores help users understand data trustworthiness. The scoring combines multiple factors: documentation completeness, data freshness, null ratios, and usage patterns.

Semantic Tagging: Automatically inferred tags enable governance rules and search functionality. The example shows basic pattern matching, but production systems might use machine learning for more sophisticated classification.

Resilient Error Handling: Discovery failures are recorded but don't break the catalog. This is crucial because discovery agents run continuously in production.
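The same fail-open posture can be captured in a small wrapper: retry the scan a few times with backoff, record each failure, and let the next scheduled run try again rather than crashing the agent. A minimal sketch — the failure log is a plain list here, where production would write to the catalog's health store:

```python
import time
from typing import Callable, List

def run_with_retries(scan: Callable[[], None],
                     failures: List[str],
                     max_attempts: int = 3,
                     base_delay: float = 0.01) -> bool:
    """Run one discovery pass; record failures instead of raising."""
    for attempt in range(1, max_attempts + 1):
        try:
            scan()
            return True
        except Exception as exc:
            failures.append(f"attempt {attempt}: {exc}")
            time.sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
    return False  # give up until the next scheduled run

calls = {"n": 0}
def flaky_scan():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("warehouse unreachable")

failure_log: List[str] = []
print(run_with_retries(flaky_scan, failure_log))  # True on the third attempt
print(len(failure_log))                           # 2 recorded failures
```

The key design choice is the return value instead of a raised exception: the scheduler keeps running, and the failure log becomes a governance signal in its own right.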

Advanced Catalog Search and Discovery

The value of a catalog depends heavily on how easily users can find relevant data. Basic keyword search isn't sufficient when you have thousands of datasets. You need semantic search that understands context, relationships, and user intent.

Here's an advanced search implementation that combines multiple ranking signals:

# catalog_search_engine.py
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from elasticsearch import AsyncElasticsearch
from sentence_transformers import SentenceTransformer

@dataclass
class SearchResult:
    asset_id: str
    asset_type: str
    title: str
    description: str
    score: float
    match_reasons: List[str]
    related_assets: List[str]

class CatalogSearchEngine:
    def __init__(self, es_client: AsyncElasticsearch):
        self.es_client = es_client
        # Load semantic embedding model for content understanding
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        
    async def search(self, 
                    query: str, 
                    user_context: Dict[str, Any],
                    filters: Optional[Dict] = None,
                    limit: int = 20) -> List[SearchResult]:
        """Multi-signal search combining text, semantic, and behavioral signals"""
        
        # Generate semantic embedding for the query
        query_embedding = self.embedding_model.encode(query).tolist()
        
        # Build Elasticsearch query combining multiple strategies
        es_query = {
            "query": {
                "function_score": {
                    "query": {
                        "bool": {
                            "should": [
                                # Exact matches in titles get highest boost
                                {
                                    "match_phrase": {
                                        "title": {
                                            "query": query,
                                            "boost": 3.0
                                        }
                                    }
                                },
                                # Fuzzy matching in titles
                                {
                                    "match": {
                                        "title": {
                                            "query": query,
                                            "fuzziness": "AUTO",
                                            "boost": 2.0
                                        }
                                    }
                                },
                                # Description matching
                                {
                                    "match": {
                                        "description": {
                                            "query": query,
                                            "boost": 1.5
                                        }
                                    }
                                },
                                # Column name matching (important for technical searches)
                                {
                                    "nested": {
                                        "path": "columns",
                                        "query": {
                                            "match": {
                                                "columns.name": {
                                                    "query": query,
                                                    "boost": 1.8
                                                }
                                            }
                                        }
                                    }
                                },
                                # Semantic similarity using dense vectors
                                {
                                    "script_score": {
                                        "query": {"match_all": {}},
                                        "script": {
                                            "source": "cosineSimilarity(params.query_vector, 'description_embedding') + 1.0",
                                            "params": {"query_vector": query_embedding}
                                        }
                                    }
                                },
                                # Tag matching (exact match gets high boost)
                                {
                                    "terms": {
                                        "tags": query.split(),
                                        "boost": 2.5
                                    }
                                }
                            ],
                            "minimum_should_match": 1
                        }
                    },
                    "functions": [
                        # Boost recently accessed assets
                        {
                            "filter": {"range": {"last_accessed": {"gte": "now-7d"}}},
                            "weight": 1.2
                        },
                        # Boost high-quality assets
                        {
                            "field_value_factor": {
                                "field": "quality_score",
                                "factor": 1.5,
                                "missing": 0.5
                            }
                        },
                        # Boost assets from user's team/department
                        {
                            "filter": {
                                "terms": {
                                    "owner_team": user_context.get("teams", [])
                                }
                            },
                            "weight": 1.3
                        },
                        # Personalization: boost assets user has accessed before
                        {
                            "filter": {
                                "terms": {
                                    "asset_id": user_context.get("recent_assets", [])
                                }
                            },
                            "weight": 1.1
                        }
                    ],
                    "score_mode": "multiply",
                    "boost_mode": "multiply"
                }
            },
            "size": limit,
            "highlight": {
                "fields": {
                    "title": {},
                    "description": {},
                    "columns.name": {}
                }
            },
            "_source": {
                "excludes": ["description_embedding"]  # Don't return large vectors
            }
        }
        
        # Apply filters if provided
        if filters:
            es_query["query"]["function_score"]["query"]["bool"]["filter"] = [
                self._build_filter(k, v) for k, v in filters.items()
            ]
        
        # Execute search
        response = await self.es_client.search(
            index="data_catalog",
            body=es_query
        )
        
        # Process results and explain ranking
        results = []
        for hit in response["hits"]["hits"]:
            source = hit["_source"]
            
            # Extract match reasons from highlights and query analysis
            match_reasons = self._extract_match_reasons(hit, query)
            
            # Get related assets through lineage
            related_assets = await self._get_related_assets(source["asset_id"])
            
            results.append(SearchResult(
                asset_id=source["asset_id"],
                asset_type=source["asset_type"],
                title=source["title"],
                description=source["description"],
                score=hit["_score"],
                match_reasons=match_reasons,
                related_assets=related_assets
            ))
        
        return results
    
    def _extract_match_reasons(self, hit: Dict, query: str) -> List[str]:
        """Explain why this result matched - crucial for user trust"""
        reasons = []
        
        if "highlight" in hit:
            highlights = hit["highlight"]
            if "title" in highlights:
                reasons.append("Title contains your search terms")
            if "description" in highlights:
                reasons.append("Description mentions relevant concepts")
            if "columns.name" in highlights:
                reasons.append("Contains matching column names")
        
        # Analyze score components (simplified)
        score = hit["_score"]
        source = hit["_source"]
        
        if source.get("quality_score", 0) > 0.8:
            reasons.append("High quality data")
        if source.get("last_accessed") and self._is_recent(source["last_accessed"]):
            reasons.append("Recently accessed by others")
        
        return reasons
    
    def _build_filter(self, field_name: str, value: Any) -> Dict:
        """Translate a filter key/value pair into an Elasticsearch clause"""
        if isinstance(value, list):
            return {"terms": {field_name: value}}
        return {"term": {field_name: value}}
    
    def _is_recent(self, timestamp: str, days: int = 7) -> bool:
        """True if an ISO-format timestamp falls within the last `days` days"""
        from datetime import datetime, timedelta
        return datetime.now() - datetime.fromisoformat(timestamp) < timedelta(days=days)
    
    async def _get_related_assets(self, asset_id: str, limit: int = 5) -> List[str]:
        """Find related assets through lineage relationships"""
        lineage_query = {
            "query": {
                "bool": {
                    "should": [
                        {"term": {"upstream_assets": asset_id}},
                        {"term": {"downstream_assets": asset_id}}
                    ]
                }
            },
            "size": limit,
            "_source": ["asset_id", "title"]
        }
        
        response = await self.es_client.search(
            index="data_catalog",
            body=lineage_query
        )
        
        return [hit["_source"]["asset_id"] for hit in response["hits"]["hits"]]

This search engine demonstrates several advanced patterns:

Multi-Signal Ranking: Combines exact text matching, fuzzy matching, semantic similarity, and behavioral signals (usage, quality, personalization) to provide relevant results.

Explainable Results: The match_reasons field helps users understand why results were returned, building trust in the search system.

Contextual Personalization: Results are boosted based on the user's team, department, and historical usage patterns.

Semantic Understanding: Uses sentence transformers to find conceptually similar content even when exact keywords don't match.
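The `cosineSimilarity(...) + 1.0` scoring script above reduces to straightforward vector math: cosine similarity measures the angle between the query embedding and each document embedding, and the `+ 1.0` shifts its [-1, 1] range into [0, 2] so Elasticsearch never sees a negative score. A sketch with plain Python lists (real queries would use the model's 384-dimensional embeddings):

```python
import math
from typing import List

def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def semantic_score(query_vec: List[float], doc_vec: List[float]) -> float:
    """Mirrors the cosineSimilarity(...) + 1.0 script_score above."""
    return cosine_similarity(query_vec, doc_vec) + 1.0

q = [1.0, 0.0, 1.0]
print(semantic_score(q, [1.0, 0.0, 1.0]))    # same direction -> 2.0
print(semantic_score(q, [0.0, 1.0, 0.0]))    # orthogonal -> 1.0
print(semantic_score(q, [-1.0, 0.0, -1.0]))  # opposite -> 0.0
```

Because embeddings place related phrases near each other, a query like "customer revenue" can score well against a table described as "client income" even with zero keyword overlap — the property keyword search alone cannot provide.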

Advanced Data Lineage Architecture

Beyond Simple Column-Level Tracking

Most lineage tools focus on table-to-table relationships, but production systems need much more granular tracking. You need to understand not just that Table A feeds Table B, but which specific columns influence which calculations, how transformations change data semantics, and what the impact radius is for any given change.
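Impact analysis is where column-level edges pay off: once lineage is a directed graph whose nodes are columns, the "impact radius" of a change is simply the set of nodes reachable from the changed column. A sketch with networkx (the node names are illustrative):

```python
import networkx as nx

def impact_radius(lineage: nx.DiGraph, changed: str) -> set:
    """Everything transitively downstream of a changed column."""
    return nx.descendants(lineage, changed)

g = nx.DiGraph()
# Column-level edges: source column -> target column
g.add_edge("orders.amount", "daily_rev.total")
g.add_edge("daily_rev.total", "finance_dash.revenue")
g.add_edge("orders.customer_id", "churn_model.features")

print(sorted(impact_radius(g, "orders.amount")))
# ['daily_rev.total', 'finance_dash.revenue']
```

Note that `orders.customer_id` has its own, separate blast radius — table-level lineage would conflate the two and flag the churn model for a change that only touches `amount`.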

Here's a production lineage system architecture that handles complex transformation logic:

# advanced_lineage_tracker.py
from typing import Dict, List, Set, Optional, Union, Any
from dataclasses import dataclass, field
from enum import Enum
import sqlparse
from sqlparse.sql import Statement, Identifier, Function
from sqlparse.tokens import Token
import networkx as nx
from datetime import datetime
import json

class LineageEventType(Enum):
    TABLE_CREATE = "table_create"
    TABLE_UPDATE = "table_update" 
    TABLE_DELETE = "table_delete"
    COLUMN_ADD = "column_add"
    COLUMN_DROP = "column_drop"
    TRANSFORMATION = "transformation"
    DATA_COPY = "data_copy"

@dataclass
class ColumnLineage:
    """Tracks lineage at the column level with transformation context"""
    source_table: str
    source_column: str
    target_table: str
    target_column: str
    transformation_type: str  # "direct_copy", "aggregation", "calculation", "join"
    transformation_logic: Optional[str]  # SQL expression or description
    confidence_score: float  # How certain we are about this lineage
    created_at: datetime = field(default_factory=datetime.now)

@dataclass  
class LineageEvent:
    """Represents a single lineage-affecting event"""
    event_id: str
    event_type: LineageEventType
    source_system: str
    execution_id: str  # Link to pipeline run that caused this event
    affected_assets: List[str]
    column_lineage: List[ColumnLineage]
    metadata: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)

class SQLLineageParser:
    """Parses SQL to extract column-level lineage relationships"""
    
    def __init__(self):
        self.lineage_graph = nx.DiGraph()
        
    def parse_sql_lineage(self, sql: str, target_table: str) -> List[ColumnLineage]:
        """Extract column lineage from SQL statements"""
        lineages = []
        
        try:
            # Parse the SQL statement
            parsed = sqlparse.parse(sql)[0]
            
            # Extract different types of SQL operations
            if self._is_select_statement(parsed):
                lineages.extend(self._parse_select_lineage(parsed, target_table))
            elif self._is_insert_statement(parsed):
                lineages.extend(self._parse_insert_lineage(parsed, target_table))
            elif self._is_create_table_as(parsed):
                lineages.extend(self._parse_ctas_lineage(parsed, target_table))
                
        except Exception as e:
            # Log parsing errors but don't fail - lineage is often imperfect
            print(f"SQL parsing error: {e}")
            
        return lineages
    
    def _parse_select_lineage(self, parsed_sql: Statement, target_table: str) -> List[ColumnLineage]:
        """Parse SELECT statements to understand column relationships"""
        lineages = []
        
        # Extract SELECT clause to understand output columns
        select_items = self._extract_select_items(parsed_sql)
        
        # Extract FROM and JOIN clauses to understand source tables
        source_tables = self._extract_source_tables(parsed_sql)
        
        # Match output columns to their sources
        for idx, select_item in enumerate(select_items):
            column_name = select_item.get('alias') or select_item.get('column')
            
            if select_item['type'] == 'direct_column':
                # Direct column reference: SELECT customer_id FROM customers
                source_table = select_item.get('table') or source_tables[0]
                lineages.append(ColumnLineage(
                    source_table=source_table,
                    source_column=select_item['column'],
                    target_table=target_table,
                    target_column=column_name,
                    transformation_type="direct_copy",
                    transformation_logic=None,
                    confidence_score=0.95
                ))
                
            elif select_item['type'] == 'function':
                # Function-based transformation: SELECT SUM(amount) as total_amount
                source_columns = select_item.get('source_columns', [])
                for source_col_info in source_columns:
                    lineages.append(ColumnLineage(
                        source_table=source_col_info['table'],
                        source_column=source_col_info['column'],
                        target_table=target_table,
                        target_column=column_name,
                        transformation_type="aggregation" if select_item['function'] in ['SUM', 'COUNT', 'AVG', 'MAX', 'MIN'] else "calculation",
                        transformation_logic=select_item['expression'],
                        confidence_score=0.85
                    ))
                    
            elif select_item['type'] == 'expression':
                # Complex expression: SELECT price * quantity as total_value
                source_columns = self._extract_columns_from_expression(select_item['expression'])
                for source_col in source_columns:
                    # Determine source table (simplified logic)
                    source_table = self._resolve_table_for_column(source_col, source_tables)
                    lineages.append(ColumnLineage(
                        source_table=source_table,
                        source_column=source_col,
                        target_table=target_table,
                        target_column=column_name,
                        transformation_type="calculation",
                        transformation_logic=select_item['expression'],
                        confidence_score=0.75  # Lower confidence for complex expressions
                    ))
        
        return lineages
    
    def _extract_select_items(self, parsed_sql: Statement) -> List[Dict]:
        """Extract column selections with their types and transformations"""
        select_items = []
        
        # This is a simplified version - production would need more robust SQL parsing
        # Consider using a more sophisticated SQL parser like sqlglot or sqllineage
        
        tokens = [token for token in parsed_sql.flatten() if not token.is_whitespace]
        in_select = False
        current_item = ""
        
        for token in tokens:
            value = token.value.upper()
            # Flattened leaf tokens always carry a ttype, so match on the
            # token value rather than checking `ttype is None` (which never
            # holds for leaves)
            if value == 'SELECT':
                in_select = True
                continue
            elif value in ('FROM', 'WHERE', 'GROUP', 'ORDER'):
                in_select = False
                if current_item.strip():
                    select_items.append(self._parse_select_item(current_item.strip()))
                    current_item = ""
                break
            elif in_select:
                if str(token) == ',':
                    if current_item.strip():
                        select_items.append(self._parse_select_item(current_item.strip()))
                        current_item = ""
                else:
                    current_item += str(token)
        
        # Handle last item
        if current_item.strip():
            select_items.append(self._parse_select_item(current_item.strip()))
            
        return select_items
    
    def _parse_select_item(self, item: str) -> Dict:
        """Parse individual SELECT item to understand its type and sources"""
        item = item.strip()
        
        # Check for alias (AS keyword or space-separated)
        alias = None
        upper_item = item.upper()
        if ' AS ' in upper_item:
            # Split at the position of AS so the original casing is preserved
            as_index = upper_item.index(' AS ')
            expression = item[:as_index].strip()
            alias = item[as_index + 4:].strip()
        elif ' ' in item and not any(func in upper_item for func in ['SUM(', 'COUNT(', 'AVG(', 'CASE ']):
            # Simple space-separated alias
            parts = item.split()
            if len(parts) == 2:
                expression, alias = parts
            else:
                expression = item
        else:
            expression = item
            
        # Determine type of selection
        if '(' in expression and any(func in expression.upper() for func in ['SUM(', 'COUNT(', 'AVG(', 'MAX(', 'MIN(']):
            return {
                'type': 'function',
                'expression': expression,
                'function': expression.split('(')[0].upper(),
                'alias': alias,
                'column': alias or expression,
                'source_columns': self._extract_columns_from_expression(expression)
            }
        elif any(op in expression for op in ['+', '-', '*', '/', 'CASE', 'COALESCE']):
            return {
                'type': 'expression',
                'expression': expression,
                'alias': alias,
                'column': alias or expression
            }
        else:
            # Simple column reference
            if '.' in expression:
                table, column = expression.split('.', 1)
                return {
                    'type': 'direct_column',
                    'table': table,
                    'column': column,
                    'alias': alias,
                    'expression': expression
                }
            else:
                return {
                    'type': 'direct_column',
                    'column': expression,
                    'alias': alias,
                    'expression': expression
                }

class LineageTracker:
    """Main lineage tracking system with impact analysis capabilities"""
    
    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.lineage_graph = nx.DiGraph()
        self.sql_parser = SQLLineageParser()
        
    async def track_transformation_event(self, 
                                       sql: str,
                                       target_table: str,
                                       execution_context: Dict[str, Any]) -> LineageEvent:
        """Track a transformation event and update lineage graph"""
        
        # Parse SQL to extract column lineage
        column_lineages = self.sql_parser.parse_sql_lineage(sql, target_table)
        
        # Create lineage event
        event = LineageEvent(
            event_id=f"transform_{datetime.now().isoformat()}_{hash(sql)}",
            event_type=LineageEventType.TRANSFORMATION,
            source_system=execution_context.get('pipeline_name', 'unknown'),
            execution_id=execution_context.get('execution_id', ''),
            affected_assets=[target_table] + list(set(cl.source_table for cl in column_lineages)),
            column_lineage=column_lineages,
            metadata={
                'sql': sql,
                'execution_time': execution_context.get('execution_time'),
                'row_count': execution_context.get('row_count')
            }
        )
        
        # Update the lineage graph
        await self._update_lineage_graph(event)
        
        # Store the event
        await self.storage.store_lineage_event(event)
        
        return event
    
    async def _update_lineage_graph(self, event: LineageEvent):
        """Update the in-memory lineage graph with new relationships"""
        
        for col_lineage in event.column_lineage:
            # Add nodes for tables if they don't exist
            source_node = f"{col_lineage.source_table}.{col_lineage.source_column}"
            target_node = f"{col_lineage.target_table}.{col_lineage.target_column}"
            
            self.lineage_graph.add_node(source_node, 
                                      table=col_lineage.source_table,
                                      column=col_lineage.source_column)
            self.lineage_graph.add_node(target_node,
                                      table=col_lineage.target_table, 
                                      column=col_lineage.target_column)
            
            # Add edge with transformation metadata
            self.lineage_graph.add_edge(source_node, target_node,
                                      transformation_type=col_lineage.transformation_type,
                                      transformation_logic=col_lineage.transformation_logic,
                                      confidence=col_lineage.confidence_score,
                                      event_id=event.event_id,
                                      created_at=col_lineage.created_at)
    
    async def get_impact_analysis(self, 
                                table_name: str, 
                                column_name: Optional[str] = None) -> Dict[str, Any]:
        """Analyze downstream impact of changes to a table or column"""
        
        # Define the starting point(s) for impact analysis
        if column_name:
            start_node = f"{table_name}.{column_name}"
            start_nodes = [start_node] if start_node in self.lineage_graph else []
        else:
            # For table-level analysis, find all columns in the table
            start_nodes = [node for node in self.lineage_graph.nodes()
                          if self.lineage_graph.nodes[node].get('table') == table_name]
        
        downstream_impact = {
            'directly_affected': [],
            'indirectly_affected': [],
            'impact_radius': 0,
            'confidence_scores': [],
            'affected_systems': set(),
            'transformation_types': {}
        }
        
        for start_node in start_nodes:
            if start_node not in self.lineage_graph:
                continue
                
            # Find all downstream nodes using DFS
            downstream_nodes = list(nx.descendants(self.lineage_graph, start_node))
            
            # Categorize impact by distance
            for node in downstream_nodes:
                distance = nx.shortest_path_length(self.lineage_graph, start_node, node)
                node_info = {
                    'table': self.lineage_graph.nodes[node]['table'],
                    'column': self.lineage_graph.nodes[node]['column'],
                    'distance': distance
                }
                
                if distance == 1:
                    downstream_impact['directly_affected'].append(node_info)
                else:
                    downstream_impact['indirectly_affected'].append(node_info)
                
                # Track confidence and transformation types along the path
                path = nx.shortest_path(self.lineage_graph, start_node, node)
                path_confidence = 1.0
                transformation_types = []
                
                for i in range(len(path) - 1):
                    edge_data = self.lineage_graph[path[i]][path[i+1]]
                    path_confidence *= edge_data.get('confidence', 1.0)
                    transformation_types.append(edge_data.get('transformation_type', 'unknown'))
                
                downstream_impact['confidence_scores'].append(path_confidence)
                downstream_impact['transformation_types'][node] = transformation_types
                
                # Track affected systems
                table_parts = node_info['table'].split('.')
                if len(table_parts) > 1:
                    downstream_impact['affected_systems'].add(table_parts[0])  # Database/schema
        
        downstream_impact['impact_radius'] = len(downstream_impact['directly_affected']) + len(downstream_impact['indirectly_affected'])
        downstream_impact['affected_systems'] = list(downstream_impact['affected_systems'])
        
        return downstream_impact
    
    async def get_root_cause_analysis(self, 
                                    table_name: str, 
                                    column_name: Optional[str] = None) -> Dict[str, Any]:
        """Trace upstream dependencies to understand data sources"""
        
        if column_name:
            target_node = f"{table_name}.{column_name}"
            target_nodes = [target_node] if target_node in self.lineage_graph else []
        else:
            target_nodes = [node for node in self.lineage_graph.nodes()
                          if self.lineage_graph.nodes[node].get('table') == table_name]
        
        root_cause_info = {
            'direct_sources': [],
            'ultimate_sources': [],
            'source_systems': set(),
            'transformation_chain': {},
            'data_freshness_risk': []
        }
        
        for target_node in target_nodes:
            if target_node not in self.lineage_graph:
                continue
                
            # Find all upstream nodes
            upstream_nodes = list(nx.ancestors(self.lineage_graph, target_node))
            
            # Find ultimate sources (nodes with no upstream dependencies)
            ultimate_sources = [node for node in upstream_nodes 
                              if self.lineage_graph.in_degree(node) == 0]
            
            # Find direct sources (immediate upstream)
            direct_sources = list(self.lineage_graph.predecessors(target_node))
            
            for source in direct_sources:
                source_info = {
                    'table': self.lineage_graph.nodes[source]['table'],
                    'column': self.lineage_graph.nodes[source]['column'],
                    'transformation_type': self.lineage_graph[source][target_node].get('transformation_type')
                }
                root_cause_info['direct_sources'].append(source_info)
            
            for source in ultimate_sources:
                source_info = {
                    'table': self.lineage_graph.nodes[source]['table'], 
                    'column': self.lineage_graph.nodes[source]['column']
                }
                root_cause_info['ultimate_sources'].append(source_info)
                
                # Track source systems
                table_parts = source_info['table'].split('.')
                if len(table_parts) > 1:
                    root_cause_info['source_systems'].add(table_parts[0])
        
        root_cause_info['source_systems'] = list(root_cause_info['source_systems'])
        
        return root_cause_info

This lineage system provides several advanced capabilities:

Column-Level Granularity: Tracks not just table-to-table relationships but specific column transformations, enabling precise impact analysis.

Transformation Context: Stores the actual SQL logic and transformation type, allowing users to understand how data changes as it flows through the pipeline.

Confidence Scoring: Acknowledges that lineage extraction is imperfect, especially from complex SQL, and provides confidence scores to help users understand reliability.

Bidirectional Analysis: Supports both forward impact analysis ("what breaks if I change this?") and backward root cause analysis ("where does this data come from?").
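The forward impact analysis above boils down to graph reachability plus confidence multiplication along paths. A minimal self-contained sketch with networkx (the table and column names are made up for illustration):

```python
import networkx as nx

# Toy column-level lineage: raw -> staging -> mart
g = nx.DiGraph()
g.add_edge("raw.orders.amount", "staging.orders.amount", confidence=0.95)
g.add_edge("staging.orders.amount", "mart.daily_revenue.total", confidence=0.85)

start = "raw.orders.amount"

# Forward impact: everything reachable downstream of the changed column
downstream = nx.descendants(g, start)

# Multiply edge confidences along each path to estimate lineage reliability
path_confidence = {}
for node in downstream:
    path = nx.shortest_path(g, start, node)
    conf = 1.0
    for a, b in zip(path, path[1:]):
        conf *= g[a][b]["confidence"]
    path_confidence[node] = conf
```

Here staging.orders.amount is directly affected with confidence 0.95, while mart.daily_revenue.total sits two hops away with confidence 0.95 × 0.85 ≈ 0.81 — the same distance and confidence bookkeeping `get_impact_analysis` performs at scale.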

Real-Time Lineage Updates

Static lineage extraction is insufficient for dynamic environments. You need systems that can update lineage in real-time as pipelines execute. Here's a pattern for hooking into common orchestration tools:

# real_time_lineage_hooks.py
import asyncio
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime

@dataclass
class PipelineExecution:
    pipeline_id: str
    execution_id: str
    task_id: str
    start_time: datetime
    sql_statements: List[str]
    input_tables: List[str]
    output_tables: List[str]
    execution_context: Dict[str, Any]

class AirflowLineageHook:
    """Hook for integrating lineage tracking with Apache Airflow"""
    
    def __init__(self, lineage_tracker: LineageTracker):
        self.lineage_tracker = lineage_tracker
        
    def on_task_start(self, context: Dict[str, Any]):
        """Called when an Airflow task starts"""
        # Extract lineage-relevant information from Airflow context
        task_instance = context['task_instance']
        dag_run = context['dag_run']
        
        # Register the execution start
        execution = PipelineExecution(
            pipeline_id=f"{dag_run.dag_id}.{task_instance.task_id}",
            execution_id=dag_run.run_id,
            task_id=task_instance.task_id,
            start_time=datetime.now(),
            sql_statements=[],  # Will be populated during execution
            input_tables=[],
            output_tables=[],
            execution_context={
                'dag_id': dag_run.dag_id,
                'task_id': task_instance.task_id,
                'execution_date': dag_run.execution_date.isoformat(),
                'operator': task_instance.operator.__class__.__name__
            }
        )
        
        # Store execution context for later lineage extraction
        self._store_execution_context(execution)
    
    def on_sql_execution(self, sql: str, execution_id: str, affected_tables: List[str]):
        """Called whenever SQL is executed within a task"""
        # Airflow callbacks run synchronously, so drive the coroutine with
        # asyncio.run rather than create_task (which requires a running loop)
        asyncio.run(self._track_sql_lineage(sql, execution_id, affected_tables))
    
    async def _track_sql_lineage(self, sql: str, execution_id: str, affected_tables: List[str]):
        """Extract and track lineage from SQL execution"""
        for target_table in affected_tables:
            try:
                await self.lineage_tracker.track_transformation_event(
                    sql=sql,
                    target_table=target_table,
                    execution_context={'execution_id': execution_id}
                )
            except Exception as e:
                # Log but don't fail the pipeline
                print(f"Lineage tracking failed for {target_table}: {e}")

class DBTLineageIntegration:
    """Integration with dbt for automatic lineage extraction"""
    
    def __init__(self, lineage_tracker: LineageTracker):
        self.lineage_tracker = lineage_tracker
        
    async def sync_dbt_lineage(self, manifest_path: str, run_results_path: str):
        """Extract lineage from dbt manifest and run results"""
        import json
        
        # Load dbt artifacts
        with open(manifest_path, 'r') as f:
            manifest = json.load(f)
        with open(run_results_path, 'r') as f:
            run_results = json.load(f)
            
        # Process each model/seed/snapshot
        for node_id, node in manifest['nodes'].items():
            if node['resource_type'] in ['model', 'seed', 'snapshot']:
                await self._process_dbt_node(node, manifest, run_results)
    
    async def _process_dbt_node(self, node: Dict, manifest: Dict, run_results: Dict):
        """Process individual dbt node for lineage extraction"""
        
        # Extract target table information
        target_table = f"{node['database']}.{node['schema']}.{node['name']}"
        
        # Get the compiled SQL (dbt >= 1.3 renamed these manifest keys to *_code)
        compiled_sql = (node.get('compiled_code') or node.get('compiled_sql')
                        or node.get('raw_code') or node.get('raw_sql', ''))
        
        # Extract execution metadata from run results
        execution_context = {
            'dbt_project': manifest['metadata']['project_name'],
            'dbt_version': manifest['metadata']['dbt_version'],
            'model_name': node['name'],
            'materialization': node.get('config', {}).get('materialized', 'table')
        }
        
        # Find execution results for this node
        for result in run_results.get('results', []):
            if result['unique_id'] == node['unique_id']:
                execution_context.update({
                    'execution_time': result.get('execution_time'),
                    'rows_affected': result.get('adapter_response', {}).get('rows_affected'),
                    'status': result.get('status')
                })
                break
        
        # Track the transformation
        if compiled_sql:
            await self.lineage_tracker.track_transformation_event(
                sql=compiled_sql,
                target_table=target_table,
                execution_context=execution_context
            )

This real-time integration ensures that lineage information stays current with your actual data pipelines, rather than becoming stale documentation.
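SQL parsing is not the only lineage source here: dbt's manifest.json already declares table-level dependencies under each node's `depends_on.nodes` key, so coarse lineage comes for free before any SQL is inspected. A minimal sketch against a hand-built manifest fragment (the node IDs are invented):

```python
# Hand-built fragment mimicking the shape of dbt's manifest.json
manifest = {
    "nodes": {
        "model.shop.stg_orders": {
            "resource_type": "model",
            "depends_on": {"nodes": ["source.shop.raw.orders"]},
        },
        "model.shop.daily_revenue": {
            "resource_type": "model",
            "depends_on": {"nodes": ["model.shop.stg_orders"]},
        },
    }
}

# Build (upstream, downstream) edges from the declared dependencies
edges = [
    (upstream, node_id)
    for node_id, node in manifest["nodes"].items()
    if node["resource_type"] == "model"
    for upstream in node["depends_on"]["nodes"]
]
```

These table-level edges are a useful fallback when SQL parsing fails; the parser is still needed for column-level detail.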

Fine-Grained Access Control Architecture

Beyond Role-Based Access Control

Traditional RBAC systems are too coarse for modern data governance. You need attribute-based access control (ABAC) that can make decisions based on data content, user context, query patterns, and dynamic conditions. Here's an advanced access control system:
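Stripped to its essence, ABAC is a predicate over user, resource, and environment attributes that denies by default. A toy sketch before the full engine (the attribute names are illustrative, not part of the engine's API):

```python
from datetime import datetime, time

def decide(user: dict, resource: dict, context: dict) -> str:
    """Toy ABAC check: every attribute predicate must pass, deny otherwise."""
    # Data classification vs. user clearance
    if resource["classification"] == "restricted" and user["clearance"] != "high":
        return "deny"
    # Role attributes must overlap with the resource's allowed roles
    if not user["roles"] & resource["allowed_roles"]:
        return "deny"
    # Environmental condition: confidential data only during business hours
    if resource["classification"] == "confidential":
        if not time(8, 0) <= context["access_time"].time() <= time(18, 0):
            return "deny"
    return "allow"

decision = decide(
    {"roles": {"analyst"}, "clearance": "standard"},
    {"classification": "confidential", "allowed_roles": {"analyst", "engineer"}},
    {"access_time": datetime(2026, 4, 19, 10, 30)},
)
```

The production engine below generalizes exactly this shape: richer context, policy objects instead of hard-coded predicates, and masking/audit outcomes in addition to allow/deny.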

# advanced_access_control.py
from typing import Dict, List, Set, Optional, Any, Union, Tuple
from dataclasses import dataclass, field
from enum import Enum
import asyncio
from datetime import datetime, timedelta
import json
import re
from abc import ABC, abstractmethod

class AccessDecision(Enum):
    ALLOW = "allow"
    DENY = "deny"
    ALLOW_WITH_MASKING = "allow_with_masking"
    ALLOW_WITH_AUDIT = "allow_with_audit"

class DataClassification(Enum):
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"

@dataclass
class AccessContext:
    """Rich context for access control decisions"""
    user_id: str
    user_roles: Set[str]
    user_departments: Set[str]
    user_clearance_level: str
    query_type: str  # SELECT, INSERT, UPDATE, DELETE
    query_purpose: Optional[str]  # analytics, reporting, ml_training, etc.
    access_location: str  # IP address or geographic location
    access_time: datetime
    client_application: str
    requested_columns: List[str]
    query_filters: Dict[str, Any]  # WHERE clause conditions
    aggregation_level: Optional[str]  # individual_records, aggregated, statistical
    data_retention_context: Optional[Dict[str, Any]]

@dataclass
class DataPolicy:
    """Defines access rules for specific data assets"""
    policy_id: str
    name: str
    description: str
    target_resources: List[str]  # Tables, columns, or datasets
    data_classification: DataClassification
    allowed_roles: Set[str]
    allowed_departments: Set[str]
    required_clearance_level: Optional[str]
    time_restrictions: Optional[Dict[str, Any]]  # Business hours, etc.
    location_restrictions: Optional[List[str]]
    purpose_restrictions: Optional[Set[str]]
    masking_rules: Optional[Dict[str, str]]
    audit_requirements: Dict[str, bool]
    expires_at: Optional[datetime]
    conditions: List['PolicyCondition'] = field(default_factory=list)

@dataclass 
class PolicyCondition:
    """Complex conditions for policy evaluation"""
    condition_type: str  # "row_filter", "column_mask", "time_window", "usage_limit"
    expression: str  # SQL-like expression or Python condition
    parameters: Dict[str, Any]

class AccessControlEngine:
    """Core engine for making access control decisions"""
    
    def __init__(self, storage_backend):
        self.policies: Dict[str, DataPolicy] = {}
        self.user_attributes: Dict[str, Dict[str, Any]] = {}
        self.audit_logger = AuditLogger(storage_backend)
        self.policy_cache = {}  # Cache for expensive policy evaluations
        
    async def evaluate_access(self, 
                            resource: str,
                            access_context: AccessContext) -> Tuple[AccessDecision, Dict[str, Any]]:
        """Main access control decision point"""
        
        decision_context = {
            'resource': resource,
            'user_id': access_context.user_id,
            'timestamp': access_context.access_time,
            'applied_policies': [],
            'masking_applied': {},
            'audit_triggered': False
        }
        
        try:
            # Find applicable policies for this resource
            applicable_policies = await self._find_applicable_policies(resource, access_context)
            
            if not applicable_policies:
                # No specific policies - apply default behavior
                decision_context['applied_policies'] = ['default_deny']
                await self.audit_logger.log_access(access_context, AccessDecision.DENY, decision_context)
                return AccessDecision.DENY, decision_context
            
            # Evaluate each policy (most restrictive wins)
            final_decision = AccessDecision.ALLOW
            masking_rules = {}
            audit_required = False
            
            for policy in applicable_policies:
                policy_decision, policy_context = await self._evaluate_policy(policy, access_context)
                decision_context['applied_policies'].append(policy.policy_id)
                
                # Apply most restrictive decision
                if policy_decision == AccessDecision.DENY:
                    final_decision = AccessDecision.DENY
                    break
                elif policy_decision == AccessDecision.ALLOW_WITH_MASKING:
                    if final_decision == AccessDecision.ALLOW:
                        final_decision = AccessDecision.ALLOW_WITH_MASKING
                    # Collect masking rules
                    if policy.masking_rules:
                        masking_rules.update(policy.masking_rules)
                
                # Check audit requirements
                if policy.audit_requirements.get('log_access', False):
                    audit_required = True
            
            # Apply decision context
            if final_decision == AccessDecision.ALLOW_WITH_MASKING:
                decision_context['masking_applied'] = masking_rules
            if audit_required:
                decision_context['audit_triggered'] = True
                await self.audit_logger.log_access(access_context, final_decision, decision_context)
            
            return final_decision, decision_context
            
        except Exception as e:
            # Security-first: deny on error
            decision_context['error'] = str(e)
            await self.audit_logger.log_access(access_context, AccessDecision.DENY, decision_context)
            return AccessDecision.DENY, decision_context
    
    async def _find_applicable_policies(self, 
                                      resource: str, 
                                      access_context: AccessContext) -> List[DataPolicy]:
        """Find all policies that apply to this resource and context"""
        applicable = []
        
        for policy in self.policies.values():
            # Check if policy applies to this resource
            if not self._resource_matches_policy(resource, policy):
                continue
                
            # Check basic role/department requirements
            if policy.allowed_roles and not policy.allowed_roles.intersection(access_context.user_roles):
                continue
            if policy.allowed_departments and not policy.allowed_departments.intersection(access_context.user_departments):
                continue
                
            # Check clearance level
            if policy.required_clearance_level:
                if not self._check_clearance_level(access_context.user_clearance_level, policy.required_clearance_level):
                    continue
            
            # Check time restrictions
            if policy.time_restrictions:
                if not self._check_time_restrictions(access_context.access_time, policy.time_restrictions):
                    continue
            
            # Check location restrictions
            if policy.location_restrictions:
                if access_context.access_location not in policy.location_restrictions:
                    continue
                    
            # Check purpose restrictions
            if policy.purpose_restrictions and access_context.query_purpose:
                if access_context.query_purpose not in policy.purpose_restrictions:
                    continue
            
            # Policy is applicable
            applicable.append(policy)
        
        return applicable
    
    async def _evaluate_policy(self, 
                             policy: DataPolicy, 
                             access_context: AccessContext) -> Tuple[AccessDecision, Dict[str, Any]]:
        """Evaluate a specific policy against the access context"""
        
        policy_context = {
            'policy_id': policy.policy_id,
            'conditions_evaluated': []
        }
        
        # Evaluate complex conditions
        for condition in policy.conditions:
            condition_result = await self._evaluate_condition(condition, access_context)
            policy_context['conditions_evaluated'].append({
                'condition_type': condition.condition_type,
                'result': condition_result
            })
            
            if not condition_result:
                return AccessDecision.DENY, policy_context
        
        # Determine access decision based on policy configuration
        decision = AccessDecision.ALLOW
        
        # Check if masking is required
        if policy.masking_rules and self._requires_masking(access_context, policy):
            decision = AccessDecision.ALLOW_WITH_MASKING
            
        # Check if audit is required
        if policy.audit_requirements.get('log_access', False):
            if decision == AccessDecision.ALLOW:
                decision = AccessDecision.ALLOW_WITH_AUDIT
        
        return decision, policy_context
    
    async def _evaluate_condition(self, 
                                condition: PolicyCondition, 
                                access_context: AccessContext) -> bool:
        """Evaluate complex policy conditions"""
        
        if condition.condition_type == "row_filter":
            # Check if user query includes required WHERE clause conditions
            return self._check_row_filter_condition(condition, access_context)
            
        elif condition.condition_type == "column_access":
            # Check if user is accessing sensitive columns
            return self._check_column_access_condition(condition, access_context)
            
        elif condition.condition_type == "usage_limit":
            # Check rate limiting or usage quotas
            return await self._check_usage_limit_condition(condition, access_context)
            
        elif condition.condition_type == "data_freshness":
            # Only allow access to recent data
            return self._check_data_freshness_condition(condition, access_context)
            
        elif condition.condition_type == "aggregation_required":
            # Require data to be aggregated (no individual records)
            return self._check_aggregation_condition(condition, access_context)
        
        # Fail closed: an unrecognized condition type must not grant access
        return False
    
    def _check_row_filter_condition(self, 
                                   condition: PolicyCondition, 
                                   access_context: AccessContext) -> bool:
        """Check if query includes required row filtering"""
        
        required_filter = condition.expression
        user_filters = access_context.query_filters
        
        # Example: require filtering by user's department
        if "department = current_user_department" in required_filter:
            return "department" in user_filters and user_filters["department"] in access_context.user_departments
        
        # Example: require date filtering for large tables
        if "date_column >= recent_date" in required_filter:
            date_columns = condition.parameters.get('date_columns', [])
            recent_threshold = datetime.now() - timedelta(days=condition.parameters.get('max_days_back', 30))
            
            for date_col in date_columns:
                if date_col in user_filters:
                    filter_date = user_filters[date_col]
                    if isinstance(filter_date, datetime) and filter_date >= recent_threshold:
                        return True
            return False
        
        return True
    
    async def _check_usage_limit_condition(self, 
                                         condition: PolicyCondition, 
                                         access_context: AccessContext) -> bool:
        """Check usage quotas and rate limits"""
        
        limit_type = condition.parameters.get('limit_type', 'requests_per_hour')
        limit_value = condition.parameters.get('limit_value', 100)
        
        # Get user's recent usage from audit logs
        usage_count = await self.audit_logger.get_usage_count(
            user_id=access_context.user_id,
            resource_pattern=condition.parameters.get('resource_pattern', '*'),
            time_window=timedelta(hours=1) if 'hour' in limit_type else timedelta(days=1)
        )
        
        return usage_count < limit_value

class DataMaskingEngine:
    """Handles dynamic data masking based on access control decisions"""
    
    def __init__(self):
        self.masking_functions = {
            'hash': self._hash_mask,
            'partial': self._partial_mask,
            'date_truncate': self._date_truncate_mask,
            'numeric_round': self._numeric_round_mask,
            'null_mask': self._null_mask,
            'synthetic': self._synthetic_mask
        }
    
    async def apply_masking(self, 
                           query: str, 
                           masking_rules: Dict[str, str],
                           access_context: AccessContext) -> str:
        """Modify query to apply data masking"""
        
        masked_query = query
        
        for column, masking_type in masking_rules.items():
            if masking_type in self.masking_functions:
                # Replace column references with masked versions
                mask_function = self.masking_functions[masking_type]
                masked_expression = await mask_function(column, access_context)
                
                # Replace column references via regex (simplification: this also
                # rewrites matches inside WHERE clauses and string literals, so
                # production interceptors should rewrite the parsed AST instead)
                column_pattern = rf'\b{re.escape(column)}\b'
                masked_query = re.sub(column_pattern, masked_expression, masked_query)
        
        return masked_query
    
    async def _hash_mask(self, column: str, context: AccessContext) -> str:
        """Replace column with hash for anonymization"""
        return f"SHA2({column}, 256)"
    
    async def _partial_mask(self, column: str, context: AccessContext) -> str:
        """Show only partial data (e.g., first 3 characters)"""
        return f"LEFT({column}, 3) || '***'"
    
    async def _date_truncate_mask(self, column: str, context: AccessContext) -> str:
        """Truncate dates to month/year level"""
        return f"DATE_TRUNC('month', {column})"
    
    async def _numeric_round_mask(self, column: str, context: AccessContext) -> str:
        """Round numeric values to reduce precision"""
        return f"ROUND({column}, -2)"  # Round to nearest hundred
    
    async def _null_mask(self, column: str, context: AccessContext) -> str:
        """Replace with NULL for complete hiding"""
        return "NULL"
    
    async def _synthetic_mask(self, column: str, context: AccessContext) -> str:
        """Replace with a deterministic synthetic pseudonym derived from the value"""
        return f"CONCAT('syn_', LEFT(SHA2({column}, 256), 8))"
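The masking flow lends itself to a quick standalone check. The sketch below mirrors the behavior of `apply_masking` with plain functions (a trimmed stand-in for the engine, not its real interface):

```python
import re

# Trimmed stand-ins for the engine's masking functions (assumes a SQL dialect
# with SHA2/LEFT, as in the class above)
MASKS = {
    'hash': lambda col: f"SHA2({col}, 256)",
    'partial': lambda col: f"LEFT({col}, 3) || '***'",
    'null_mask': lambda col: "NULL",
}

def apply_masking(query: str, rules: dict) -> str:
    """Rewrite column references in the query according to the masking rules."""
    for column, mask_type in rules.items():
        expr = MASKS[mask_type](column)
        query = re.sub(rf'\b{re.escape(column)}\b', expr, query)
    return query

q = "SELECT email, ssn, name FROM customers"
print(apply_masking(q, {'email': 'partial', 'ssn': 'hash'}))
# SELECT LEFT(email, 3) || '***', SHA2(ssn, 256), name FROM customers
```

Note that the regex rewrite touches every occurrence of the column name, including any in WHERE clauses, which is one more reason real interceptors operate on the parsed query tree.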

class AuditLogger:
    """Comprehensive audit logging for access control decisions"""
    
    def __init__(self, storage_backend):
        self.storage = storage_backend
        
    async def log_access(self, 
                        access_context: AccessContext,
                        decision: AccessDecision,
                        decision_context: Dict[str, Any]):
        """Log access attempt with full context"""
        
        audit_record = {
            'timestamp': datetime.now().isoformat(),
            'user_id': access_context.user_id,
            'resource': decision_context['resource'],
            'decision': decision.value,
            'query_type': access_context.query_type,
            'query_purpose': access_context.query_purpose,
            'access_location': access_context.access_location,
            'client_application': access_context.client_application,
            'applied_policies': decision_context['applied_policies'],
            'masking_applied': decision_context.get('masking_applied', {}),
            'requested_columns': access_context.requested_columns,
            'user_roles': list(access_context.user_roles),
            'user_departments': list(access_context.user_departments),
            'session_id': decision_context.get('session_id'),
            'query_hash': decision_context.get('query_hash')
        }
        
        await self.storage.store_audit_record(audit_record)
        
        # Also send to SIEM if this is a high-risk access
        if self._is_high_risk_access(access_context, decision):
            await self._send_to_siem(audit_record)
    
    async def get_usage_count(self, 
                             user_id: str, 
                             resource_pattern: str,
                             time_window: timedelta) -> int:
        """Get user's recent access count for rate limiting"""
        
        start_time = datetime.now() - time_window
        
        return await self.storage.count_access_records(
            user_id=user_id,
            resource_pattern=resource_pattern,
            start_time=start_time
        )
    
    def _is_high_risk_access(self, access_context: AccessContext, decision: AccessDecision) -> bool:
        """Flag denials and access from unknown locations as high risk"""
        return (decision == AccessDecision.DENY or
                access_context.access_location == 'unknown')
    
    async def _send_to_siem(self, audit_record: Dict[str, Any]):
        """Forward high-risk records to the SIEM (stub; replace with your SIEM client)"""
        await self.storage.store_audit_record({**audit_record, 'forwarded_to_siem': True})

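The rate-limit check in `_check_usage_limit_condition` only needs a storage backend that can count recent records. A minimal in-memory backend (a hypothetical `InMemoryAuditStore`, not part of the interfaces above) shows the sliding-window arithmetic:

```python
from datetime import datetime, timedelta

class InMemoryAuditStore:
    """Hypothetical in-memory backend; a real one would be a database or log store."""
    def __init__(self):
        self.records = []

    def store(self, record: dict):
        self.records.append(record)

    def count_since(self, user_id: str, start_time: datetime) -> int:
        # Count this user's records that fall inside the sliding window
        return sum(1 for r in self.records
                   if r['user_id'] == user_id and r['timestamp'] >= start_time)

store = InMemoryAuditStore()
now = datetime.now()
for minutes_ago in (10, 30, 90):  # two accesses inside the last hour, one before it
    store.store({'user_id': 'alice', 'timestamp': now - timedelta(minutes=minutes_ago)})

usage = store.count_since('alice', now - timedelta(hours=1))
print(usage, usage < 100)  # 2 True -> under the default requests_per_hour quota
```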
This access control system provides several advanced capabilities:

Context-Aware Decisions: Makes access control decisions based on rich context including user attributes, query purpose, location, and data characteristics.

Dynamic Masking: Automatically applies data masking transformations based on user permissions and data sensitivity.

Usage Quotas: Implements rate limiting and usage quotas to prevent data exfiltration or system abuse.

Comprehensive Auditing: Logs all access attempts with full context for compliance and security monitoring.

Condition-Based Policies: Supports complex policy conditions like required row filtering, aggregation requirements, and data freshness constraints.
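Before wiring this into a query engine, the shape of a context-aware decision can be condensed into a toy rule set (the names mirror the engine's classes, but the decision values and rules here are illustrative only):

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Set

class AccessDecision(Enum):  # mirrors the engine's decision enum (values illustrative)
    ALLOW = 'allow'
    ALLOW_WITH_MASKING = 'allow_with_masking'
    DENY = 'deny'

@dataclass
class Ctx:  # trimmed stand-in for the fuller AccessContext
    user_roles: Set[str]
    access_location: str
    query_purpose: Optional[str] = None

def evaluate(ctx: Ctx, resource_sensitivity: str) -> AccessDecision:
    """Toy ABAC rule: admins see everything; PII off-network is denied;
    PII on-network is allowed but masked."""
    if 'admin' in ctx.user_roles:
        return AccessDecision.ALLOW
    if resource_sensitivity == 'pii':
        if ctx.access_location != 'corporate_network':
            return AccessDecision.DENY
        return AccessDecision.ALLOW_WITH_MASKING
    return AccessDecision.ALLOW

print(evaluate(Ctx({'analyst'}, 'corporate_network'), 'pii').value)
# allow_with_masking
```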

Integration with Query Engines

The access control system needs to integrate with your actual query engines to intercept and modify queries before execution:

# query_interceptor.py
from typing import Dict, Any, Optional, List
from datetime import datetime
import asyncio
import sqlparse
from sqlparse.sql import Statement, Token, TokenList

# AccessControlEngine, AccessContext, AccessDecision, and DataMaskingEngine are
# the classes defined above; import them from wherever that module lives.

class QueryInterceptor:
    """Intercepts and modifies queries based on access control decisions"""
    
    def __init__(self, access_control_engine: AccessControlEngine):
        self.access_control = access_control_engine
        self.masking_engine = DataMaskingEngine()
        
    async def process_query(self, 
                           query: str, 
                           user_context: Dict[str, Any],
                           connection_info: Dict[str, Any]) -> Dict[str, Any]:
        """Main entry point for query processing"""
        
        try:
            # Parse the query to understand what's being accessed
            parsed_query = sqlparse.parse(query)[0]
            access_info = self._extract_access_info(parsed_query)
            
            # Build access context
            access_context = AccessContext(
                user_id=user_context['user_id'],
                user_roles=set(user_context['roles']),
                user_departments=set(user_context['departments']),
                user_clearance_level=user_context.get('clearance_level', 'basic'),
                query_type=access_info['query_type'],
                query_purpose=user_context.get('purpose'),
                access_location=connection_info.get('client_ip', 'unknown'),
                access_time=datetime.now(),
                client_application=connection_info.get('application', 'unknown'),
                requested_columns=access_info['columns'],
                query_filters=access_info['filters'],
                aggregation_level=access_info['aggregation_level']
            )
            
            # Check access for each table referenced
            final_decision = AccessDecision.ALLOW
            all_masking_rules = {}
            
            for table in access_info['tables']:
                decision, decision_context = await self.access_control.evaluate_access(
                    table, access_context
                )
                
                if decision == AccessDecision.DENY:
                    final_decision = AccessDecision.DENY
                    break
                
                # Accumulate masking rules from every table's policy evaluation
                all_masking_rules.update(decision_context.get('masking_applied', {}))
            
            # Rewrite the query if any masking rules were triggered
            final_query = query
            if final_decision != AccessDecision.DENY and all_masking_rules:
                final_query = await self.masking_engine.apply_masking(
                    query, all_masking_rules, access_context
                )
            
            return {
                'decision': final_decision,
                'query': final_query,
                'masking_rules': all_masking_rules
            }
        
        except Exception as e:
            # Fail closed: any parsing or policy error denies the query
            return {'decision': AccessDecision.DENY, 'query': None, 'error': str(e)}
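The `_extract_access_info` helper isn't shown above; for intuition, a rough regex-based stand-in (a real implementation should walk the `sqlparse` token tree instead) might look like:

```python
import re

def extract_access_info(query: str) -> dict:
    """Very rough stand-in for _extract_access_info: regex, not a real SQL parser."""
    q = query.strip()
    # Table names are whatever follows FROM or JOIN
    tables = re.findall(r'\b(?:FROM|JOIN)\s+([A-Za-z_][\w.]*)', q, re.IGNORECASE)
    # Columns are the comma-separated items of the SELECT list
    select = re.search(r'SELECT\s+(.*?)\s+FROM', q, re.IGNORECASE | re.DOTALL)
    columns = [c.strip() for c in select.group(1).split(',')] if select else []
    query_type = q.split(None, 1)[0].upper()
    return {'query_type': query_type, 'tables': tables, 'columns': columns}

info = extract_access_info(
    "SELECT email, ssn FROM customers JOIN orders ON customers.id = orders.customer_id"
)
print(info)
# {'query_type': 'SELECT', 'tables': ['customers', 'orders'], 'columns': ['email', 'ssn']}
```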
