
Picture this: your data engineering team just discovered that a critical ML model has been using corrupted customer data for the past six months. The corruption happened during a schema migration, but nobody knew which downstream systems were affected because your data lineage tracking was limited to a few Confluence pages. Meanwhile, your compliance team is asking for an audit trail of who accessed sensitive PII data, but your access controls are scattered across a dozen different systems with no centralized logging.
This scenario plays out daily in organizations that treat data governance as an afterthought. As data systems grow in complexity—with hundreds of datasets, dozens of transformation pipelines, and multiple teams accessing data—the lack of systematic governance becomes a critical bottleneck. You need more than spreadsheets and good intentions; you need production-grade systems that automatically track, control, and document your data ecosystem.
By the end of this lesson, you'll know how to architect and implement a comprehensive data governance framework that scales with your organization. You'll understand not just the tools, but the underlying principles and trade-offs that determine whether your governance system becomes a helpful guide or an ignored bureaucracy.
What you'll learn:
- How to architect a production-grade data catalog with automated discovery and multi-signal search
- How to track column-level data lineage and use it for impact and root cause analysis
- How to design attribute-based access controls with dynamic masking and comprehensive auditing
- The trade-offs that determine whether governance tooling gets adopted or ignored
This lesson assumes you have experience with:
- SQL and Python (every example uses both)
- Data warehouse and pipeline fundamentals (schemas, tables, and orchestration tools such as Airflow or dbt)
- Basic familiarity with search and metadata systems is helpful but not required
Modern data governance rests on three interconnected pillars that must work together to be effective. Understanding their relationships is crucial before diving into implementation details.
Data Catalogs serve as the central nervous system, maintaining metadata about every asset in your data ecosystem. They answer "what data exists?" and "how do I use it?" But catalogs are only as good as their metadata quality and discoverability.
Data Lineage provides the circulatory system, tracking how data flows through transformations and dependencies. It answers "where does this data come from?" and "what will break if I change this?" Lineage becomes critical for impact analysis, debugging, and compliance.
Access Controls act as the immune system, determining who can access what data under which circumstances. They answer "who can see this?" and "how do we audit access?" Access controls must be granular enough for security while remaining manageable for operations.
The key insight is that these systems must share metadata and coordinate policies. A catalog that doesn't understand lineage relationships can't provide meaningful impact warnings. Access controls that don't integrate with catalogs become impossible to manage at scale. Lineage tracking that ignores access patterns misses crucial governance signals.
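To make that coordination concrete, here is a minimal sketch, with illustrative names rather than any particular tool's schema, of a unified asset record that all three systems could read and write:
# governed_asset.py (illustrative sketch, not a specific vendor's schema)
from dataclasses import dataclass, field
from typing import List, Set

@dataclass
class GovernedAsset:
    """One record shared by the catalog, the lineage tracker, and access control."""
    asset_id: str                      # e.g. "warehouse.sales.orders"
    description: str                   # catalog: what this data is and how to use it
    owner_team: str                    # catalog: who answers questions about it
    tags: Set[str] = field(default_factory=set)                 # e.g. {"contains_pii"}
    upstream_assets: List[str] = field(default_factory=list)    # lineage: where it comes from
    downstream_assets: List[str] = field(default_factory=list)  # lineage: what depends on it
    access_policies: List[str] = field(default_factory=list)    # access control: policy IDs
    audit_log_stream: str = "governance.audit"                  # where access events are written

# A change to `tags` (say, adding "contains_pii") can then trigger both a
# lineage-aware impact warning and a tightening of the attached access policies.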
Most organizations start with basic catalog tools that store table schemas and descriptions. This approach fails at scale because it treats metadata as static documentation rather than living system knowledge. Production catalogs need to handle dynamic metadata, automated discovery, and complex relationships between assets.
Let's examine a realistic catalog architecture for a mid-size e-commerce company with multiple data sources:
# catalog-architecture.yaml
catalog_components:
metadata_store:
primary: PostgreSQL cluster
search_index: Elasticsearch
cache: Redis
blob_storage: S3 (for large schemas/samples)
discovery_agents:
- name: warehouse_scanner
targets: [Snowflake, BigQuery]
schedule: "*/30 * * * *" # Every 30 minutes
- name: kafka_scanner
targets: [Kafka topics]
schedule: "*/5 * * * *" # Every 5 minutes
- name: api_scanner
targets: [REST APIs, GraphQL]
schedule: "0 */6 * * *" # Every 6 hours
quality_monitors:
- schema_drift_detection
- data_freshness_tracking
- usage_analytics
- quality_score_calculation
The discovery agents continuously scan your data infrastructure, automatically updating metadata when schemas change, new tables appear, or data quality issues emerge. This eliminates the manual maintenance burden that causes most catalogs to become stale.
Here's how to implement a production-grade discovery agent for data warehouses:
# warehouse_discovery_agent.py
import asyncio
import logging
from typing import Dict, List, Optional, Set
from dataclasses import dataclass
from datetime import datetime, timedelta
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.sql import text

logger = logging.getLogger(__name__)
@dataclass
class TableMetadata:
database: str
schema: str
table: str
columns: List[Dict]
row_count: Optional[int]
size_bytes: Optional[int]
last_modified: Optional[datetime]
quality_score: Optional[float]
tags: Set[str]
class WarehouseDiscoveryAgent:
    def __init__(self, connection_string: str, catalog_api_url: str):
        # create_async_engine requires an async driver for your warehouse
        self.engine = create_async_engine(connection_string)
        # CatalogAPI is a client for your catalog service (implementation not shown here)
        self.catalog_api = CatalogAPI(catalog_api_url)
        self.last_scan = {}  # Track per-table scan timestamps
async def discover_and_sync(self):
"""Main discovery loop with incremental updates"""
try:
# Get list of tables with modification times
current_tables = await self._get_table_list()
# Identify what needs updating
tables_to_scan = self._identify_changes(current_tables)
# Process in batches to avoid overwhelming the warehouse
batch_size = 20
for i in range(0, len(tables_to_scan), batch_size):
batch = tables_to_scan[i:i + batch_size]
await self._process_batch(batch)
# Clean up deleted tables
await self._cleanup_deleted_tables(current_tables)
except Exception as e:
logger.error(f"Discovery failed: {e}")
# Critical: Don't let discovery failures break the catalog
await self._record_discovery_failure(e)
async def _get_table_list(self) -> Dict[str, datetime]:
"""Get all tables with their last modification times"""
query = text("""
SELECT
table_catalog,
table_schema,
table_name,
GREATEST(
COALESCE(last_altered, '1900-01-01'::timestamp),
COALESCE(last_ddl, '1900-01-01'::timestamp)
) as last_modified
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
AND table_schema NOT IN ('INFORMATION_SCHEMA', 'ACCOUNT_USAGE')
""")
result = {}
async with self.engine.connect() as conn:
rows = await conn.execute(query)
for row in rows:
table_key = f"{row.table_catalog}.{row.table_schema}.{row.table_name}"
result[table_key] = row.last_modified
return result
def _identify_changes(self, current_tables: Dict[str, datetime]) -> List[str]:
"""Identify which tables need metadata refresh"""
tables_to_scan = []
for table_key, last_modified in current_tables.items():
# Scan if table is new or modified since last scan
if (table_key not in self.last_scan or
last_modified > self.last_scan[table_key]):
tables_to_scan.append(table_key)
return tables_to_scan
async def _extract_table_metadata(self, table_key: str) -> TableMetadata:
"""Extract comprehensive metadata for a single table"""
db, schema, table = table_key.split('.')
        # Get column information from the metadata layer. Per-column statistics
        # (distinct counts, null counts) require separate profiling queries against
        # the table itself; those are warehouse-specific and omitted here.
        columns_query = text("""
            SELECT
                column_name,
                data_type,
                is_nullable,
                column_default,
                comment
            FROM information_schema.columns
            WHERE table_catalog = :db
              AND table_schema = :schema
              AND table_name = :table
            ORDER BY ordinal_position
        """)
        # Get table-level statistics. Size estimation is warehouse-specific;
        # many warehouses expose byte counts via information_schema.tables instead.
        stats_query = text("""
            SELECT COUNT(*) as row_count
            FROM {db}.{schema}.{table}
        """.format(db=db, schema=schema, table=table))
        async with self.engine.connect() as conn:
            # Run the queries sequentially; a single connection cannot
            # execute two statements concurrently
            columns_result = await conn.execute(
                columns_query, {'db': db, 'schema': schema, 'table': table}
            )
            stats_result = await conn.execute(stats_query)

            columns = [dict(row._mapping) for row in columns_result]
            stats = dict(stats_result.fetchone()._mapping)
# Calculate quality score based on metadata completeness
quality_score = self._calculate_quality_score(columns, stats)
# Infer semantic tags from column names and types
tags = self._infer_semantic_tags(columns, table)
return TableMetadata(
database=db,
schema=schema,
table=table,
columns=columns,
row_count=stats['row_count'],
            size_bytes=stats.get('size_bytes'),  # fill from warehouse metadata when available
last_modified=datetime.now(),
quality_score=quality_score,
tags=tags
)
def _calculate_quality_score(self, columns: List[Dict], stats: Dict) -> float:
"""Calculate data quality score based on multiple factors"""
score_components = []
# Schema completeness (do columns have descriptions?)
documented_columns = sum(1 for col in columns if col.get('comment'))
schema_score = documented_columns / len(columns) if columns else 0
score_components.append(('schema_documentation', schema_score, 0.3))
# Data freshness (based on last modification)
# Implementation depends on your business requirements
freshness_score = 1.0 # Simplified for example
score_components.append(('freshness', freshness_score, 0.2))
# Data completeness (null ratios)
if columns:
avg_null_ratio = sum(col.get('null_count', 0) for col in columns) / (
len(columns) * max(1, stats.get('row_count', 1))
)
completeness_score = max(0, 1 - avg_null_ratio)
score_components.append(('completeness', completeness_score, 0.3))
# Usage score (would integrate with query logs)
usage_score = 0.8 # Placeholder - implement based on query frequency
score_components.append(('usage', usage_score, 0.2))
# Weighted average
total_score = sum(score * weight for _, score, weight in score_components)
return round(total_score, 2)
def _infer_semantic_tags(self, columns: List[Dict], table_name: str) -> Set[str]:
"""Automatically infer semantic tags for governance"""
tags = set()
# Table-level tags from naming conventions
if 'user' in table_name.lower() or 'customer' in table_name.lower():
tags.add('pii_risk')
if 'transaction' in table_name.lower() or 'payment' in table_name.lower():
tags.add('financial')
if 'log' in table_name.lower() or 'event' in table_name.lower():
tags.add('behavioral')
# Column-level tags
pii_patterns = ['email', 'phone', 'ssn', 'address', 'name']
sensitive_patterns = ['salary', 'income', 'credit', 'password']
column_names = [col['column_name'].lower() for col in columns]
if any(pattern in name for name in column_names for pattern in pii_patterns):
tags.add('contains_pii')
if any(pattern in name for name in column_names for pattern in sensitive_patterns):
tags.add('sensitive')
return tags
This discovery agent demonstrates several critical patterns for production catalogs:
Incremental Discovery: Instead of scanning everything every time, it tracks modification timestamps and only processes changed tables. This reduces warehouse load and speeds up discovery.
Quality Scoring: Automatically calculated quality scores help users understand data trustworthiness. The scoring combines multiple factors: documentation completeness, data freshness, null ratios, and usage patterns.
Semantic Tagging: Automatically inferred tags enable governance rules and search functionality. The example shows basic pattern matching, but production systems might use machine learning for more sophisticated classification.
Resilient Error Handling: Discovery failures are recorded but don't break the catalog. This is crucial because discovery agents run continuously in production.
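The pattern matching in _infer_semantic_tags catches obvious names but misses columns like contact_info or dob. As a hedged sketch of one possible upgrade, the same sentence-transformers library used later for catalog search can classify column names by embedding similarity against labeled seed phrases (the phrases and threshold below are illustrative and would need tuning):
# semantic_column_classifier.py (illustrative sketch; seed phrases and threshold need tuning)
from sentence_transformers import SentenceTransformer, util

SEED_PHRASES = {
    "contains_pii": ["email address", "phone number", "home address", "date of birth", "full name"],
    "financial": ["salary", "account balance", "credit card number", "payment amount"],
}

model = SentenceTransformer("all-MiniLM-L6-v2")
seed_embeddings = {tag: model.encode(phrases) for tag, phrases in SEED_PHRASES.items()}

def classify_column(column_name: str, threshold: float = 0.45) -> set:
    """Return the governance tags whose seed phrases are semantically close to the column name."""
    # Split snake_case names into words so the model sees something closer to natural language
    query_embedding = model.encode(column_name.replace("_", " "))
    tags = set()
    for tag, embeddings in seed_embeddings.items():
        # util.cos_sim returns a matrix of cosine similarities; take the best match
        if util.cos_sim(query_embedding, embeddings).max().item() >= threshold:
            tags.add(tag)
    return tags

# classify_column("customer_dob") would likely return {"contains_pii"}, unlike a keyword list.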
The value of a catalog depends heavily on how easily users can find relevant data. Basic keyword search isn't sufficient when you have thousands of datasets. You need semantic search that understands context, relationships, and user intent.
Here's an advanced search implementation that combines multiple ranking signals:
# catalog_search_engine.py
from typing import List, Dict, Any, Optional
import asyncio
from dataclasses import dataclass
from elasticsearch import AsyncElasticsearch
import numpy as np
from sentence_transformers import SentenceTransformer
@dataclass
class SearchResult:
asset_id: str
asset_type: str
title: str
description: str
score: float
match_reasons: List[str]
related_assets: List[str]
class CatalogSearchEngine:
def __init__(self, es_client: AsyncElasticsearch):
self.es_client = es_client
# Load semantic embedding model for content understanding
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
async def search(self,
query: str,
user_context: Dict[str, Any],
filters: Optional[Dict] = None,
limit: int = 20) -> List[SearchResult]:
"""Multi-signal search combining text, semantic, and behavioral signals"""
# Generate semantic embedding for the query
query_embedding = self.embedding_model.encode(query).tolist()
# Build Elasticsearch query combining multiple strategies
es_query = {
"query": {
"function_score": {
"query": {
"bool": {
"should": [
# Exact matches in titles get highest boost
{
"match_phrase": {
"title": {
"query": query,
"boost": 3.0
}
}
},
# Fuzzy matching in titles
{
"match": {
"title": {
"query": query,
"fuzziness": "AUTO",
"boost": 2.0
}
}
},
# Description matching
{
"match": {
"description": {
"query": query,
"boost": 1.5
}
}
},
# Column name matching (important for technical searches)
{
"nested": {
"path": "columns",
"query": {
"match": {
"columns.name": {
"query": query,
"boost": 1.8
}
}
}
}
},
# Semantic similarity using dense vectors
{
"script_score": {
"query": {"match_all": {}},
"script": {
"source": "cosineSimilarity(params.query_vector, 'description_embedding') + 1.0",
"params": {"query_vector": query_embedding}
}
}
},
# Tag matching (exact match gets high boost)
{
"terms": {
"tags": query.split(),
"boost": 2.5
}
}
],
"minimum_should_match": 1
}
},
"functions": [
# Boost recently accessed assets
{
"filter": {"range": {"last_accessed": {"gte": "now-7d"}}},
"weight": 1.2
},
# Boost high-quality assets
{
"field_value_factor": {
"field": "quality_score",
"factor": 1.5,
"missing": 0.5
}
},
# Boost assets from user's team/department
{
"filter": {
"terms": {
"owner_team": user_context.get("teams", [])
}
},
"weight": 1.3
},
# Personalization: boost assets user has accessed before
{
"filter": {
"terms": {
"asset_id": user_context.get("recent_assets", [])
}
},
"weight": 1.1
}
],
"score_mode": "multiply",
"boost_mode": "multiply"
}
},
"size": limit,
"highlight": {
"fields": {
"title": {},
"description": {},
"columns.name": {}
}
},
"_source": {
"excludes": ["description_embedding"] # Don't return large vectors
}
}
# Apply filters if provided
if filters:
es_query["query"]["function_score"]["query"]["bool"]["filter"] = [
self._build_filter(k, v) for k, v in filters.items()
]
# Execute search
response = await self.es_client.search(
index="data_catalog",
body=es_query
)
# Process results and explain ranking
results = []
for hit in response["hits"]["hits"]:
source = hit["_source"]
# Extract match reasons from highlights and query analysis
match_reasons = self._extract_match_reasons(hit, query)
# Get related assets through lineage
related_assets = await self._get_related_assets(source["asset_id"])
results.append(SearchResult(
asset_id=source["asset_id"],
asset_type=source["asset_type"],
title=source["title"],
description=source["description"],
score=hit["_score"],
match_reasons=match_reasons,
related_assets=related_assets
))
return results
def _extract_match_reasons(self, hit: Dict, query: str) -> List[str]:
"""Explain why this result matched - crucial for user trust"""
reasons = []
if "highlight" in hit:
highlights = hit["highlight"]
if "title" in highlights:
reasons.append("Title contains your search terms")
if "description" in highlights:
reasons.append("Description mentions relevant concepts")
if "columns.name" in highlights:
reasons.append("Contains matching column names")
# Analyze score components (simplified)
score = hit["_score"]
source = hit["_source"]
if source.get("quality_score", 0) > 0.8:
reasons.append("High quality data")
if source.get("last_accessed") and self._is_recent(source["last_accessed"]):
reasons.append("Recently accessed by others")
return reasons
async def _get_related_assets(self, asset_id: str, limit: int = 5) -> List[str]:
"""Find related assets through lineage relationships"""
lineage_query = {
"query": {
"bool": {
"should": [
{"term": {"upstream_assets": asset_id}},
{"term": {"downstream_assets": asset_id}}
]
}
},
"size": limit,
"_source": ["asset_id", "title"]
}
response = await self.es_client.search(
index="data_catalog",
body=lineage_query
)
return [hit["_source"]["asset_id"] for hit in response["hits"]["hits"]]
This search engine demonstrates several advanced patterns:
Multi-Signal Ranking: Combines exact text matching, fuzzy matching, semantic similarity, and behavioral signals (usage, quality, personalization) to provide relevant results.
Explainable Results: The match_reasons field helps users understand why results were returned, building trust in the search system.
Contextual Personalization: Results are boosted based on the user's team, department, and historical usage patterns.
Semantic Understanding: Uses sentence transformers to find conceptually similar content even when exact keywords don't match.
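The query above assumes every catalog document was indexed with a description_embedding dense vector. A minimal sketch of that indexing side, with an index name and dimensions chosen to match the search example, might look like this:
# catalog_indexer.py (sketch of the indexing side assumed by the search engine)
from elasticsearch import AsyncElasticsearch
from sentence_transformers import SentenceTransformer

CATALOG_MAPPING = {
    "mappings": {
        "properties": {
            "title": {"type": "text"},
            "description": {"type": "text"},
            "tags": {"type": "keyword"},
            "quality_score": {"type": "float"},
            "last_accessed": {"type": "date"},
            "columns": {"type": "nested", "properties": {"name": {"type": "text"}}},
            # 384 dimensions matches the all-MiniLM-L6-v2 model used for query embeddings
            "description_embedding": {"type": "dense_vector", "dims": 384},
            # remaining fields (asset_id, owner_team, ...) can rely on dynamic mapping
        }
    }
}

async def index_asset(es: AsyncElasticsearch, model: SentenceTransformer, asset: dict):
    """Embed the description at write time so script_score can compare vectors at read time."""
    asset["description_embedding"] = model.encode(asset.get("description", "")).tolist()
    await es.index(index="data_catalog", id=asset["asset_id"], body=asset)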
Most lineage tools focus on table-to-table relationships, but production systems need much more granular tracking. You need to understand not just that Table A feeds Table B, but which specific columns influence which calculations, how transformations change data semantics, and what the impact radius is for any given change.
Here's a production lineage system architecture that handles complex transformation logic:
# advanced_lineage_tracker.py
from typing import Dict, List, Set, Optional, Union, Any
from dataclasses import dataclass, field
from enum import Enum
import sqlparse
from sqlparse.sql import Statement, Identifier, Function
from sqlparse.tokens import Token
import networkx as nx
from datetime import datetime
import json
class LineageEventType(Enum):
TABLE_CREATE = "table_create"
TABLE_UPDATE = "table_update"
TABLE_DELETE = "table_delete"
COLUMN_ADD = "column_add"
COLUMN_DROP = "column_drop"
TRANSFORMATION = "transformation"
DATA_COPY = "data_copy"
@dataclass
class ColumnLineage:
"""Tracks lineage at the column level with transformation context"""
source_table: str
source_column: str
target_table: str
target_column: str
transformation_type: str # "direct_copy", "aggregation", "calculation", "join"
transformation_logic: Optional[str] # SQL expression or description
confidence_score: float # How certain we are about this lineage
created_at: datetime = field(default_factory=datetime.now)
@dataclass
class LineageEvent:
"""Represents a single lineage-affecting event"""
event_id: str
event_type: LineageEventType
source_system: str
execution_id: str # Link to pipeline run that caused this event
affected_assets: List[str]
column_lineage: List[ColumnLineage]
metadata: Dict[str, Any]
timestamp: datetime = field(default_factory=datetime.now)
class SQLLineageParser:
"""Parses SQL to extract column-level lineage relationships"""
def __init__(self):
self.lineage_graph = nx.DiGraph()
def parse_sql_lineage(self, sql: str, target_table: str) -> List[ColumnLineage]:
"""Extract column lineage from SQL statements"""
lineages = []
try:
# Parse the SQL statement
parsed = sqlparse.parse(sql)[0]
# Extract different types of SQL operations
if self._is_select_statement(parsed):
lineages.extend(self._parse_select_lineage(parsed, target_table))
elif self._is_insert_statement(parsed):
lineages.extend(self._parse_insert_lineage(parsed, target_table))
elif self._is_create_table_as(parsed):
lineages.extend(self._parse_ctas_lineage(parsed, target_table))
except Exception as e:
# Log parsing errors but don't fail - lineage is often imperfect
print(f"SQL parsing error: {e}")
return lineages
def _parse_select_lineage(self, parsed_sql: Statement, target_table: str) -> List[ColumnLineage]:
"""Parse SELECT statements to understand column relationships"""
lineages = []
# Extract SELECT clause to understand output columns
select_items = self._extract_select_items(parsed_sql)
# Extract FROM and JOIN clauses to understand source tables
source_tables = self._extract_source_tables(parsed_sql)
# Match output columns to their sources
for idx, select_item in enumerate(select_items):
column_name = select_item.get('alias') or select_item.get('column')
if select_item['type'] == 'direct_column':
# Direct column reference: SELECT customer_id FROM customers
source_table = select_item.get('table') or source_tables[0]
lineages.append(ColumnLineage(
source_table=source_table,
source_column=select_item['column'],
target_table=target_table,
target_column=column_name,
transformation_type="direct_copy",
transformation_logic=None,
confidence_score=0.95
))
elif select_item['type'] == 'function':
# Function-based transformation: SELECT SUM(amount) as total_amount
source_columns = select_item.get('source_columns', [])
for source_col_info in source_columns:
lineages.append(ColumnLineage(
source_table=source_col_info['table'],
source_column=source_col_info['column'],
target_table=target_table,
target_column=column_name,
transformation_type="aggregation" if select_item['function'] in ['SUM', 'COUNT', 'AVG', 'MAX', 'MIN'] else "calculation",
transformation_logic=select_item['expression'],
confidence_score=0.85
))
elif select_item['type'] == 'expression':
# Complex expression: SELECT price * quantity as total_value
source_columns = self._extract_columns_from_expression(select_item['expression'])
for source_col in source_columns:
# Determine source table (simplified logic)
source_table = self._resolve_table_for_column(source_col, source_tables)
lineages.append(ColumnLineage(
source_table=source_table,
source_column=source_col,
target_table=target_table,
target_column=column_name,
transformation_type="calculation",
transformation_logic=select_item['expression'],
confidence_score=0.75 # Lower confidence for complex expressions
))
return lineages
def _extract_select_items(self, parsed_sql: Statement) -> List[Dict]:
"""Extract column selections with their types and transformations"""
select_items = []
# This is a simplified version - production would need more robust SQL parsing
# Consider using a more sophisticated SQL parser like sqlglot or sqllineage
tokens = [token for token in parsed_sql.flatten() if not token.is_whitespace]
in_select = False
current_item = ""
for token in tokens:
            if token.is_keyword and token.value.upper() == 'SELECT':
in_select = True
continue
            elif token.is_keyword and token.value.upper() in ['FROM', 'WHERE', 'GROUP', 'ORDER']:
in_select = False
if current_item.strip():
select_items.append(self._parse_select_item(current_item.strip()))
current_item = ""
break
elif in_select:
if str(token) == ',':
if current_item.strip():
select_items.append(self._parse_select_item(current_item.strip()))
current_item = ""
else:
current_item += str(token)
# Handle last item
if current_item.strip():
select_items.append(self._parse_select_item(current_item.strip()))
return select_items
def _parse_select_item(self, item: str) -> Dict:
"""Parse individual SELECT item to understand its type and sources"""
item = item.strip()
# Check for alias (AS keyword or space-separated)
alias = None
        if ' AS ' in item.upper():
            # Split on the alias keyword without upper-casing the expression itself
            split_idx = item.upper().rindex(' AS ')
            expression = item[:split_idx].strip()
            alias = item[split_idx + 4:].strip()
elif ' ' in item and not any(func in item.upper() for func in ['SUM(', 'COUNT(', 'AVG(', 'CASE ']):
# Simple space-separated alias
parts = item.split()
if len(parts) == 2:
expression = parts[0]
alias = parts[1]
else:
expression = item
else:
expression = item
# Determine type of selection
if '(' in expression and any(func in expression.upper() for func in ['SUM(', 'COUNT(', 'AVG(', 'MAX(', 'MIN(']):
return {
'type': 'function',
'expression': expression,
'function': expression.split('(')[0].upper(),
'alias': alias,
'column': alias or expression,
'source_columns': self._extract_columns_from_expression(expression)
}
elif any(op in expression for op in ['+', '-', '*', '/', 'CASE', 'COALESCE']):
return {
'type': 'expression',
'expression': expression,
'alias': alias,
'column': alias or expression
}
else:
# Simple column reference
if '.' in expression:
table, column = expression.split('.', 1)
return {
'type': 'direct_column',
'table': table,
'column': column,
'alias': alias,
'expression': expression
}
else:
return {
'type': 'direct_column',
'column': expression,
'alias': alias,
'expression': expression
}
class LineageTracker:
"""Main lineage tracking system with impact analysis capabilities"""
def __init__(self, storage_backend):
self.storage = storage_backend
self.lineage_graph = nx.DiGraph()
self.sql_parser = SQLLineageParser()
async def track_transformation_event(self,
sql: str,
target_table: str,
execution_context: Dict[str, Any]) -> LineageEvent:
"""Track a transformation event and update lineage graph"""
# Parse SQL to extract column lineage
column_lineages = self.sql_parser.parse_sql_lineage(sql, target_table)
# Create lineage event
event = LineageEvent(
event_id=f"transform_{datetime.now().isoformat()}_{hash(sql)}",
event_type=LineageEventType.TRANSFORMATION,
source_system=execution_context.get('pipeline_name', 'unknown'),
execution_id=execution_context.get('execution_id', ''),
affected_assets=[target_table] + list(set(cl.source_table for cl in column_lineages)),
column_lineage=column_lineages,
metadata={
'sql': sql,
'execution_time': execution_context.get('execution_time'),
'row_count': execution_context.get('row_count')
}
)
# Update the lineage graph
await self._update_lineage_graph(event)
# Store the event
await self.storage.store_lineage_event(event)
return event
async def _update_lineage_graph(self, event: LineageEvent):
"""Update the in-memory lineage graph with new relationships"""
for col_lineage in event.column_lineage:
# Add nodes for tables if they don't exist
source_node = f"{col_lineage.source_table}.{col_lineage.source_column}"
target_node = f"{col_lineage.target_table}.{col_lineage.target_column}"
self.lineage_graph.add_node(source_node,
table=col_lineage.source_table,
column=col_lineage.source_column)
self.lineage_graph.add_node(target_node,
table=col_lineage.target_table,
column=col_lineage.target_column)
# Add edge with transformation metadata
self.lineage_graph.add_edge(source_node, target_node,
transformation_type=col_lineage.transformation_type,
transformation_logic=col_lineage.transformation_logic,
confidence=col_lineage.confidence_score,
event_id=event.event_id,
created_at=col_lineage.created_at)
async def get_impact_analysis(self,
table_name: str,
column_name: Optional[str] = None) -> Dict[str, Any]:
"""Analyze downstream impact of changes to a table or column"""
        # Define the starting point(s) for impact analysis
        if column_name:
            start_node = f"{table_name}.{column_name}"
            start_nodes = [start_node] if start_node in self.lineage_graph else []
        else:
            # For table-level analysis, find all columns in the table
            start_nodes = [node for node in self.lineage_graph.nodes()
                           if self.lineage_graph.nodes[node].get('table') == table_name]

        downstream_impact = {
            'directly_affected': [],
            'indirectly_affected': [],
            'impact_radius': 0,
            'confidence_scores': [],
            'affected_systems': set(),
            'transformation_types': {}
        }
for start_node in start_nodes:
if start_node not in self.lineage_graph:
continue
# Find all downstream nodes using DFS
downstream_nodes = list(nx.descendants(self.lineage_graph, start_node))
# Categorize impact by distance
for node in downstream_nodes:
distance = nx.shortest_path_length(self.lineage_graph, start_node, node)
node_info = {
'table': self.lineage_graph.nodes[node]['table'],
'column': self.lineage_graph.nodes[node]['column'],
'distance': distance
}
if distance == 1:
downstream_impact['directly_affected'].append(node_info)
else:
downstream_impact['indirectly_affected'].append(node_info)
# Track confidence and transformation types along the path
path = nx.shortest_path(self.lineage_graph, start_node, node)
path_confidence = 1.0
transformation_types = []
for i in range(len(path) - 1):
edge_data = self.lineage_graph[path[i]][path[i+1]]
path_confidence *= edge_data.get('confidence', 1.0)
transformation_types.append(edge_data.get('transformation_type', 'unknown'))
downstream_impact['confidence_scores'].append(path_confidence)
downstream_impact['transformation_types'][node] = transformation_types
# Track affected systems
table_parts = node_info['table'].split('.')
if len(table_parts) > 1:
downstream_impact['affected_systems'].add(table_parts[0]) # Database/schema
downstream_impact['impact_radius'] = len(downstream_impact['directly_affected']) + len(downstream_impact['indirectly_affected'])
downstream_impact['affected_systems'] = list(downstream_impact['affected_systems'])
return downstream_impact
async def get_root_cause_analysis(self,
table_name: str,
column_name: Optional[str] = None) -> Dict[str, Any]:
"""Trace upstream dependencies to understand data sources"""
if column_name:
target_node = f"{table_name}.{column_name}"
target_nodes = [target_node] if target_node in self.lineage_graph else []
else:
target_nodes = [node for node in self.lineage_graph.nodes()
if self.lineage_graph.nodes[node].get('table') == table_name]
root_cause_info = {
'direct_sources': [],
'ultimate_sources': [],
'source_systems': set(),
'transformation_chain': {},
'data_freshness_risk': []
}
for target_node in target_nodes:
if target_node not in self.lineage_graph:
continue
# Find all upstream nodes
upstream_nodes = list(nx.ancestors(self.lineage_graph, target_node))
# Find ultimate sources (nodes with no upstream dependencies)
ultimate_sources = [node for node in upstream_nodes
if self.lineage_graph.in_degree(node) == 0]
# Find direct sources (immediate upstream)
direct_sources = list(self.lineage_graph.predecessors(target_node))
for source in direct_sources:
source_info = {
'table': self.lineage_graph.nodes[source]['table'],
'column': self.lineage_graph.nodes[source]['column'],
'transformation_type': self.lineage_graph[source][target_node].get('transformation_type')
}
root_cause_info['direct_sources'].append(source_info)
for source in ultimate_sources:
source_info = {
'table': self.lineage_graph.nodes[source]['table'],
'column': self.lineage_graph.nodes[source]['column']
}
root_cause_info['ultimate_sources'].append(source_info)
# Track source systems
table_parts = source_info['table'].split('.')
if len(table_parts) > 1:
root_cause_info['source_systems'].add(table_parts[0])
root_cause_info['source_systems'] = list(root_cause_info['source_systems'])
return root_cause_info
This lineage system provides several advanced capabilities:
Column-Level Granularity: Tracks not just table-to-table relationships but specific column transformations, enabling precise impact analysis.
Transformation Context: Stores the actual SQL logic and transformation type, allowing users to understand how data changes as it flows through the pipeline.
Confidence Scoring: Acknowledges that lineage extraction is imperfect, especially from complex SQL, and provides confidence scores to help users understand reliability.
Bidirectional Analysis: Supports both forward impact analysis ("what breaks if I change this?") and backward root cause analysis ("where does this data come from?").
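A short usage sketch shows how the two analyses fit into a change-review workflow; the storage backend, table names, and run identifiers below are placeholders:
# lineage_usage_example.py (illustrative; storage backend and table names are placeholders)
from advanced_lineage_tracker import LineageTracker

async def review_schema_change(storage_backend):
    tracker = LineageTracker(storage_backend)

    # Record a transformation as it runs (normally done by pipeline hooks, shown next)
    await tracker.track_transformation_event(
        sql="SELECT customer_id, SUM(amount) AS total_amount FROM raw.payments GROUP BY customer_id",
        target_table="analytics.customer_totals",
        execution_context={"pipeline_name": "daily_rollup", "execution_id": "run_2024_01_15"},
    )

    # Forward question: what breaks if raw.payments.amount changes?
    impact = await tracker.get_impact_analysis("raw.payments", column_name="amount")
    print(f"{impact['impact_radius']} downstream columns affected")

    # Backward question: where does analytics.customer_totals ultimately come from?
    sources = await tracker.get_root_cause_analysis("analytics.customer_totals")
    print(f"Ultimate sources: {sources['ultimate_sources']}")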
Static lineage extraction is insufficient for dynamic environments. You need systems that can update lineage in real-time as pipelines execute. Here's a pattern for hooking into common orchestration tools:
# real_time_lineage_hooks.py
import asyncio
from typing import Dict, Any, List, Optional
from dataclasses import dataclass
from datetime import datetime
@dataclass
class PipelineExecution:
pipeline_id: str
execution_id: str
task_id: str
start_time: datetime
sql_statements: List[str]
input_tables: List[str]
output_tables: List[str]
execution_context: Dict[str, Any]
class AirflowLineageHook:
"""Hook for integrating lineage tracking with Apache Airflow"""
def __init__(self, lineage_tracker: LineageTracker):
self.lineage_tracker = lineage_tracker
def on_task_start(self, context: Dict[str, Any]):
"""Called when an Airflow task starts"""
# Extract lineage-relevant information from Airflow context
task_instance = context['task_instance']
dag_run = context['dag_run']
# Register the execution start
execution = PipelineExecution(
pipeline_id=f"{dag_run.dag_id}.{task_instance.task_id}",
execution_id=dag_run.run_id,
task_id=task_instance.task_id,
start_time=datetime.now(),
sql_statements=[], # Will be populated during execution
input_tables=[],
output_tables=[],
execution_context={
'dag_id': dag_run.dag_id,
'task_id': task_instance.task_id,
'execution_date': dag_run.execution_date.isoformat(),
'operator': task_instance.operator.__class__.__name__
}
)
# Store execution context for later lineage extraction
self._store_execution_context(execution)
def on_sql_execution(self, sql: str, execution_id: str, affected_tables: List[str]):
"""Called whenever SQL is executed within a task"""
asyncio.create_task(self._track_sql_lineage(sql, execution_id, affected_tables))
async def _track_sql_lineage(self, sql: str, execution_id: str, affected_tables: List[str]):
"""Extract and track lineage from SQL execution"""
for target_table in affected_tables:
try:
await self.lineage_tracker.track_transformation_event(
sql=sql,
target_table=target_table,
execution_context={'execution_id': execution_id}
)
except Exception as e:
# Log but don't fail the pipeline
print(f"Lineage tracking failed for {target_table}: {e}")
class DBTLineageIntegration:
"""Integration with dbt for automatic lineage extraction"""
def __init__(self, lineage_tracker: LineageTracker):
self.lineage_tracker = lineage_tracker
async def sync_dbt_lineage(self, manifest_path: str, run_results_path: str):
"""Extract lineage from dbt manifest and run results"""
import json
# Load dbt artifacts
with open(manifest_path, 'r') as f:
manifest = json.load(f)
with open(run_results_path, 'r') as f:
run_results = json.load(f)
# Process each model/seed/snapshot
for node_id, node in manifest['nodes'].items():
if node['resource_type'] in ['model', 'seed', 'snapshot']:
await self._process_dbt_node(node, manifest, run_results)
async def _process_dbt_node(self, node: Dict, manifest: Dict, run_results: Dict):
"""Process individual dbt node for lineage extraction"""
# Extract target table information
target_table = f"{node['database']}.{node['schema']}.{node['name']}"
        # Get the compiled SQL (newer dbt versions name these fields 'compiled_code'/'raw_code')
        compiled_sql = node.get('compiled_sql', node.get('raw_sql', ''))
# Extract execution metadata from run results
execution_context = {
'dbt_project': manifest['metadata']['project_name'],
'dbt_version': manifest['metadata']['dbt_version'],
'model_name': node['name'],
'materialization': node.get('config', {}).get('materialized', 'table')
}
# Find execution results for this node
for result in run_results.get('results', []):
if result['unique_id'] == node['unique_id']:
execution_context.update({
'execution_time': result.get('execution_time'),
'rows_affected': result.get('adapter_response', {}).get('rows_affected'),
'status': result.get('status')
})
break
# Track the transformation
if compiled_sql:
await self.lineage_tracker.track_transformation_event(
sql=compiled_sql,
target_table=target_table,
execution_context=execution_context
)
This real-time integration ensures that lineage information stays current with your actual data pipelines, rather than becoming stale documentation.
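As a usage sketch, the dbt integration would typically run right after dbt run, pointing at the artifacts dbt writes to its target directory; the module names below assume the file names used in the examples above:
# dbt_lineage_sync.py (illustrative; assumes dbt's default target/ artifact paths)
import asyncio
from advanced_lineage_tracker import LineageTracker
from real_time_lineage_hooks import DBTLineageIntegration

async def sync_after_dbt_run(storage_backend):
    tracker = LineageTracker(storage_backend)
    integration = DBTLineageIntegration(tracker)

    # dbt writes these artifacts on every run
    await integration.sync_dbt_lineage(
        manifest_path="target/manifest.json",
        run_results_path="target/run_results.json",
    )

# In practice this is wired into CI or the orchestrator as a post-run step,
# e.g. an Airflow task that executes after the dbt task succeeds:
# asyncio.run(sync_after_dbt_run(my_storage_backend))  # my_storage_backend is assumed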
Traditional RBAC systems are too coarse for modern data governance. You need attribute-based access control (ABAC) that can make decisions based on data content, user context, query patterns, and dynamic conditions. Here's an advanced access control system:
# advanced_access_control.py
from typing import Dict, List, Set, Optional, Any, Union, Tuple
from dataclasses import dataclass, field
from enum import Enum
import asyncio
from datetime import datetime, timedelta
import json
import re
from abc import ABC, abstractmethod
class AccessDecision(Enum):
ALLOW = "allow"
DENY = "deny"
ALLOW_WITH_MASKING = "allow_with_masking"
ALLOW_WITH_AUDIT = "allow_with_audit"
class DataClassification(Enum):
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted"
@dataclass
class AccessContext:
"""Rich context for access control decisions"""
user_id: str
user_roles: Set[str]
user_departments: Set[str]
user_clearance_level: str
query_type: str # SELECT, INSERT, UPDATE, DELETE
query_purpose: Optional[str] # analytics, reporting, ml_training, etc.
access_location: str # IP address or geographic location
access_time: datetime
client_application: str
requested_columns: List[str]
query_filters: Dict[str, Any] # WHERE clause conditions
aggregation_level: Optional[str] # individual_records, aggregated, statistical
data_retention_context: Optional[Dict[str, Any]]
@dataclass
class DataPolicy:
"""Defines access rules for specific data assets"""
policy_id: str
name: str
description: str
target_resources: List[str] # Tables, columns, or datasets
data_classification: DataClassification
allowed_roles: Set[str]
allowed_departments: Set[str]
required_clearance_level: Optional[str]
time_restrictions: Optional[Dict[str, Any]] # Business hours, etc.
location_restrictions: Optional[List[str]]
purpose_restrictions: Optional[Set[str]]
masking_rules: Optional[Dict[str, str]]
audit_requirements: Dict[str, bool]
expires_at: Optional[datetime]
conditions: List['PolicyCondition'] = field(default_factory=list)
@dataclass
class PolicyCondition:
"""Complex conditions for policy evaluation"""
condition_type: str # "row_filter", "column_mask", "time_window", "usage_limit"
expression: str # SQL-like expression or Python condition
parameters: Dict[str, Any]
class AccessControlEngine:
"""Core engine for making access control decisions"""
    def __init__(self, audit_storage_backend):
        self.policies: Dict[str, DataPolicy] = {}
        self.user_attributes: Dict[str, Dict[str, Any]] = {}
        self.audit_logger = AuditLogger(audit_storage_backend)
        self.policy_cache = {}  # Cache for expensive policy evaluations
async def evaluate_access(self,
resource: str,
access_context: AccessContext) -> Tuple[AccessDecision, Dict[str, Any]]:
"""Main access control decision point"""
decision_context = {
'resource': resource,
'user_id': access_context.user_id,
'timestamp': access_context.access_time,
'applied_policies': [],
'masking_applied': {},
'audit_triggered': False
}
try:
# Find applicable policies for this resource
applicable_policies = await self._find_applicable_policies(resource, access_context)
if not applicable_policies:
# No specific policies - apply default behavior
decision_context['applied_policies'] = ['default_deny']
await self.audit_logger.log_access(access_context, AccessDecision.DENY, decision_context)
return AccessDecision.DENY, decision_context
# Evaluate each policy (most restrictive wins)
final_decision = AccessDecision.ALLOW
masking_rules = {}
audit_required = False
for policy in applicable_policies:
policy_decision, policy_context = await self._evaluate_policy(policy, access_context)
decision_context['applied_policies'].append(policy.policy_id)
# Apply most restrictive decision
if policy_decision == AccessDecision.DENY:
final_decision = AccessDecision.DENY
break
elif policy_decision == AccessDecision.ALLOW_WITH_MASKING:
if final_decision == AccessDecision.ALLOW:
final_decision = AccessDecision.ALLOW_WITH_MASKING
# Collect masking rules
if policy.masking_rules:
masking_rules.update(policy.masking_rules)
# Check audit requirements
if policy.audit_requirements.get('log_access', False):
audit_required = True
# Apply decision context
if final_decision == AccessDecision.ALLOW_WITH_MASKING:
decision_context['masking_applied'] = masking_rules
if audit_required:
decision_context['audit_triggered'] = True
await self.audit_logger.log_access(access_context, final_decision, decision_context)
return final_decision, decision_context
except Exception as e:
# Security-first: deny on error
decision_context['error'] = str(e)
await self.audit_logger.log_access(access_context, AccessDecision.DENY, decision_context)
return AccessDecision.DENY, decision_context
async def _find_applicable_policies(self,
resource: str,
access_context: AccessContext) -> List[DataPolicy]:
"""Find all policies that apply to this resource and context"""
applicable = []
for policy in self.policies.values():
# Check if policy applies to this resource
if not self._resource_matches_policy(resource, policy):
continue
# Check basic role/department requirements
if policy.allowed_roles and not policy.allowed_roles.intersection(access_context.user_roles):
continue
if policy.allowed_departments and not policy.allowed_departments.intersection(access_context.user_departments):
continue
# Check clearance level
if policy.required_clearance_level:
if not self._check_clearance_level(access_context.user_clearance_level, policy.required_clearance_level):
continue
# Check time restrictions
if policy.time_restrictions:
if not self._check_time_restrictions(access_context.access_time, policy.time_restrictions):
continue
# Check location restrictions
if policy.location_restrictions:
if access_context.access_location not in policy.location_restrictions:
continue
# Check purpose restrictions
if policy.purpose_restrictions and access_context.query_purpose:
if access_context.query_purpose not in policy.purpose_restrictions:
continue
# Policy is applicable
applicable.append(policy)
return applicable
async def _evaluate_policy(self,
policy: DataPolicy,
access_context: AccessContext) -> Tuple[AccessDecision, Dict[str, Any]]:
"""Evaluate a specific policy against the access context"""
policy_context = {
'policy_id': policy.policy_id,
'conditions_evaluated': []
}
# Evaluate complex conditions
for condition in policy.conditions:
condition_result = await self._evaluate_condition(condition, access_context)
policy_context['conditions_evaluated'].append({
'condition_type': condition.condition_type,
'result': condition_result
})
if not condition_result:
return AccessDecision.DENY, policy_context
# Determine access decision based on policy configuration
decision = AccessDecision.ALLOW
# Check if masking is required
if policy.masking_rules and self._requires_masking(access_context, policy):
decision = AccessDecision.ALLOW_WITH_MASKING
# Check if audit is required
if policy.audit_requirements.get('log_access', False):
if decision == AccessDecision.ALLOW:
decision = AccessDecision.ALLOW_WITH_AUDIT
return decision, policy_context
async def _evaluate_condition(self,
condition: PolicyCondition,
access_context: AccessContext) -> bool:
"""Evaluate complex policy conditions"""
if condition.condition_type == "row_filter":
# Check if user query includes required WHERE clause conditions
return self._check_row_filter_condition(condition, access_context)
elif condition.condition_type == "column_access":
# Check if user is accessing sensitive columns
return self._check_column_access_condition(condition, access_context)
elif condition.condition_type == "usage_limit":
# Check rate limiting or usage quotas
return await self._check_usage_limit_condition(condition, access_context)
elif condition.condition_type == "data_freshness":
# Only allow access to recent data
return self._check_data_freshness_condition(condition, access_context)
elif condition.condition_type == "aggregation_required":
# Require data to be aggregated (no individual records)
return self._check_aggregation_condition(condition, access_context)
return True # Unknown condition type - allow by default
def _check_row_filter_condition(self,
condition: PolicyCondition,
access_context: AccessContext) -> bool:
"""Check if query includes required row filtering"""
required_filter = condition.expression
user_filters = access_context.query_filters
# Example: require filtering by user's department
if "department = current_user_department" in required_filter:
return "department" in user_filters and user_filters["department"] in access_context.user_departments
# Example: require date filtering for large tables
if "date_column >= recent_date" in required_filter:
date_columns = condition.parameters.get('date_columns', [])
recent_threshold = datetime.now() - timedelta(days=condition.parameters.get('max_days_back', 30))
for date_col in date_columns:
if date_col in user_filters:
filter_date = user_filters[date_col]
if isinstance(filter_date, datetime) and filter_date >= recent_threshold:
return True
return False
return True
async def _check_usage_limit_condition(self,
condition: PolicyCondition,
access_context: AccessContext) -> bool:
"""Check usage quotas and rate limits"""
limit_type = condition.parameters.get('limit_type', 'requests_per_hour')
limit_value = condition.parameters.get('limit_value', 100)
# Get user's recent usage from audit logs
usage_count = await self.audit_logger.get_usage_count(
user_id=access_context.user_id,
resource_pattern=condition.parameters.get('resource_pattern', '*'),
time_window=timedelta(hours=1) if 'hour' in limit_type else timedelta(days=1)
)
return usage_count < limit_value
class DataMaskingEngine:
"""Handles dynamic data masking based on access control decisions"""
def __init__(self):
self.masking_functions = {
'hash': self._hash_mask,
'partial': self._partial_mask,
'date_truncate': self._date_truncate_mask,
'numeric_round': self._numeric_round_mask,
'null_mask': self._null_mask,
            # a 'synthetic' strategy (generating realistic fake values) could be added here
}
async def apply_masking(self,
query: str,
masking_rules: Dict[str, str],
access_context: AccessContext) -> str:
"""Modify query to apply data masking"""
masked_query = query
for column, masking_type in masking_rules.items():
if masking_type in self.masking_functions:
# Replace column references with masked versions
mask_function = self.masking_functions[masking_type]
masked_expression = await mask_function(column, access_context)
# Use regex to replace column references
column_pattern = rf'\b{re.escape(column)}\b'
masked_query = re.sub(column_pattern, masked_expression, masked_query)
return masked_query
async def _hash_mask(self, column: str, context: AccessContext) -> str:
"""Replace column with hash for anonymization"""
return f"SHA2({column}, 256)"
async def _partial_mask(self, column: str, context: AccessContext) -> str:
"""Show only partial data (e.g., first 3 characters)"""
return f"LEFT({column}, 3) || '***'"
async def _date_truncate_mask(self, column: str, context: AccessContext) -> str:
"""Truncate dates to month/year level"""
return f"DATE_TRUNC('month', {column})"
async def _numeric_round_mask(self, column: str, context: AccessContext) -> str:
"""Round numeric values to reduce precision"""
return f"ROUND({column}, -2)" # Round to nearest hundred
async def _null_mask(self, column: str, context: AccessContext) -> str:
"""Replace with NULL for complete hiding"""
return "NULL"
class AuditLogger:
"""Comprehensive audit logging for access control decisions"""
def __init__(self, storage_backend):
self.storage = storage_backend
async def log_access(self,
access_context: AccessContext,
decision: AccessDecision,
decision_context: Dict[str, Any]):
"""Log access attempt with full context"""
audit_record = {
'timestamp': datetime.now().isoformat(),
'user_id': access_context.user_id,
'resource': decision_context['resource'],
'decision': decision.value,
'query_type': access_context.query_type,
'query_purpose': access_context.query_purpose,
'access_location': access_context.access_location,
'client_application': access_context.client_application,
'applied_policies': decision_context['applied_policies'],
'masking_applied': decision_context.get('masking_applied', {}),
'requested_columns': access_context.requested_columns,
'user_roles': list(access_context.user_roles),
'user_departments': list(access_context.user_departments),
'session_id': decision_context.get('session_id'),
'query_hash': decision_context.get('query_hash')
}
await self.storage.store_audit_record(audit_record)
# Also send to SIEM if this is a high-risk access
if self._is_high_risk_access(access_context, decision):
await self._send_to_siem(audit_record)
async def get_usage_count(self,
user_id: str,
resource_pattern: str,
time_window: timedelta) -> int:
"""Get user's recent access count for rate limiting"""
start_time = datetime.now() - time_window
return await self.storage.count_access_records(
user_id=user_id,
resource_pattern=resource_pattern,
start_time=start_time
)
This access control system provides several advanced capabilities:
Context-Aware Decisions: Makes access control decisions based on rich context including user attributes, query purpose, location, and data characteristics.
Dynamic Masking: Automatically applies data masking transformations based on user permissions and data sensitivity.
Usage Quotas: Implements rate limiting and usage quotas to prevent data exfiltration or system abuse.
Comprehensive Auditing: Logs all access attempts with full context for compliance and security monitoring.
Condition-Based Policies: Supports complex policy conditions like required row filtering, aggregation requirements, and data freshness constraints.
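To see how the pieces compose, here is a hedged end-to-end sketch of evaluating a query against the engine and applying any masking it returns; the resource name, user attributes, and policy outcomes are purely illustrative:
# access_control_usage.py (illustrative; user attributes and resource names are examples)
from datetime import datetime
from advanced_access_control import (AccessControlEngine, AccessContext,
                                     AccessDecision, DataMaskingEngine)

async def run_governed_query(engine: AccessControlEngine, masking: DataMaskingEngine, sql: str):
    context = AccessContext(
        user_id="analyst_42",
        user_roles={"analyst"},
        user_departments={"marketing"},
        user_clearance_level="internal",
        query_type="SELECT",
        query_purpose="analytics",
        access_location="10.0.12.7",
        access_time=datetime.now(),
        client_application="notebook",
        requested_columns=["email", "order_total"],
        query_filters={"department": "marketing"},
        aggregation_level="aggregated",
        data_retention_context=None,
    )

    decision, details = await engine.evaluate_access("warehouse.sales.customers", context)

    if decision == AccessDecision.DENY:
        raise PermissionError(f"Blocked by policies: {details['applied_policies']}")
    if decision == AccessDecision.ALLOW_WITH_MASKING:
        # Rewrite the query so sensitive columns come back masked
        sql = await masking.apply_masking(sql, details["masking_applied"], context)
    return sql  # hand the (possibly rewritten) SQL to the warehouse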
The access control system needs to integrate with your actual query engines to intercept and modify queries before execution:
# query_interceptor.py
from typing import Dict, Any, Optional, List
from datetime import datetime
import sqlparse
from sqlparse.sql import Statement, Token, TokenList

# AccessControlEngine, AccessContext, AccessDecision, and DataMaskingEngine
# come from advanced_access_control.py above
class QueryInterceptor:
"""Intercepts and modifies queries based on access control decisions"""
def __init__(self, access_control_engine: AccessControlEngine):
self.access_control = access_control_engine
self.masking_engine = DataMaskingEngine()
async def process_query(self,
query: str,
user_context: Dict[str, Any],
connection_info: Dict[str, Any]) -> Dict[str, Any]:
"""Main entry point for query processing"""
try:
# Parse the query to understand what's being accessed
parsed_query = sqlparse.parse(query)[0]
access_info = self._extract_access_info(parsed_query)
# Build access context
access_context = AccessContext(
user_id=user_context['user_id'],
user_roles=set(user_context['roles']),
user_departments=set(user_context['departments']),
user_clearance_level=user_context.get('clearance_level', 'basic'),
query_type=access_info['query_type'],
query_purpose=user_context.get('purpose'),
access_location=connection_info.get('client_ip', 'unknown'),
access_time=datetime.now(),
client_application=connection_info.get('application', 'unknown'),
requested_columns=access_info['columns'],
query_filters=access_info['filters'],
aggregation_level=access_info['aggregation_level']
)
# Check access for each table referenced
final_decision = AccessDecision.ALLOW
all_masking_rules = {}
for table in access_info['tables']:
decision, decision_context = await self.access_control.evaluate_access(