
Picture this scenario: Your company's data warehouse contains raw event streams from your web application, unprocessed sales data from multiple systems, and customer information scattered across various tables. The analytics team needs clean, reliable datasets for reporting, but every query requires complex joins, aggregations, and business logic. Worse, when someone discovers a bug in the transformation logic, it's replicated across dozens of hand-written SQL scripts with no version control or testing.
This is where dbt (data build tool) revolutionizes your data workflow. dbt transforms your warehouse into a mature software development environment where SQL becomes modular, testable, and maintainable. Instead of writing one-off transformation scripts, you'll build a graph of interdependent models that automatically handle dependencies, provide data lineage, and enable collaborative development.
By the end of this lesson, you'll understand how dbt fundamentally changes data transformation from ad-hoc scripting to engineering discipline. You'll build production-ready transformation pipelines that your entire team can understand, modify, and trust.
What you'll learn:
This lesson assumes you have:
Traditional data workflows suffer from a fundamental problem: they treat data transformation as a series of one-off scripts rather than engineered software. dbt introduces the concept of "analytics engineering" – applying software engineering best practices to data transformation.
The core insight is deceptively simple: most data transformation is just SQL, and SQL is code. If SQL is code, then it should be version controlled, tested, documented, and deployed like any other software. dbt provides the framework to make this happen.
dbt doesn't execute your SQL directly. Instead, it compiles Jinja-templated SQL into pure SQL that runs on your warehouse. This compilation step is where dbt's power emerges.
Consider this raw SQL query that's common in many organizations:
-- Raw SQL approach - brittle and hard to maintain
SELECT
    customer_id,
    SUM(order_total) AS lifetime_value,
    COUNT(*) AS order_count,
    MIN(order_date) AS first_order_date,
    MAX(order_date) AS last_order_date
FROM raw.ecommerce.orders
WHERE order_status IN ('completed', 'shipped', 'delivered')
    AND order_date >= '2020-01-01'
GROUP BY customer_id
Here's the same logic expressed as a dbt model:
-- models/marts/customers/customer_lifetime_value.sql
{{
    config(
        materialized='table',
        indexes=[
            {'columns': ['customer_id'], 'unique': True}
        ]
    )
}}

WITH completed_orders AS (
    SELECT *
    FROM {{ ref('staging_orders') }}
    -- var() returns a list here, so render it as a SQL tuple
    WHERE order_status IN ('{{ var('completed_order_statuses') | join("', '") }}')
        AND order_date >= '{{ var('analysis_start_date') }}'
)

SELECT
    customer_id,
    SUM(order_total) AS lifetime_value,
    COUNT(*) AS order_count,
    MIN(order_date) AS first_order_date,
    MAX(order_date) AS last_order_date
FROM completed_orders
GROUP BY customer_id
The dbt version introduces several powerful concepts:
ref() function: Creates dependencies between models. dbt automatically determines build order and handles schema changes upstream.
var() function: Parameterizes your models. Business logic like "what constitutes a completed order" becomes configurable.
config() block: Declares how the model should be materialized and optimized in your warehouse.
When dbt compiles this model, it resolves all template variables and references, producing pure SQL optimized for your specific warehouse.
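To make the compilation step concrete, here's a toy renderer in Python that resolves `ref()` and `var()` the way dbt's compiler conceptually does. Real dbt runs full Jinja; this sketch handles only these two functions, and the schema name and variable value below are illustrative assumptions, not real project settings.

```python
import re

def compile_model(sql: str, refs: dict, variables: dict) -> str:
    """Toy dbt-style compilation: substitute ref() and var() calls with values."""
    sql = re.sub(r"\{\{\s*ref\('([^']+)'\)\s*\}\}",
                 lambda m: refs[m.group(1)], sql)
    sql = re.sub(r"\{\{\s*var\('([^']+)'\)\s*\}\}",
                 lambda m: str(variables[m.group(1)]), sql)
    return sql

model = "SELECT * FROM {{ ref('staging_orders') }} WHERE order_date >= '{{ var('analysis_start_date') }}'"
compiled = compile_model(
    model,
    refs={"staging_orders": "analytics.staging.staging_orders"},  # assumed relation name
    variables={"analysis_start_date": "2020-01-01"},
)
print(compiled)
# SELECT * FROM analytics.staging.staging_orders WHERE order_date >= '2020-01-01'
```

The important idea is that templating happens entirely before the warehouse sees the query: what executes is plain SQL with every reference and variable already resolved.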
A well-structured dbt project follows conventions that make it scalable and maintainable. Let's build a realistic project structure for an e-commerce company.
First, initialize your dbt project:
dbt init ecommerce_analytics
cd ecommerce_analytics
Your dbt_project.yml file controls project-wide behavior:
name: 'ecommerce_analytics'
version: '1.0.0'
config-version: 2

profile: 'ecommerce'

model-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets:
  - "target"
  - "dbt_packages"

models:
  ecommerce_analytics:
    staging:
      +materialized: view
      +docs:
        node_color: "#F7F7F7"
    intermediate:
      +materialized: ephemeral
      +docs:
        node_color: "#FFD700"
    marts:
      +materialized: table
      +docs:
        node_color: "#7048e8"

vars:
  # Business logic variables
  completed_order_statuses: ['completed', 'shipped', 'delivered']
  analysis_start_date: '2020-01-01'
  # Performance tuning variables
  incremental_lookback_days: 3
  snapshot_strategy: 'timestamp'

# Global test configurations
tests:
  +store_failures: true
  +severity: warn
Professional dbt projects follow a three-layer architecture that separates concerns and promotes reusability:
- Staging layer: light transformations on raw data – renaming columns, casting types, basic cleaning
- Intermediate layer: business logic transformations that create reusable building blocks
- Marts layer: final datasets optimized for specific use cases
Let's implement this structure:
models/
├── staging/
│ ├── ecommerce/
│ │ ├── _ecommerce__sources.yml
│ │ ├── _ecommerce__models.yml
│ │ ├── stg_ecommerce__customers.sql
│ │ ├── stg_ecommerce__orders.sql
│ │ └── stg_ecommerce__order_items.sql
│ └── marketing/
│ ├── _marketing__sources.yml
│ ├── _marketing__models.yml
│ └── stg_marketing__campaigns.sql
├── intermediate/
│ ├── orders/
│ │ ├── _orders__models.yml
│ │ ├── int_orders__daily_aggregates.sql
│ │ └── int_orders__customer_metrics.sql
│ └── customers/
│ ├── _customers__models.yml
│ └── int_customers__enriched.sql
└── marts/
├── core/
│ ├── _core__models.yml
│ ├── dim_customers.sql
│ ├── dim_products.sql
│ └── fct_orders.sql
└── marketing/
├── _marketing__models.yml
└── customer_acquisition_metrics.sql
Staging models are your first line of defense against bad data. They should be simple, focused transformations that establish data contracts for downstream models.
-- models/staging/ecommerce/stg_ecommerce__orders.sql
{{
    config(
        materialized='view',
        contract={'enforced': true}
    )
}}

WITH source AS (
    SELECT * FROM {{ source('ecommerce', 'orders') }}
),

transformed AS (
    SELECT
        -- Primary key
        order_id,

        -- Foreign keys
        customer_id,

        -- Dates
        CAST(created_at AS TIMESTAMP) AS order_created_at,
        CAST(updated_at AS TIMESTAMP) AS order_updated_at,
        DATE(created_at) AS order_date,

        -- Measures
        CAST(total_amount AS DECIMAL(10,2)) AS order_total,
        CAST(tax_amount AS DECIMAL(10,2)) AS tax_amount,
        CAST(shipping_amount AS DECIMAL(10,2)) AS shipping_amount,

        -- Dimensions
        LOWER(TRIM(status)) AS order_status,
        LOWER(TRIM(payment_method)) AS payment_method,

        -- Metadata
        CURRENT_TIMESTAMP() AS _loaded_at
    FROM source
    WHERE
        -- Basic data quality filters
        order_id IS NOT NULL
        AND customer_id IS NOT NULL
        AND created_at IS NOT NULL
        AND total_amount >= 0
)

SELECT * FROM transformed
The contract={'enforced': true} configuration enables dbt's data contracts feature, which enforces the column names, data types, and constraints you declare in the model's YAML file at build time. This catches schema drift early and prevents bad data from propagating downstream.
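To see what contract enforcement means in plain terms, here's a hedged Python sketch of the same idea: every row must carry the agreed columns with the agreed types. The column names and types below are assumptions for illustration; dbt itself enforces contracts through DDL with declared types, not row-by-row checks.

```python
# Illustrative contract mirroring the staging model's key columns
CONTRACT = {"order_id": int, "customer_id": int, "order_total": float}

def contract_violations(row: dict) -> list:
    """Return human-readable violations; an empty list means the row conforms."""
    problems = []
    for column, expected_type in CONTRACT.items():
        if column not in row:
            problems.append(f"missing column: {column}")
        elif not isinstance(row[column], expected_type):
            problems.append(
                f"{column}: expected {expected_type.__name__}, "
                f"got {type(row[column]).__name__}"
            )
    return problems

good = contract_violations({"order_id": 1, "customer_id": 7, "order_total": 19.99})
bad = contract_violations({"order_id": 1, "customer_id": 7, "order_total": "19.99"})
# good is empty; bad flags order_total arriving as a string
```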
Intermediate models encapsulate complex business logic into reusable components. They're typically materialized as ephemeral, meaning dbt inlines them as common table expressions (CTEs) into downstream models instead of creating warehouse objects, which avoids storage costs while keeping development modular.
-- models/intermediate/customers/int_customers__order_metrics.sql
{{
    config(
        materialized='ephemeral'
    )
}}

WITH orders AS (
    SELECT *
    FROM {{ ref('stg_ecommerce__orders') }}
    WHERE order_status IN ('{{ var('completed_order_statuses') | join("', '") }}')
),

order_items AS (
    SELECT *
    FROM {{ ref('stg_ecommerce__order_items') }}
),

enriched_orders AS (
    SELECT
        o.*,
        oi.total_items,
        oi.total_quantity
    FROM orders o
    LEFT JOIN (
        SELECT
            order_id,
            COUNT(*) AS total_items,
            SUM(quantity) AS total_quantity
        FROM order_items
        GROUP BY order_id
    ) oi ON o.order_id = oi.order_id
),

customer_metrics AS (
    SELECT
        customer_id,

        -- Order counts
        COUNT(*) AS total_orders,
        COUNT(CASE WHEN order_date >= CURRENT_DATE - INTERVAL '90 days'
            THEN 1 END) AS orders_90d,
        COUNT(CASE WHEN order_date >= CURRENT_DATE - INTERVAL '365 days'
            THEN 1 END) AS orders_365d,

        -- Revenue metrics
        SUM(order_total) AS lifetime_value,
        AVG(order_total) AS avg_order_value,
        MEDIAN(order_total) AS median_order_value,

        -- Timing metrics
        MIN(order_date) AS first_order_date,
        MAX(order_date) AS most_recent_order_date,
        DATEDIFF('day', MIN(order_date), MAX(order_date)) AS customer_lifespan_days,

        -- Advanced metrics
        DATEDIFF('day', MAX(order_date), CURRENT_DATE) AS days_since_last_order,
        COUNT(*) / NULLIF(
            DATEDIFF('day', MIN(order_date), CURRENT_DATE), 0
        ) * 365 AS orders_per_year,

        -- Product diversity
        AVG(total_items) AS avg_items_per_order,
        AVG(total_quantity) AS avg_quantity_per_order
    FROM enriched_orders
    GROUP BY customer_id
)

SELECT * FROM customer_metrics
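To sanity-check the aggregation logic outside the warehouse, the core per-customer metrics can be computed in a few lines of Python over toy data. The order rows here are invented purely for illustration:

```python
from datetime import date
from statistics import mean, median

# Illustrative order rows: (customer_id, order_date, order_total)
orders = [
    (1, date(2023, 1, 10), 50.0),
    (1, date(2023, 6, 1), 150.0),
    (2, date(2023, 3, 5), 80.0),
]

def customer_metrics(rows, customer_id):
    """Mirror the SQL aggregates for a single customer."""
    mine = [(d, t) for c, d, t in rows if c == customer_id]
    totals = [t for _, t in mine]
    dates = [d for d, _ in mine]
    return {
        "total_orders": len(mine),
        "lifetime_value": sum(totals),
        "avg_order_value": mean(totals),
        "median_order_value": median(totals),
        "first_order_date": min(dates),
        "customer_lifespan_days": (max(dates) - min(dates)).days,
    }

metrics = customer_metrics(orders, 1)
print(metrics)
```

Porting metric definitions like this into a scripting language is a cheap way to agree on edge cases (single-order customers, lifespan of zero days) before they're frozen into SQL.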
Marts are your final, production-ready datasets. They should be optimized for query performance and designed around specific business use cases.
-- models/marts/core/dim_customers.sql
{{
config(
materialized='table',
indexes=[
{'columns': ['customer_id'], 'unique': True},
{'columns': ['customer_segment']},
{'columns': ['first_order_date']}
],
cluster_by=['customer_segment', 'first_order_date']
)
}}
WITH customers AS (
SELECT * FROM {{ ref('stg_ecommerce__customers') }}
),
order_metrics AS (
SELECT * FROM {{ ref('int_customers__order_metrics') }}
),
enriched AS (
SELECT
-- Primary key and identifiers
c.customer_id,
c.email,
c.first_name,
c.last_name,
c.first_name || ' ' || c.last_name AS full_name,
-- Demographics
c.date_of_birth,
c.gender,
c.city,
c.state,
c.country,
c.customer_created_at,
-- Order metrics
COALESCE(om.total_orders, 0) AS total_orders,
COALESCE(om.orders_90d, 0) AS orders_90d,
COALESCE(om.orders_365d, 0) AS orders_365d,
COALESCE(om.lifetime_value, 0) AS lifetime_value,
om.avg_order_value,
om.median_order_value,
om.first_order_date,
om.most_recent_order_date,
om.customer_lifespan_days,
om.days_since_last_order,
om.orders_per_year,
om.avg_items_per_order,
om.avg_quantity_per_order,
-- Business segments
CASE
WHEN om.total_orders IS NULL THEN 'Never Purchased'
WHEN om.days_since_last_order > 365 THEN 'Dormant'
WHEN om.days_since_last_order > 90 THEN 'At Risk'
WHEN om.orders_90d >= 3 THEN 'Champion'
WHEN om.lifetime_value > 1000 THEN 'High Value'
WHEN om.total_orders = 1 THEN 'New Customer'
ELSE 'Regular'
END AS customer_segment,
CASE
WHEN om.lifetime_value >= 2000 THEN 'High'
WHEN om.lifetime_value >= 500 THEN 'Medium'
WHEN om.lifetime_value > 0 THEN 'Low'
ELSE 'None'
END AS value_tier,
-- Metadata
CURRENT_TIMESTAMP() AS _updated_at
FROM customers c
LEFT JOIN order_metrics om ON c.customer_id = om.customer_id
)
SELECT * FROM enriched
For large datasets, full-refresh models become impractical. Incremental models process only new or changed records, dramatically improving performance and reducing warehouse costs.
dbt supports several incremental strategies, depending on your adapter:

- append: insert new rows only, never updating existing rows
- merge: upsert rows that match the unique_key (the default on warehouses that support MERGE)
- delete+insert: delete matching rows, then insert the new batch
- insert_overwrite: replace whole partitions rather than individual rows
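The merge strategy's upsert behavior is easy to sketch in Python: treat the existing table as a mapping keyed by the unique key, and let each batch row either update or insert. This is a conceptual sketch of the semantics, not how a warehouse MERGE actually executes:

```python
def incremental_merge(existing, batch, unique_key="order_id"):
    """Merge-style upsert: batch rows replace existing rows with the same key."""
    table = {row[unique_key]: row for row in existing}
    for row in batch:
        table[row[unique_key]] = row  # update if the key exists, insert otherwise
    return list(table.values())

existing = [{"order_id": 1, "status": "shipped"},
            {"order_id": 2, "status": "completed"}]
batch = [{"order_id": 2, "status": "refunded"},
         {"order_id": 3, "status": "completed"}]
merged = incremental_merge(existing, batch)
# order 2 is updated in place, order 3 is appended, order 1 is untouched
```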
Here's a sophisticated incremental model that handles late-arriving data:
-- models/marts/core/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
merge_update_columns=['order_status', 'order_updated_at', 'order_total'],
on_schema_change='append_new_columns',
cluster_by=['order_date'],
partition_by={
'field': 'order_date',
'data_type': 'date',
'granularity': 'day'
}
)
}}
WITH orders AS (
SELECT *
FROM {{ ref('stg_ecommerce__orders') }}
{% if is_incremental() %}
-- Handle late-arriving data by looking back N days
WHERE order_updated_at >= (
SELECT DATEADD('day', -{{ var('incremental_lookback_days') }}, MAX(order_updated_at))
FROM {{ this }}
)
{% endif %}
),
order_items AS (
SELECT
order_id,
COUNT(*) AS item_count,
SUM(quantity) AS total_quantity,
SUM(line_total) AS items_total,
AVG(unit_price) AS avg_unit_price
FROM {{ ref('stg_ecommerce__order_items') }}
{% if is_incremental() %}
WHERE order_id IN (SELECT order_id FROM orders)
{% endif %}
GROUP BY order_id
),
customers AS (
SELECT
customer_id,
customer_segment,
value_tier
FROM {{ ref('dim_customers') }}
),
final AS (
SELECT
-- Grain: one record per order
o.order_id,
-- Foreign keys
o.customer_id,
-- Dates
o.order_date,
o.order_created_at,
o.order_updated_at,
-- Order measures
o.order_total,
o.tax_amount,
o.shipping_amount,
o.order_total - o.tax_amount - o.shipping_amount AS subtotal,
-- Item measures
oi.item_count,
oi.total_quantity,
oi.items_total,
oi.avg_unit_price,
-- Order attributes
o.order_status,
o.payment_method,
-- Customer context (slowly changing)
c.customer_segment,
c.value_tier,
-- Derived metrics
CASE
WHEN o.order_total > 200 THEN 'High'
WHEN o.order_total > 50 THEN 'Medium'
ELSE 'Low'
END AS order_size_category,
-- Metadata
CURRENT_TIMESTAMP() AS _updated_at
FROM orders o
LEFT JOIN order_items oi ON o.order_id = oi.order_id
LEFT JOIN customers c ON o.customer_id = c.customer_id
)
SELECT * FROM final
For complex scenarios, you might need custom incremental logic:
-- Custom incremental logic for handling deletes and updates.
-- A dbt model must compile to a single SELECT statement, so DML such as
-- DELETE belongs in a pre_hook rather than in the model body:
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        pre_hook="""
            {% if is_incremental() %}
            -- Handle hard deletes by removing records that no longer exist in source
            -- (the is_incremental() guard skips this on the first build,
            -- when the target table does not yet exist)
            DELETE FROM {{ this }}
            WHERE order_id NOT IN (
                SELECT order_id
                FROM {{ ref('stg_ecommerce__orders') }}
            )
            {% endif %}
        """
    )
}}

-- Main incremental logic continues...
dbt's testing framework goes far beyond simple not-null checks. You can implement sophisticated data quality rules that catch business logic errors and data inconsistencies.
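The underlying model is simple: a dbt test is just a query that returns the rows violating a rule, and zero returned rows means the test passes. Here's that idea sketched in Python for an accepted-range check; the sample rows are illustrative:

```python
def accepted_range_failures(rows, column, min_value, max_value):
    """Return the rows that violate the range; an empty result means the test passes."""
    return [r for r in rows if not (min_value <= r[column] <= max_value)]

rows = [{"customer_id": 1, "lifetime_value": 250.0},
        {"customer_id": 2, "lifetime_value": -10.0}]
failures = accepted_range_failures(rows, "lifetime_value", 0, 100_000)
# exactly one failing row: customer 2 with a negative lifetime value
```

Keeping this "failing rows" framing in mind makes custom singular tests straightforward: you write a SELECT that describes bad data, and dbt flags the test whenever it returns anything.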
Define tests directly in your schema YAML files:
# models/marts/core/_core__models.yml
version: 2

models:
  - name: dim_customers
    description: "Customer dimension table with lifetime metrics and segmentation"
    columns:
      - name: customer_id
        description: "Primary key for customers"
        tests:
          - unique
          - not_null
      - name: email
        description: "Customer email address"
        tests:
          - not_null
          - unique
          - relationships:
              to: ref('stg_ecommerce__customers')
              field: email
      - name: lifetime_value
        description: "Total customer lifetime value"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 100000
              inclusive: true
      - name: customer_segment
        description: "Business-defined customer segment"
        tests:
          - not_null
          - accepted_values:
              values: ['Never Purchased', 'New Customer', 'Regular',
                       'High Value', 'Champion', 'At Risk', 'Dormant']

  - name: fct_orders
    description: "Fact table for order transactions"
    tests:
      # Custom business rule tests
      - assert_orders_have_items
      - assert_order_totals_match_items
      - assert_no_future_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_total
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 10000
Create reusable test logic with generic tests:
-- tests/generic/assert_column_sum_equals.sql
{% test assert_column_sum_equals(model, column_name, expected_sum, group_by=none) %}
{% set group_clause = "GROUP BY " + group_by if group_by else "" %}
{% set having_clause = "HAVING SUM(" + column_name + ") != " + expected_sum|string %}
SELECT
{% if group_by %}{{ group_by }},{% endif %}
SUM({{ column_name }}) as actual_sum,
{{ expected_sum }} as expected_sum
FROM {{ model }}
{% if group_by %}
{{ group_clause }}
{{ having_clause }}
{% else %}
HAVING SUM({{ column_name }}) != {{ expected_sum }}
{% endif %}
{% endtest %}
Some tests require custom SQL logic:
-- tests/assert_customer_segments_are_current.sql
-- Test that customer segments reflect current behavior
WITH recent_orders AS (
SELECT DISTINCT customer_id
FROM {{ ref('fct_orders') }}
WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
),
customers_with_recent_orders AS (
SELECT c.*
FROM {{ ref('dim_customers') }} c
INNER JOIN recent_orders ro ON c.customer_id = ro.customer_id
),
misclassified_customers AS (
SELECT
customer_id,
customer_segment,
days_since_last_order
FROM customers_with_recent_orders
WHERE customer_segment IN ('Dormant', 'At Risk', 'Never Purchased')
AND days_since_last_order <= 30
)
SELECT * FROM misclassified_customers
Configure test severity levels and failure handling:
# dbt_project.yml
tests:
  ecommerce_analytics:
    staging:
      +severity: error  # Stop execution on staging test failures
    marts:
      core:
        +severity: warn  # Continue execution but report failures
        +store_failures: true
        +store_failures_as: table
Create custom test macros for complex validation:
-- macros/test_helpers.sql
{% macro generate_alert_on_test_failure(test_results) %}
{% if test_results %}
{% set failure_count = test_results | length %}
{% set message = "dbt test failure: " + failure_count|string + " tests failed" %}
-- Integration with monitoring systems
{{ log(message, info=True) }}
-- Custom webhook or Slack notification logic here
{{ send_slack_alert(message) }}
{% endif %}
{% endmacro %}
Snapshots capture how data changes over time, essential for slowly changing dimensions and compliance requirements.
-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}
{{
config(
target_database='analytics',
target_schema='snapshots',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at',
)
}}
SELECT
customer_id,
email,
first_name,
last_name,
city,
state,
customer_status,
updated_at
FROM {{ source('ecommerce', 'customers') }}
WHERE customer_id IS NOT NULL
{% endsnapshot %}
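The timestamp strategy above boils down to Type 2 slowly changing dimension bookkeeping: when a source row's `updated_at` advances, close the current version and append a new one. A simplified Python sketch of that process, assuming in-memory rows with `valid_from`/`valid_to` columns (dbt's actual snapshot columns are named `dbt_valid_from`/`dbt_valid_to`):

```python
from datetime import datetime

def snapshot_timestamp(history, source_rows, key="customer_id", updated_at="updated_at"):
    """Simplified SCD Type 2 upkeep: close changed versions, append new ones."""
    current = {r[key]: r for r in history if r["valid_to"] is None}
    for row in source_rows:
        existing = current.get(row[key])
        if existing is None or row[updated_at] > existing[updated_at]:
            if existing is not None:
                existing["valid_to"] = row[updated_at]  # close the old version
            history.append({**row, "valid_from": row[updated_at], "valid_to": None})
    return history

history = [{"customer_id": 1, "city": "Boston", "updated_at": datetime(2023, 1, 1),
            "valid_from": datetime(2023, 1, 1), "valid_to": None}]
snapshot_timestamp(history, [{"customer_id": 1, "city": "Denver",
                              "updated_at": datetime(2023, 5, 1)}])
# history now holds two versions of customer 1; only the Denver row is open
```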
For more complex scenarios, use the check strategy:
-- snapshots/product_pricing_snapshot.sql
{% snapshot product_pricing_snapshot %}
{{
config(
target_schema='snapshots',
unique_key='product_id',
strategy='check',
check_cols=['price', 'cost', 'category', 'status'],
)
}}
SELECT *
FROM {{ source('inventory', 'products') }}
{% endsnapshot %}
Macros enable sophisticated code generation and reuse patterns that go far beyond simple substitution.
-- macros/generate_pivot_columns.sql
{% macro generate_pivot_columns(table_name, pivot_column, value_column, agg_func='sum') %}
{% set pivot_query %}
SELECT DISTINCT {{ pivot_column }}
FROM {{ table_name }}
ORDER BY {{ pivot_column }}
{% endset %}
{% set results = run_query(pivot_query) %}
{% if execute %}
{% set pivot_values = results.columns[0].values() %}
{% else %}
{% set pivot_values = [] %}
{% endif %}
{%- for value in pivot_values -%}
{{ agg_func }}(
CASE WHEN {{ pivot_column }} = '{{ value }}'
THEN {{ value_column }}
END
) AS {{ value | replace(' ', '_') | lower }}
{%- if not loop.last -%},{%- endif -%}
{%- endfor -%}
{% endmacro %}
Usage in a model:
-- models/marts/analytics/monthly_sales_by_category.sql
SELECT
DATE_TRUNC('month', order_date) AS month,
{{ generate_pivot_columns(
ref('fct_order_items'),
'product_category',
'line_total'
) }}
FROM {{ ref('fct_order_items') }}
GROUP BY 1
ORDER BY 1
Create macros that generate complex business logic:
-- macros/customer_segmentation.sql
{% macro generate_rfm_segments(
recency_field,
frequency_field,
monetary_field,
recency_thresholds=[30, 90, 365],
frequency_thresholds=[1, 3, 10],
monetary_thresholds=[100, 500, 2000]
) %}
CASE
-- Champions: Recent, frequent, high-value customers
WHEN {{ recency_field }} <= {{ recency_thresholds[0] }}
AND {{ frequency_field }} >= {{ frequency_thresholds[2] }}
AND {{ monetary_field }} >= {{ monetary_thresholds[2] }}
THEN 'Champion'
-- Loyal Customers: Frequent buyers regardless of recency
WHEN {{ frequency_field }} >= {{ frequency_thresholds[1] }}
AND {{ monetary_field }} >= {{ monetary_thresholds[1] }}
THEN 'Loyal'
-- Potential Loyalists: Recent customers with medium frequency
WHEN {{ recency_field }} <= {{ recency_thresholds[1] }}
AND {{ frequency_field }} >= {{ frequency_thresholds[1] }}
THEN 'Potential Loyalist'
-- New Customers: Recent but low frequency
WHEN {{ recency_field }} <= {{ recency_thresholds[0] }}
AND {{ frequency_field }} < {{ frequency_thresholds[1] }}
THEN 'New Customer'
-- At Risk: Used to be good customers but haven't purchased recently
WHEN {{ recency_field }} > {{ recency_thresholds[1] }}
AND {{ frequency_field }} >= {{ frequency_thresholds[1] }}
AND {{ monetary_field }} >= {{ monetary_thresholds[1] }}
THEN 'At Risk'
-- Cannot Lose: High-value customers who haven't purchased recently
WHEN {{ recency_field }} > {{ recency_thresholds[1] }}
AND {{ monetary_field }} >= {{ monetary_thresholds[2] }}
THEN 'Cannot Lose Them'
-- Hibernating: Low recent activity
WHEN {{ recency_field }} > {{ recency_thresholds[2] }}
THEN 'Hibernating'
-- Others
ELSE 'Others'
END
{% endmacro %}
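Because the macro is pure branching logic, it's worth unit-testing the thresholds before baking them into SQL. Here's a Python port of the same CASE expression using the macro's default thresholds; as in SQL, the first matching branch wins:

```python
def rfm_segment(recency_days, frequency_orders, monetary_value,
                recency=(30, 90, 365), frequency=(1, 3, 10), monetary=(100, 500, 2000)):
    """Python mirror of the SQL CASE: first matching branch wins."""
    if (recency_days <= recency[0] and frequency_orders >= frequency[2]
            and monetary_value >= monetary[2]):
        return "Champion"
    if frequency_orders >= frequency[1] and monetary_value >= monetary[1]:
        return "Loyal"
    if recency_days <= recency[1] and frequency_orders >= frequency[1]:
        return "Potential Loyalist"
    if recency_days <= recency[0] and frequency_orders < frequency[1]:
        return "New Customer"
    if (recency_days > recency[1] and frequency_orders >= frequency[1]
            and monetary_value >= monetary[1]):
        return "At Risk"
    if recency_days > recency[1] and monetary_value >= monetary[2]:
        return "Cannot Lose Them"
    if recency_days > recency[2]:
        return "Hibernating"
    return "Others"

print(rfm_segment(10, 12, 2500))
# prints "Champion"
```

Once the thresholds survive a round of assertions, translating them back into the macro is mechanical, and disagreements between teams become test cases instead of debates.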
dbt automatically generates comprehensive documentation and data lineage graphs. Enhance this with rich descriptions and metadata.
# models/marts/core/_core__models.yml
version: 2

models:
  - name: dim_customers
    description: |
      ## Customer Dimension Table

      This table provides a complete view of customers with calculated metrics
      and business segments. Updated nightly via incremental refresh.

      ### Business Rules
      - Customer segments are calculated based on RFM analysis
      - Lifetime value includes all completed orders
      - Value tiers are updated based on trailing 12-month activity

      ### Data Quality
      - Primary key: customer_id (unique, not null)
      - All monetary fields >= 0
      - Segment classifications must be from approved list
    columns:
      - name: customer_id
        description: "Unique identifier for customers from source system"
        meta:
          dimension:
            type: "primary_key"
      - name: customer_segment
        description: |
          Business segment based on RFM analysis:
          - **Champion**: High recency, frequency, and monetary value
          - **Loyal**: Consistent buyers with good monetary value
          - **Potential Loyalist**: Recent customers showing promise
          - **New Customer**: Recent first-time buyers
          - **At Risk**: Previously good customers with declining activity
          - **Cannot Lose Them**: High-value customers at risk of churning
          - **Hibernating**: Inactive customers
        meta:
          dimension:
            type: "categorical"
          business_owner: "Marketing Team"
          update_frequency: "daily"
Use the meta field to track data lineage and business context:
columns:
  - name: lifetime_value
    description: "Total customer lifetime value in USD"
    meta:
      business_definition: "Sum of all completed order totals for the customer"
      calculation_logic: "SUM(fct_orders.order_total) WHERE order_status IN ('completed', 'shipped', 'delivered')"
      data_classification: "confidential"
      business_owner: "Finance Team"
      technical_owner: "Data Engineering"
      quality_checks:
        - "Must be >= 0"
        - "Should not exceed $100,000 (alert if violated)"
Professional dbt deployment requires sophisticated CI/CD pipelines that ensure code quality and prevent production issues.
# .github/workflows/dbt_ci.yml
name: dbt CI/CD Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

env:
  DBT_PROFILES_DIR: ${{ github.workspace }}
  DBT_PROJECT_DIR: ${{ github.workspace }}

jobs:
  lint_and_test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'

      - name: Install dependencies
        run: |
          pip install dbt-snowflake==1.6.0
          pip install sqlfluff==2.3.0
          dbt deps

      - name: Lint SQL
        run: |
          sqlfluff lint models/ --dialect snowflake

      - name: Create profiles.yml
        run: |
          mkdir -p ~/.dbt
          cat > ~/.dbt/profiles.yml << EOF
          ecommerce:
            target: ci
            outputs:
              ci:
                type: snowflake
                account: ${{ secrets.SNOWFLAKE_ACCOUNT }}
                user: ${{ secrets.SNOWFLAKE_USER }}
                password: ${{ secrets.SNOWFLAKE_PASSWORD }}
                role: CI_ROLE
                database: CI_DATABASE
                warehouse: CI_WAREHOUSE
                schema: dbt_ci_${{ github.run_id }}
                threads: 4
          EOF

      - name: Test connection
        run: dbt debug

      - name: Install dbt packages
        run: dbt deps

      - name: Run dbt tests on modified models only
        if: github.event_name == 'pull_request'
        run: |
          # Get list of changed files
          git diff --name-only origin/main...HEAD > changed_files.txt

          # Extract model names from changed SQL files
          grep "models/.*\.sql$" changed_files.txt | \
            sed 's|models/||; s|\.sql$||; s|/|.|g' > changed_models.txt

          # Run only changed models and their downstream dependencies
          if [ -s changed_models.txt ]; then
            while read model; do
              dbt run --select $model+ --profiles-dir ~/.dbt
              dbt test --select $model+ --profiles-dir ~/.dbt
            done < changed_models.txt
          fi

      - name: Run full test suite on main branch
        if: github.ref == 'refs/heads/main'
        run: |
          dbt run --profiles-dir ~/.dbt
          dbt test --profiles-dir ~/.dbt

      - name: Generate documentation
        if: github.ref == 'refs/heads/main'
        run: |
          dbt docs generate --profiles-dir ~/.dbt

      - name: Upload artifacts
        if: always()
        uses: actions/upload-artifact@v3
        with:
          name: dbt-artifacts
          path: |
            target/manifest.json
            target/catalog.json
            target/run_results.json
            target/index.html

      - name: Cleanup CI schema
        if: always()
        run: |
          dbt run-operation drop_schema_if_exists \
            --args "{schema_name: dbt_ci_${{ github.run_id }}}" \
            --profiles-dir ~/.dbt

  deploy_production:
    runs-on: ubuntu-latest
    needs: lint_and_test
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Deploy to Production
        run: |
          # Production deployment logic
          dbt run --target prod --profiles-dir ~/.dbt
          dbt test --target prod --profiles-dir ~/.dbt

          # Update documentation site
          dbt docs generate --target prod --profiles-dir ~/.dbt
          aws s3 sync target/ s3://company-dbt-docs/ --delete
For zero-downtime deployments, implement blue-green deployments:
-- macros/blue_green_deploy.sql
{% macro blue_green_swap(schema_suffix='_blue') %}
{% set current_schema = target.schema %}
{% set blue_schema = current_schema + schema_suffix %}
{% set backup_schema = current_schema + '_backup' %}
{% if execute %}
{% set queries = [
"CREATE SCHEMA IF NOT EXISTS " + blue_schema,
"DROP SCHEMA IF EXISTS " + backup_schema + " CASCADE",
"ALTER SCHEMA " + current_schema + " RENAME TO " + backup_schema.split('.')[-1],
"ALTER SCHEMA " + blue_schema + " RENAME TO " + current_schema.split('.')[-1]
] %}
{% for query in queries %}
{% do run_query(query) %}
{{ log("Executed: " + query, info=True) }}
{% endfor %}
{{ log("Blue-green deployment completed successfully", info=True) }}
{% endif %}
{% endmacro %}
Let's build a complete customer analytics pipeline that demonstrates advanced dbt patterns. You'll create models that handle real business complexity including customer segmentation, cohort analysis, and revenue attribution.
dbt init customer_analytics
cd customer_analytics
dbt_project.yml:

name: 'customer_analytics'
version: '1.0.0'
config-version: 2
profile: 'customer_analytics'

models:
  customer_analytics:
    staging:
      +materialized: view
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +post-hook: "GRANT SELECT ON {{ this }} TO ROLE ANALYTICS_READER"

vars:
  start_date: '2020-01-01'
  cohort_periods: 12
  rfm_recency_days: [30, 90, 365]
  rfm_frequency_orders: [1, 5, 15]
  rfm_monetary_thresholds: [100, 500, 2000]
Create your source definitions:
# models/staging/_sources.yml
version: 2

sources:
  - name: ecommerce
    database: raw
    schema: ecommerce
    tables:
      - name: customers
        columns:
          - name: customer_id
            tests: [unique, not_null]
          - name: email
            tests: [unique, not_null]
      - name: orders
        loaded_at_field: updated_at  # freshness checks require a loaded-at column
        freshness:
          warn_after: {count: 1, period: day}
          error_after: {count: 3, period: day}
        columns:
          - name: order_id
            tests: [unique, not_null]
          - name: customer_id
            tests: [not_null]
      - name: order_items
        columns:
          - name: order_id
            tests: [not_null]
          - name: product_id
            tests: [not_null]
Build your staging models with proper data contracts:
-- models/staging/stg_customers.sql
WITH source AS (
SELECT * FROM {{ source('ecommerce', 'customers') }}
),
cleaned AS (
SELECT
customer_id,
LOWER(TRIM(email)) AS email,
INITCAP(TRIM(first_name)) AS first_name,
INITCAP(TRIM(last_name)) AS last_name,
DATE(created_at) AS customer_created_date,
CURRENT_TIMESTAMP() AS _loaded_at
FROM source
WHERE customer_id IS NOT NULL
AND email IS NOT NULL
AND email LIKE '%@%.%' -- Basic email validation
)
SELECT * FROM cleaned
Create a sophisticated customer cohort analysis model:
-- models/intermediate/int_customer_cohorts.sql
WITH orders AS (
SELECT
customer_id,
order_date,
order_total,
ROW_NUMBER() OVER (
PARTITION BY customer_id
ORDER BY order_date
) AS order_sequence
FROM {{ ref('stg_orders') }}
WHERE order_status IN ('{{ var('completed_order_statuses', ['completed']) | join("', '") }}')
),
customer_cohorts AS (
SELECT
customer_id,
MIN(order_date) AS cohort_month,
DATE_TRUNC('month', MIN(order_date)) AS cohort_month_start
FROM orders
GROUP BY customer_id
),
order_periods AS (
SELECT
o.customer_id,
o.order_date,
o.order_total,
cc.cohort_month_start,
DATEDIFF(
'month',
cc.cohort_month_start,
DATE_TRUNC('month', o.order_date)
) AS period_number
FROM orders o
INNER JOIN customer_cohorts cc ON o.customer_id = cc.customer_id
),
cohort_data AS (
SELECT
cohort_month_start,
period_number,
COUNT(DISTINCT customer_id) AS customers,
COUNT(*) AS orders,
SUM(order_total) AS revenue,
AVG(order_total) AS avg_order_value
FROM order_periods
WHERE period_number <= {{ var('cohort_periods', 12) }}
GROUP BY cohort_month_start, period_number
),
cohort_sizes AS (
SELECT
cohort_month_start,
COUNT(DISTINCT customer_id) AS cohort_size
FROM customer_cohorts
GROUP BY cohort_month_start
)
SELECT
cd.*,
cs.cohort_size,
cd.customers::FLOAT / cs.cohort_size AS retention_rate,
cd.revenue / cd.customers AS revenue_per_customer
FROM cohort_data cd
INNER JOIN cohort_sizes cs ON cd.cohort_month_start = cs.cohort_month_start
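The cohort arithmetic above can be checked quickly in Python: assign each customer to the month of their first order, count how many are active in each subsequent period, and divide by cohort size. The order data here is invented for illustration, with months represented as `(year, month)` tuples:

```python
from collections import defaultdict

# Illustrative orders: (customer_id, (year, month))
orders = [(1, (2023, 1)), (1, (2023, 2)), (2, (2023, 1)),
          (2, (2023, 3)), (3, (2023, 2))]

def retention_rates(rows):
    """Return {(cohort_month, period_number): retention_rate}."""
    # Cohort month = month of each customer's first order
    first = {}
    for cust, month in sorted(rows, key=lambda r: r[1]):
        first.setdefault(cust, month)

    active = defaultdict(set)   # (cohort_month, period) -> customers active that period
    cohort = defaultdict(set)   # cohort_month -> every customer in the cohort
    for cust, (y, m) in rows:
        cy, cm = first[cust]
        period = (y - cy) * 12 + (m - cm)
        active[(first[cust], period)].add(cust)
        cohort[first[cust]].add(cust)

    return {k: len(v) / len(cohort[k[0]]) for k, v in active.items()}

rates = retention_rates(orders)
# cohort (2023, 1) has 2 customers; half of them return in period 1 and in period 2
```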
Build a comprehensive customer dimension with RFM segmentation:
-- models/marts/dim_customers_advanced.sql
{{
config(
materialized='table',
indexes=[
{'columns': ['customer_id'], 'unique': True},
{'columns': ['rfm_segment']},
{'columns': ['cohort_month']}
]
)
}}
WITH customers AS (
SELECT * FROM {{ ref('stg_customers') }}
),
rfm_metrics AS (
SELECT * FROM {{ ref('int_customer_rfm') }}
),
cohort_info AS (
SELECT
customer_id,
cohort_month_start AS cohort_month,
cohort_size
FROM {{ ref('int_customer_cohorts') }}
WHERE period_number = 0
),
final AS (
SELECT
c.customer_id,
c.email,
c.first_name,
c.last_name,
c.customer_created_date,
-- RFM Metrics
r.recency_days,
r.frequency_orders,
r.monetary_value,
r.recency_score,
r.frequency_score,
r.monetary_score,
r.rfm_segment,
-- Cohort Analysis
co.cohort_month,
co.cohort_size,
-- Advanced Segments
{{ generate_rfm_segments(
'r.recency_days',
'r.frequency_orders',
'r.monetary_value'
) }} AS business_segment,
CURRENT_TIMESTAMP() AS _updated_at
FROM customers c
LEFT JOIN rfm_metrics r ON c.customer_id = r.customer_id
LEFT JOIN cohort_info co ON c.customer_id = co.customer_id
)
SELECT * FROM final
Create a comprehensive test suite:
-- tests/assert_rfm_segments_valid.sql
-- Ensure RFM segmentation logic is working correctly
WITH rfm_validation AS (
SELECT
rfm_segment,
COUNT(*) as customer_count,
AVG(recency_days) as avg_recency,
AVG(frequency_orders) as avg_frequency,
AVG(monetary_value) as avg_monetary
FROM {{ ref('dim_customers_advanced') }}
WHERE rfm_segment IS NOT NULL
GROUP BY rfm_segment
),
invalid_segments AS (
SELECT *
FROM rfm_validation
WHERE
-- Champions should have low recency, high frequency, high monetary
(rfm_segment = 'Champion' AND (
avg_recency > 60 OR
avg_frequency < 3 OR
avg_monetary < 500
))
OR
-- At Risk should have high recency but good historical metrics
(rfm_segment = 'At Risk' AND (
avg_recency < 90 OR
avg_monetary < 200
))
)
SELECT * FROM invalid_segments
Problem: Cartesian joins in complex models
-- WRONG: This creates a cartesian join
SELECT
c.customer_id,
o.order_count,
i.item_count
FROM customers c
CROSS JOIN (SELECT COUNT(*) as order_count FROM orders) o
CROSS JOIN (SELECT COUNT(*) as item_count FROM order_items) i
Solution: Use proper grain and join conditions
-- CORRECT: Maintain proper grain
WITH customer_orders AS (
SELECT
customer_id,
COUNT(*) as order_count
FROM orders
GROUP BY customer_id
),
customer_items AS (
SELECT
o.customer_id,
COUNT(oi.item_id) as item_count
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
GROUP BY o.customer_id
)
SELECT
c.customer_id,
co.order_count,
ci.item_count
FROM customers c
LEFT JOIN customer_orders co ON c.customer_id = co.customer_id
LEFT JOIN customer_items ci ON c.customer_id = ci.customer_id
Problem: Incremental models without proper unique keys
-- WRONG: No unique key leads to duplicates
{{
config(
materialized='incremental'
)
}}
SELECT * FROM source_table
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
Solution: Always specify unique keys and merge strategies
-- CORRECT: Proper unique key handling
{{
config(
materialized='incremental',
unique_key='order_id',
merge_update_columns=['status', 'total_amount', 'updated_at']
)
}}
Problem: Circular dependencies between models
Model A refs Model B
Model B refs Model C
Model C refs Model A -- Circular!
Solution: Restructure with intermediate models or use post-hooks for updates
Problem: Over-use of ephemeral models causing compilation issues
-- WRONG: Too many nested ephemeral models
{{ ref('ephemeral_a') }} -> {{ ref('ephemeral_b') }} -> {{ ref('ephemeral_c') }}
Solution: Materialize complex intermediate models as views or tables
Problem: Tests that don't fail when they should
# WRONG: This test will pass even with bad data
tests:
- accepted_values:
values: ['A', 'B', 'C', null] # null shouldn't be accepted
Solution: Be explicit about null handling and edge cases
You've now mastered the fundamental concepts that separate basic dbt usage from production-ready data engineering. You understand how dbt's compilation model transforms Jinja-templated SQL into optimized warehouse queries, how to structure projects for scale and maintainability, and how to implement sophisticated testing and deployment strategies.
The key insights to remember:
Your next steps should focus on:
Immediate actions:
Advanced techniques to explore:
The transformation from writing ad-hoc SQL queries to building production data platforms represents a fundamental shift in how we approach data work. With dbt as your foundation, you're equipped to build data infrastructure that scales with your organization's growing analytical needs.