
You've built a solid dbt project. Models are clean, tests are green, and documentation looks great in the local browser. Then someone asks, "Great — when does this run in production?" And that's where the real work begins.
Running dbt in production isn't just about dropping a dbt run command into a cron job and calling it done. Real pipelines have upstream dependencies — raw data that needs to land before models can transform it, downstream consumers like dashboards and ML pipelines that need data by a certain time, and operational requirements like retry logic, alerting, and audit trails. Airflow is the tool most data teams reach for to manage all of that complexity. But stitching Airflow and dbt together effectively requires understanding how they each think about scheduling, dependencies, and failure modes — and how to make them work together without fighting each other.
By the end of this lesson, you'll have a production-grade pattern for orchestrating dbt with Airflow. You'll understand how to structure your DAGs to mirror your dbt project's dependency graph, how to handle partial failures gracefully, and how to build observability into the pipeline so when something breaks at 3am, your on-call engineer knows exactly what happened and where.
What you'll learn:
- `--select` and `--exclude` flags to build composable, targeted DAG tasks

This lesson assumes you're comfortable with:
You don't need to be an Airflow expert, but you should know what a DAG file looks like and have run one before.
Before writing a single line of Python, you need to make an architectural choice that will shape everything else: should your Airflow DAG treat an entire dbt run as one atomic task, or should each dbt model (or group of models) be its own Airflow task?
This isn't a philosophical question — it has concrete operational consequences.
The monolithic approach runs dbt as a single BashOperator or DockerOperator task:
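```python
from airflow.operators.bash import BashOperator

run_dbt = BashOperator(
    task_id="run_dbt_project",
    bash_command="dbt run --profiles-dir /opt/dbt --project-dir /opt/dbt/analytics",
    dag=dag,  # assumes a `dag` object defined elsewhere in the file
)
```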
This is simple to set up and works fine when your dbt project is small and runs fast. The problem surfaces in production: if one model fails, you see a single red box in the Airflow UI with no information about which model failed, and to retry you re-run the entire project from scratch. You also lose the ability to express partial dependencies — for example, "start running the finance models as soon as the raw billing data lands, but the marketing models can wait for a separate upstream."
The task-per-model approach gives each model (or logical group of models) its own Airflow task, mirrors the dbt DAG inside your Airflow DAG, and lets you retry individual failed models. The tradeoff is complexity — a large dbt project could produce hundreds of tasks, which makes the Airflow UI noisy and can strain the scheduler.
The pragmatic middle ground — and what most production teams land on — is a task-per-layer or task-per-domain approach. Group your dbt models into logical stages (staging, intermediate, marts), or by business domain (finance_models, marketing_models), and give each group its own task. This keeps the Airflow DAG readable while preserving enough granularity to isolate failures.
```text
[SqlSensors: wait_for_product_events, wait_for_salesforce_crm]
                            |
              [BashOperator: dbt_run_staging]
                            |
              [BashOperator: dbt_test_staging]
                            |
            [BashOperator: dbt_run_intermediate]
                   /                      \
    [dbt_run_finance_marts]      [dbt_run_marketing_marts]
                   \                      /
              [BashOperator: dbt_test_marts]
```
This is the pattern we'll build out through this lesson.
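Because every layer or domain task in this pattern runs the same dbt invocation with a different selector, you can generate the command strings from one helper instead of copy-pasting them. Here is a minimal sketch: `dbt_command` is a hypothetical helper (not part of dbt or Airflow), and the `/opt/dbt` paths are the ones this lesson assumes. It only builds strings, so you can unit test it without either tool installed.

```python
import json
import shlex
from typing import Optional

# Paths assumed throughout this lesson
PROFILES_DIR = "/opt/dbt"
PROJECT_DIR = "/opt/dbt/analytics"


def dbt_command(verb: str, selector: str, run_date: Optional[str] = None) -> str:
    """Build the bash command for one dbt layer or domain task."""
    parts = [
        "dbt", verb,
        "--profiles-dir", PROFILES_DIR,
        "--project-dir", PROJECT_DIR,
        "--select", selector,
    ]
    if run_date is not None:
        # --vars accepts a YAML dictionary; JSON is valid YAML
        parts += ["--vars", json.dumps({"run_date": run_date})]
    # shlex.join quotes each argument so bash sees it as a single word
    return shlex.join(parts)


# One task per layer/domain, each with its own selector
SELECTORS = ["tag:staging", "tag:intermediate", "tag:finance_mart", "tag:marketing_mart"]
run_commands = {sel: dbt_command("run", sel, run_date="{{ ds }}") for sel in SELECTORS}
```

Passing `run_date="{{ ds }}"` leaves the Jinja expression inside the quoted string, so Airflow still renders it when a `BashOperator` uses the result as its `bash_command`.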
Let's ground this in a real scenario. You're a data engineer at a B2B SaaS company. Your dbt project transforms raw event data and CRM exports into clean mart tables that feed a Metabase dashboard for the sales team and a churn prediction model. Raw data lands in Snowflake via Fivetran connectors — one for your product events database, one for Salesforce.
Your Airflow environment runs on Airflow 2.x (we'll use the TaskFlow API where appropriate, but keep standard operators where clarity matters more than elegance).
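First, install dbt and Airflow on your workers. If you can avoid it, keep dbt out of the Python environment that runs Airflow core, for example in a separate virtual environment or a dedicated Docker image, so dbt's heavy dependency tree can't conflict with Airflow's:

```bash
# Airflow plus the provider packages imported by this lesson's DAG code.
# Airflow's docs recommend installing it against its published constraints file.
pip install "apache-airflow==2.8.0" \
    apache-airflow-providers-snowflake \
    apache-airflow-providers-slack \
    apache-airflow-providers-amazon

# dbt and its Snowflake adapter, ideally in a separate virtual environment
# or Docker image; make sure the `dbt` executable is on the workers' PATH
pip install "dbt-snowflake==1.7.0"
```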
Your dbt project lives at /opt/dbt/analytics on the Airflow workers, and profiles.yml lives in /opt/dbt (the directory every command in this lesson passes to --profiles-dir). The profile reads credentials from environment variables:
```yaml
# profiles.yml
analytics:
  target: prod
  outputs:
    prod:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role: TRANSFORMER
      database: ANALYTICS
      warehouse: TRANSFORMING_WH
      schema: dbt_prod
      threads: 8
```
Warning: Never hardcode credentials in profiles.yml or DAG files. Always pull them from environment variables or a secrets manager such as AWS Secrets Manager or HashiCorp Vault. Airflow supports most of these natively through its pluggable secrets backends (subclasses of `BaseSecretsBackend`).
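dbt resolves `env_var()` when it loads the profile, so a worker missing one of these variables only fails once the dbt command starts. If you'd rather fail fast with an explicit message, a pre-flight check is easy to write. A minimal sketch: `missing_env_vars` and `check_dbt_env` are hypothetical helpers, not dbt features, and the variable names are the three from the profile above.

```python
import os
from typing import Iterable, List, Mapping, Optional

# Variables referenced via env_var() in the profiles.yml above
REQUIRED_VARS = ("SNOWFLAKE_ACCOUNT", "SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD")


def missing_env_vars(names: Iterable[str], env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


def check_dbt_env() -> None:
    """Raise before dbt starts if the worker lacks any required variable."""
    missing = missing_env_vars(REQUIRED_VARS)
    if missing:
        raise RuntimeError(f"Missing environment variables for dbt: {', '.join(missing)}")
```

Running `check_dbt_env()` at the start of a task (or as its own task) also helps with the common "works locally, fails on the worker" problem, since it reports exactly which variables differ.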
Let's build the full DAG. We'll start with the upstream sensors, then build out the layered dbt tasks, and add error handling as we go.
Your pipeline shouldn't run until the raw data is actually there. Fivetran syncs run on their own schedule, and occasionally they're late. Instead of hardcoding a time buffer (the "just wait 30 minutes and hope" approach that every data team has tried and regretted), use Airflow sensors to check for actual data freshness.
Airflow's built-in SqlSensor, pointed at your Snowflake connection, is a clean pattern for this:
```python
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.common.sql.sensors.sql import SqlSensor

# notify_slack_on_failure is defined in the alerting section below.
# In the real DAG file, define it above default_args, which references it.
default_args = {
    "owner": "data-engineering",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
    "on_failure_callback": notify_slack_on_failure,
    "execution_timeout": timedelta(hours=2),
}

with DAG(
    dag_id="analytics_dbt_pipeline",
    default_args=default_args,
    schedule="0 6 * * *",  # 6am UTC daily
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
    tags=["dbt", "analytics", "production"],
    doc_md="""
    ## Analytics dbt Pipeline
    Runs the full analytics transformation pipeline after raw data
    has landed from Fivetran (Snowflake events) and Salesforce CRM.
    Produces mart tables consumed by Metabase and the churn model.
    """,
) as dag:
    # Sensor: wait for last night's events to land
    wait_for_events = SqlSensor(
        task_id="wait_for_product_events",
        conn_id="snowflake_default",
        sql="""
            SELECT COUNT(*)
            FROM raw.fivetran_events.page_views
            WHERE DATE(event_timestamp) = '{{ ds }}'
              AND event_timestamp >= '{{ ds }} 23:45:00'
        """,
        mode="reschedule",  # release the worker slot while waiting
        poke_interval=300,  # check every 5 minutes
        timeout=7200,  # fail after 2 hours
        soft_fail=False,
    )

    # Sensor: wait for Salesforce accounts sync
    wait_for_crm = SqlSensor(
        task_id="wait_for_salesforce_crm",
        conn_id="snowflake_default",
        sql="""
            SELECT COUNT(*)
            FROM raw.fivetran_salesforce.account
            WHERE _fivetran_synced >= '{{ ds }} 04:00:00'
        """,
        mode="reschedule",
        poke_interval=300,
        timeout=7200,
        soft_fail=False,
    )
```
Notice mode="reschedule" on the sensors. This is critical in production. The default mode="poke" holds a worker slot for the entire wait duration — if your sensor waits 2 hours and you have 10 concurrent DAG runs or other pipelines, you'll exhaust your worker pool. reschedule mode releases the slot between checks and is almost always the right choice for sensors with long wait windows.
Also notice soft_fail=False. When the raw data doesn't land within 2 hours, you want this to be a hard failure that pages the on-call engineer, not a soft skip that lets downstream tasks run on stale data.
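The worker-slot savings from `mode="reschedule"` are easy to estimate. The sketch below compares how long one sensor occupies a worker slot in each mode over the 2-hour timeout. The 5-second duration per check is an illustrative assumption, and real numbers also include scheduler latency between reschedules.

```python
def slot_seconds(wait_seconds: int, poke_interval: int, check_seconds: int, mode: str) -> int:
    """Approximate worker-slot time one sensor consumes while it waits."""
    if mode == "poke":
        # The task stays running, holding its slot, for the whole wait
        return wait_seconds
    if mode == "reschedule":
        # The slot is held only while a check runs: one check at t=0, then one per interval
        checks = wait_seconds // poke_interval + 1
        return checks * check_seconds
    raise ValueError(f"unknown mode: {mode}")


# A sensor that waits the full 2-hour timeout, checking every 5 minutes
poke = slot_seconds(7200, 300, 5, "poke")
reschedule = slot_seconds(7200, 300, 5, "reschedule")
```

Under these assumptions, ten sensors waiting the full timeout in poke mode tie up ten slots for two hours, while in reschedule mode they use about 1,250 slot-seconds (roughly 21 minutes) in total.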
Now the dbt tasks. We'll use BashOperator throughout — it's explicit, debuggable, and doesn't require you to maintain a separate dbt Cloud connection or custom operator. When something breaks, you can reproduce the exact command locally.
```python
    # (continues inside the `with DAG(...) as dag:` block above)

    # --- STAGING LAYER ---
    # Runs all models tagged with 'staging'
    dbt_run_staging = BashOperator(
        task_id="dbt_run_staging",
        bash_command="""
        dbt run \
            --profiles-dir /opt/dbt \
            --project-dir /opt/dbt/analytics \
            --select tag:staging \
            --vars '{"run_date": "{{ ds }}"}'
        """,
    )

    dbt_test_staging = BashOperator(
        task_id="dbt_test_staging",
        bash_command="""
        dbt test \
            --profiles-dir /opt/dbt \
            --project-dir /opt/dbt/analytics \
            --select tag:staging \
            --vars '{"run_date": "{{ ds }}"}'
        """,
    )

    # --- INTERMEDIATE LAYER ---
    dbt_run_intermediate = BashOperator(
        task_id="dbt_run_intermediate",
        bash_command="""
        dbt run \
            --profiles-dir /opt/dbt \
            --project-dir /opt/dbt/analytics \
            --select tag:intermediate \
            --vars '{"run_date": "{{ ds }}"}'
        """,
    )

    # --- MART LAYER: Finance and Marketing run in parallel ---
    dbt_run_finance_marts = BashOperator(
        task_id="dbt_run_finance_marts",
        bash_command="""
        dbt run \
            --profiles-dir /opt/dbt \
            --project-dir /opt/dbt/analytics \
            --select tag:finance_mart \
            --vars '{"run_date": "{{ ds }}"}'
        """,
    )

    dbt_run_marketing_marts = BashOperator(
        task_id="dbt_run_marketing_marts",
        bash_command="""
        dbt run \
            --profiles-dir /opt/dbt \
            --project-dir /opt/dbt/analytics \
            --select tag:marketing_mart \
            --vars '{"run_date": "{{ ds }}"}'
        """,
    )

    # --- FINAL TESTS: run after all marts ---
    dbt_test_marts = BashOperator(
        task_id="dbt_test_marts",
        bash_command="""
        dbt test \
            --profiles-dir /opt/dbt \
            --project-dir /opt/dbt/analytics \
            --select tag:finance_mart tag:marketing_mart \
            --vars '{"run_date": "{{ ds }}"}'
        """,
    )

    # --- WIRE DEPENDENCIES ---
    [wait_for_events, wait_for_crm] >> dbt_run_staging
    dbt_run_staging >> dbt_test_staging
    dbt_test_staging >> dbt_run_intermediate
    dbt_run_intermediate >> [dbt_run_finance_marts, dbt_run_marketing_marts]
    [dbt_run_finance_marts, dbt_run_marketing_marts] >> dbt_test_marts
```
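To sanity-check this wiring outside Airflow, you can express the same dependencies as plain data and let Python's standard-library `graphlib` (Python 3.9+) compute a valid order and the groups of tasks that could run in parallel. This is a sketch for review and testing only: it mirrors the `>>` statements above but is not how Airflow's scheduler works internally.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks that must finish before it starts
PIPELINE = {
    "dbt_run_staging": {"wait_for_product_events", "wait_for_salesforce_crm"},
    "dbt_test_staging": {"dbt_run_staging"},
    "dbt_run_intermediate": {"dbt_test_staging"},
    "dbt_run_finance_marts": {"dbt_run_intermediate"},
    "dbt_run_marketing_marts": {"dbt_run_intermediate"},
    "dbt_test_marts": {"dbt_run_finance_marts", "dbt_run_marketing_marts"},
}

# One valid execution order (raises CycleError if the graph has a cycle)
order = list(TopologicalSorter(PIPELINE).static_order())

# Group tasks into stages whose members have no dependencies on each other
sorter = TopologicalSorter(PIPELINE)
sorter.prepare()
stages = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())
    stages.append(ready)
    sorter.done(*ready)
```

The stages make the parallelism explicit: both sensors start together, and the finance and marketing marts share a stage.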
For the tag selectors in these tasks to work, your dbt models need the corresponding tags, set in dbt_project.yml or in model-level configs:
```yaml
# dbt_project.yml
models:
  analytics:
    staging:
      +tags: ["staging"]
    intermediate:
      +tags: ["intermediate"]
    marts:
      finance:
        +tags: ["finance_mart"]
      marketing:
        +tags: ["marketing_mart"]
```
Tip: The `--select` flag is one of dbt's most powerful features. Beyond tags, you can select by model name (`--select stg_events`), by path (`--select models/staging`), by the model and everything upstream of it (`--select +fct_revenue`), or by the model and everything downstream of it (`--select fct_revenue+`). Learn the graph selector syntax; it unlocks composable, surgical pipeline tasks.
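To make the `+` operators concrete, here is a toy model of what they select on a small graph: `+model` is the model plus everything upstream, and `model+` is the model plus everything downstream. The model names and edges are hypothetical, and this illustrates the selectors' semantics only; it is not dbt's implementation, which also supports depth limits such as `2+model`, set operators, and more.

```python
from collections import deque
from typing import Dict, List, Set

# Hypothetical project: each model maps to the models it ref()s
PARENTS: Dict[str, List[str]] = {
    "stg_events": [],
    "stg_accounts": [],
    "int_sessions": ["stg_events"],
    "fct_revenue": ["int_sessions", "stg_accounts"],
    "rpt_sales": ["fct_revenue"],
}


def _walk(start: str, edges: Dict[str, List[str]]) -> Set[str]:
    """Breadth-first walk: start plus every node reachable through edges."""
    seen, queue = {start}, deque([start])
    while queue:
        for nxt in edges.get(queue.popleft(), []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


def upstream_plus(model: str) -> Set[str]:
    """Like --select +model."""
    return _walk(model, PARENTS)


def downstream_plus(model: str) -> Set[str]:
    """Like --select model+ (walks the reversed edges)."""
    children: Dict[str, List[str]] = {}
    for child, parents in PARENTS.items():
        for parent in parents:
            children.setdefault(parent, []).append(child)
    return _walk(model, children)
```

For example, `upstream_plus("fct_revenue")` returns the model plus `int_sessions`, `stg_events`, and `stg_accounts`, which is the set a task would need to rebuild before refreshing that one mart.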
The DAG above handles the happy path. Let's talk about what happens when things break.
dbt returns different exit codes depending on what went wrong:
| Exit code | Meaning |
|---|---|
| 0 | Success: the invocation completed without errors |
| 1 | Handled error: the invocation completed, but at least one node errored (for example, a SQL or compilation error) or a test failed; downstream models may have been skipped |
| 2 | Unhandled error: dbt exited unexpectedly (for example, a network interruption or Ctrl-C) |
BashOperator considers any non-zero exit code a task failure, so dbt's model failures will correctly show as Airflow task failures. However, by default, if dbt test finds test failures, it also exits with code 1 — this means test failures and model run failures look identical from Airflow's perspective. We'll address this in the observability section.
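If you run dbt from your own Python callable instead of `BashOperator` (for example, inside a `@task`), you can attach the meaning of the exit code to the error message. A minimal sketch using only `subprocess`: `run_dbt` is a hypothetical helper, and the labels paraphrase the table above. Exit code 1 still doesn't tell test failures apart from model errors.

```python
import subprocess
from typing import List

# Labels paraphrasing dbt's exit codes
DBT_EXIT_MEANINGS = {
    0: "success",
    1: "handled error (a model errored or a test failed)",
    2: "unhandled error (for example, a network interruption)",
}


def run_dbt(args: List[str]) -> str:
    """Run a dbt command; return its label on success, raise with the label otherwise."""
    completed = subprocess.run(args, check=False)
    meaning = DBT_EXIT_MEANINGS.get(completed.returncode, "unexpected exit code")
    if completed.returncode != 0:
        raise RuntimeError(f"dbt exited with code {completed.returncode}: {meaning}")
    return meaning


# Example (requires dbt on PATH):
# run_dbt(["dbt", "run", "--profiles-dir", "/opt/dbt",
#          "--project-dir", "/opt/dbt/analytics", "--select", "tag:staging"])
```

Raising keeps Airflow's behavior unchanged (the task still fails on any non-zero code) while putting the interpretation in the exception that your failure callback receives.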
The default_args we set earlier include:

```python
"retries": 2,
"retry_delay": timedelta(minutes=5),
"retry_exponential_backoff": True,
```
Exponential backoff is especially important for tasks that hit transient warehouse errors (Snowflake credit exhaustion, query timeouts during peak load). With retry_exponential_backoff=True, the wait roughly doubles on each attempt: the first retry waits at least 5 minutes and the second at least 10. Airflow adds some jitter on top, and you can cap the wait with max_retry_delay. Spacing retries out this way avoids hammering a warehouse that's already under load.
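As a rough model of that schedule, the sketch below computes the minimum wait before each retry: `retry_delay` doubled per attempt, optionally capped by `max_retry_delay`. It is an approximation, not Airflow's code; it ignores the jitter Airflow adds, so treat the numbers as lower bounds. `min_backoff_delays` is a hypothetical helper.

```python
from datetime import timedelta
from typing import List, Optional


def min_backoff_delays(retry_delay: timedelta, retries: int,
                       max_retry_delay: Optional[timedelta] = None) -> List[timedelta]:
    """Minimum wait before each retry with exponential backoff (jitter ignored)."""
    delays = []
    for attempt in range(retries):
        delay = retry_delay * (2 ** attempt)  # 1x, 2x, 4x, ...
        if max_retry_delay is not None:
            delay = min(delay, max_retry_delay)
        delays.append(delay)
    return delays


# The default_args above: retries=2, retry_delay=5 minutes
schedule = min_backoff_delays(timedelta(minutes=5), retries=2)
total_wait = sum(schedule, timedelta())
```

With the lesson's defaults that is 5 and 10 minutes, so a task that fails every attempt reports its final failure at least 15 minutes after the first one.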
However, don't apply blanket retry settings to all tasks. Sensors should have generous retries (they're designed to wait). Model run tasks should have 2-3 retries for transient errors. Test tasks should arguably have fewer retries: a failing dbt test is usually a data quality issue, not a transient error, and retrying it twice under the defaults above just delays your alert by 15 minutes or more.
```python
dbt_test_marts = BashOperator(
    task_id="dbt_test_marts",
    bash_command="...",  # same dbt test command as above
    retries=1,  # fewer retries: test failures are usually real
    retry_delay=timedelta(minutes=2),
    execution_timeout=timedelta(minutes=30),
)
```
Knowing something failed is only half the battle — the other half is getting useful context immediately. Here's a production-ready Slack callback:
```python
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook


def notify_slack_on_failure(context):
    """
    Send a detailed Slack alert when a task fails.

    The context dict includes task_instance, dag, exception, etc.
    """
    task_instance = context["task_instance"]
    dag_id = context["dag"].dag_id
    logical_date = context.get("logical_date")  # "execution_date" before Airflow 2.2
    exception = context.get("exception")

    # Build the message line by line so no code indentation leaks into Slack
    message = "\n".join([
        ":red_circle: *dbt Pipeline Failure*",
        f"*DAG:* `{dag_id}`",
        f"*Task:* `{task_instance.task_id}`",
        f"*Logical Date:* `{logical_date}`",
        f"*Exception:* `{str(exception)[:500]}`",
        f"<{task_instance.log_url}|View Logs>",
    ])

    hook = SlackWebhookHook(slack_webhook_conn_id="slack_data_alerts")
    hook.send(text=message, channel="#data-alerts")
```
Define this function above default_args in your DAG file, then wire it in as "on_failure_callback": notify_slack_on_failure. Every failed task will post to your #data-alerts Slack channel with a direct link to the logs. The log_url field is the most valuable part: your on-call engineer can jump straight to the error without navigating through the Airflow UI.
Warning: The exception message in `context.get("exception")` for a BashOperator failure often just says "Bash command failed." The actual dbt error is in the task logs, not the exception object. This is why the log URL link is non-negotiable: make sure your Airflow instance is accessible to your team (or that logs are shipped to a centralized system like CloudWatch or Datadog).
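Because the callback only reads a few keys from `context`, it's worth splitting the message formatting into a plain function you can unit test without a running Airflow. A sketch, assuming the keys the callback above reads (with `logical_date`, the Airflow 2.2+ name for `execution_date`). `format_failure_message` is a hypothetical helper, and the `SimpleNamespace` objects are test stand-ins for Airflow's real `TaskInstance` and `DAG` objects.

```python
from types import SimpleNamespace


def format_failure_message(context: dict, max_exception_chars: int = 500) -> str:
    """Build the Slack text for a failed task from an Airflow-style context dict."""
    ti = context["task_instance"]
    exception = str(context.get("exception"))[:max_exception_chars]
    return "\n".join([
        ":red_circle: *dbt Pipeline Failure*",
        f"*DAG:* `{context['dag'].dag_id}`",
        f"*Task:* `{ti.task_id}`",
        f"*Logical Date:* `{context.get('logical_date')}`",
        f"*Exception:* `{exception}`",
        f"<{ti.log_url}|View Logs>",
    ])


# Fake context for a unit test; Airflow passes real objects at runtime
fake_context = {
    "task_instance": SimpleNamespace(task_id="dbt_run_staging",
                                     log_url="https://airflow.example.com/log"),
    "dag": SimpleNamespace(dag_id="analytics_dbt_pipeline"),
    "logical_date": "2024-01-01T06:00:00+00:00",
    "exception": RuntimeError("x" * 1000),
}
message = format_failure_message(fake_context)
```

The callback then shrinks to two lines: format the message, then send it with the Slack hook.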
In larger projects, a single model failure blocks all downstream models in the same dbt run command. If stg_page_views fails, dbt skips every downstream model that refs it. This is correct behavior: you don't want to build fct_user_sessions on incomplete staging data.
But there's a subtler issue. Suppose dbt_run_intermediate runs 40 models as one task and model #35 fails. You might end up with 34 successfully built models and 6 that errored or were skipped, yet Airflow shows the whole task as failed. When Airflow retries the task, the same command re-runs all 40 models, including the 34 that succeeded.
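For large intermediate layers, dbt's state features can help, but be precise about what each one does. The --defer flag, combined with --state, lets you run a subset of models while refs to models outside your --select resolve to the relations recorded in the state manifest, so you don't have to rebuild everything upstream of your selection. On its own, --defer does not skip models that already succeeded: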
```python
dbt_run_intermediate = BashOperator(
    task_id="dbt_run_intermediate",
    bash_command="""
    dbt run \
        --profiles-dir /opt/dbt \
        --project-dir /opt/dbt/analytics \
        --select tag:intermediate \
        --defer \
        --state /opt/dbt/state/prod \
        --vars '{"run_date": "{{ ds }}"}'
    """,
)
```
The --state directory should contain the manifest.json from the last successful production run. Many teams store this artifact in S3 and download it as part of the DAG setup, or use dbt Cloud's artifact storage if they're on that platform.
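To re-run only what failed, point --state at the artifacts from the failed run and select on result status:

```bash
# Re-run the models that errored in the previous run, plus everything
# downstream of them (dbt skipped those). For failed tests, use
# `dbt test --select result:fail` with the same --state directory.
dbt run \
  --profiles-dir /opt/dbt \
  --project-dir /opt/dbt/analytics \
  --select result:error+ \
  --state /opt/dbt/state/last_run
```

The result: selectors read run_results.json from the --state directory, so only the failed models and the models they blocked run again: a much faster recovery path on a partial failure. On dbt 1.6 and later, dbt retry offers a shortcut: it re-executes the previous dbt command from the point of failure, based on its run_results.json.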
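To see what the `result:` selectors will pick up before you rerun anything, you can read the same `run_results.json` yourself. A minimal sketch, assuming the `results[].unique_id` and `results[].status` fields dbt writes (models report `success`, `error`, or `skipped`; tests report `pass`, `fail`, `warn`, `error`, or `skipped`). Both function names are hypothetical.

```python
import json
from pathlib import Path
from typing import Dict, List


def nodes_by_status(run_results_path: Path) -> Dict[str, List[str]]:
    """Group the unique_ids in run_results.json by status."""
    data = json.loads(run_results_path.read_text())
    grouped: Dict[str, List[str]] = {}
    for result in data.get("results", []):
        grouped.setdefault(result["status"], []).append(result["unique_id"])
    return grouped


def rerun_candidates(run_results_path: Path) -> List[str]:
    """Nodes matched by result:error or result:fail (the failures themselves)."""
    grouped = nodes_by_status(run_results_path)
    return sorted(grouped.get("error", []) + grouped.get("fail", []))
```

The `skipped` group is why the trailing `+` in `result:error+` usually matters: the skipped nodes are downstream of the errors, and the `+` brings them back into the rerun.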
dbt writes structured JSON artifacts to the target/ directory:

- manifest.json: the compiled project graph (models, tests, sources, relationships)
- run_results.json: timing and status for every node that ran
- catalog.json: schema information (written by dbt docs generate)
- sources.json: source freshness results (written by dbt source freshness)

These are gold for building pipeline observability. Let's use them.
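Add a task after dbt_test_marts that parses the artifacts and sends a structured summary to Slack. Keep two caveats in mind. First, every dbt command overwrites target/run_results.json, so this task sees only the results of the most recent invocation (here, the mart tests). Second, it must run on the same machine as that dbt command, or share its target/ directory through a volume. Giving the parsing task trigger_rule="all_done" lets the summary post even when the tests fail: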
```python
import json
import os

from airflow.decorators import task
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook


@task(trigger_rule="all_done")  # run even if dbt_test_marts failed
def parse_dbt_run_results(**context):
    """
    Read dbt's run_results.json and return a summary (pushed to XCom).
    Run this after dbt completes to capture detailed node status.
    """
    run_results_path = "/opt/dbt/analytics/target/run_results.json"
    if not os.path.exists(run_results_path):
        raise FileNotFoundError(
            f"run_results.json not found at {run_results_path}. "
            "Did the dbt run complete successfully enough to write artifacts?"
        )
    with open(run_results_path, "r") as f:
        run_results = json.load(f)

    results = run_results.get("results", [])
    summary = {
        "total": len(results),
        # Models report "success"/"error"; tests report "pass"/"fail"
        "success": sum(1 for r in results if r["status"] in ("success", "pass")),
        "error": sum(1 for r in results if r["status"] in ("error", "fail")),
        "skipped": sum(1 for r in results if r["status"] == "skipped"),
        "failed_models": [
            r["unique_id"] for r in results if r["status"] in ("error", "fail")
        ],
        "slowest_models": sorted(
            [(r["unique_id"], r.get("execution_time", 0)) for r in results],
            key=lambda x: x[1],
            reverse=True,
        )[:5],
    }
    return summary


@task
def post_pipeline_summary(run_summary: dict, **context):
    """Post a rich pipeline summary to Slack after every run."""
    failed = run_summary.get("failed_models", [])
    slowest = run_summary.get("slowest_models", [])
    status_emoji = ":white_check_mark:" if not failed else ":warning:"
    slow_text = "\n".join(f"  • `{m}`: {t:.1f}s" for m, t in slowest)

    lines = [
        f"{status_emoji} *dbt Pipeline Complete*",
        f"*Nodes:* {run_summary['success']} succeeded, "
        f"{run_summary['error']} failed, {run_summary['skipped']} skipped",
        "*Slowest Nodes:*",
        slow_text,
    ]
    if failed:
        lines.append("*Failed:* `" + "`, `".join(failed) + "`")

    # Post to Slack using Airflow's Slack webhook hook
    hook = SlackWebhookHook(slack_webhook_conn_id="slack_data_alerts")
    hook.send(text="\n".join(lines), channel="#data-engineering")
```
Wire this into the DAG using the TaskFlow API:
```python
# Inside the `with DAG(...) as dag:` block
run_summary = parse_dbt_run_results()
pipeline_summary = post_pipeline_summary(run_summary)
dbt_test_marts >> run_summary >> pipeline_summary
```
This gives your team a Slack message after every run showing exactly which models ran, which were slow, and which failed — without having to open Airflow.
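The counting logic in `parse_dbt_run_results` is easier to test if it lives in a plain function that takes the parsed JSON. A sketch, assuming the same `results[]` fields: it counts test statuses (`pass`, `fail`, `warn`) alongside model statuses (`success`, `error`), since the last dbt command before this task is `dbt test`. `summarize_run_results` is a hypothetical helper.

```python
import heapq
from collections import Counter


def summarize_run_results(run_results: dict, top_n: int = 5) -> dict:
    """Summarize a parsed run_results.json from either dbt run or dbt test."""
    results = run_results.get("results", [])
    statuses = Counter(r["status"] for r in results)  # missing keys count as 0
    return {
        "total": len(results),
        # Models report "success"; passing tests report "pass"
        "success": statuses["success"] + statuses["pass"],
        # Models report "error"; failing tests report "fail"
        "error": statuses["error"] + statuses["fail"],
        "warn": statuses["warn"],
        "skipped": statuses["skipped"],
        "failed_nodes": sorted(r["unique_id"] for r in results
                               if r["status"] in ("error", "fail")),
        # Largest execution times first
        "slowest": heapq.nlargest(
            top_n,
            ((r["unique_id"], r.get("execution_time", 0.0)) for r in results),
            key=lambda pair: pair[1],
        ),
    }
```

The `@task` then reduces to loading the file and returning `summarize_run_results(run_results)`, and the function can be tested against a saved `run_results.json` from a real run.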
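--defer needs the manifest.json from the last successful production run, so preserve your artifacts after every successful run. (The result: selectors, by contrast, need the run_results.json from the failed run you're recovering.) Add a task that copies the current artifacts to a "last successful run" location: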
```python
import os

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


@task
def archive_dbt_artifacts(**context):
    """
    Upload dbt artifacts to S3 for use as --state in future runs.
    With the default all_success trigger rule, this only runs if
    every upstream task succeeded.
    """
    s3_hook = S3Hook(aws_conn_id="aws_default")
    execution_date = context["ds"]
    target_dir = "/opt/dbt/analytics/target"
    artifacts = ["manifest.json", "run_results.json", "catalog.json"]

    for artifact in artifacts:
        local_path = f"{target_dir}/{artifact}"
        if os.path.exists(local_path):
            # Archive with the date for historical analysis
            s3_hook.load_file(
                filename=local_path,
                key=f"dbt-artifacts/{execution_date}/{artifact}",
                bucket_name="my-data-platform-artifacts",
                replace=True,
            )
            # Also write to the 'latest' location for --state reference
            s3_hook.load_file(
                filename=local_path,
                key=f"dbt-artifacts/latest/{artifact}",
                bucket_name="my-data-platform-artifacts",
                replace=True,
            )
```
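Add archive_dbt_artifacts as the final task in your DAG, downstream of both dbt_test_marts and the summary (for example, `[dbt_test_marts, pipeline_summary] >> archive_dbt_artifacts()`), so that with the default all_success trigger rule it runs only after a fully successful pipeline. Every successful run then updates the latest/ prefix in S3, giving subsequent runs a valid --state to reference.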
For teams with large dbt projects, you can generate Airflow tasks programmatically from manifest.json. This creates a true one-to-one mapping between dbt nodes and Airflow tasks, giving you per-model retry and observability in Airflow.
Here's a simplified version of this pattern:
```python
import json
from pathlib import Path

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

MANIFEST_PATH = Path("/opt/dbt/analytics/target/manifest.json")
DBT_PROJECT_DIR = "/opt/dbt/analytics"
DBT_PROFILES_DIR = "/opt/dbt"


def load_manifest():
    # Runs at DAG parse time, so manifest.json must already exist on disk
    with open(MANIFEST_PATH) as f:
        return json.load(f)


def create_dbt_task(dag, node_name, node_info):
    """Create a BashOperator for a single dbt model or test."""
    resource_type = node_info["resource_type"]
    if resource_type == "model":
        command = "run"
    elif resource_type == "test":
        command = "test"
    else:
        return None
    model_name = node_info["name"]
    # e.g. "model.analytics.stg_events" -> "model__analytics__stg_events"
    safe_task_id = node_name.replace(".", "__")
    return BashOperator(
        task_id=safe_task_id,
        bash_command=f"""
        dbt {command} \
            --profiles-dir {DBT_PROFILES_DIR} \
            --project-dir {DBT_PROJECT_DIR} \
            --select {model_name}
        """,
        dag=dag,
    )


manifest = load_manifest()

with DAG(
    dag_id="dbt_model_per_task",
    schedule="0 6 * * *",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    tasks = {}

    # Create a task for each model and test
    for node_name, node_info in manifest["nodes"].items():
        if node_info["resource_type"] in ("model", "test"):
            task = create_dbt_task(dag, node_name, node_info)
            if task:
                tasks[node_name] = task

    # Wire dependencies from the manifest graph
    for node_name, node_info in manifest["nodes"].items():
        if node_name not in tasks:
            continue
        for upstream_node in node_info.get("depends_on", {}).get("nodes", []):
            if upstream_node in tasks:
                tasks[upstream_node] >> tasks[node_name]
```
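The wiring loop above is the heart of this pattern, and it doesn't depend on Airflow. Here is the same logic as a pure function over a manifest-shaped dict, assuming only the `nodes`, `resource_type`, and `depends_on.nodes` fields the DAG file already uses. `manifest_edges` is a hypothetical helper, and the toy manifest in the test is made up. Testing it separately shows which edges get dropped (sources, seeds, snapshots) before anything reaches the scheduler.

```python
from typing import Dict, List, Tuple

RUNNABLE = ("model", "test")


def to_task_id(unique_id: str) -> str:
    # Same convention as the DAG above: dots become double underscores
    return unique_id.replace(".", "__")


def manifest_edges(manifest: Dict) -> List[Tuple[str, str]]:
    """(upstream_task_id, downstream_task_id) pairs for models and tests."""
    nodes = manifest["nodes"]
    runnable = {uid for uid, info in nodes.items() if info["resource_type"] in RUNNABLE}
    edges = []
    for uid in sorted(runnable):
        for upstream in nodes[uid].get("depends_on", {}).get("nodes", []):
            # Sources, seeds, and snapshots have no task, so those edges are dropped
            if upstream in runnable:
                edges.append((to_task_id(upstream), to_task_id(uid)))
    return edges
```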
Warning: This dynamic pattern has real costs. A dbt project with 200 models generates 200+ Airflow tasks per DAG run, which strains the Airflow scheduler, creates a nearly unreadable UI graph, and means your manifest.json must exist and be valid before the DAG file is even parsed. Use this pattern only if you have genuine per-model retry requirements. Most teams are better served by the layered approach.
Build a complete Airflow DAG for a three-domain dbt pipeline using the patterns from this lesson. Your pipeline should:
Setup:

- `staging`, `core`, and `reporting`.
- `dbt-duckdb` adapter with a local DuckDB database; it works perfectly for this exercise.

DAG Requirements:

- `SqlSensor` that waits for at least 100 rows in a raw source table dated to the execution date. Use `mode="reschedule"` and a 5-minute poke interval.
- `staging` → `core` → `reporting`, with `dbt test` running after each layer.
- `on_failure_callback` that prints a structured failure message to the Airflow logs (since you may not have Slack in a local environment, use Python's `logging` module and `print` to simulate the alert).
- `parse_dbt_run_results` task using the TaskFlow API that reads the `run_results.json` and prints: total models run, count of successes, count of errors, and the name of the slowest model.
- `dbt test` tasks have `retries=1` and your `dbt run` tasks have `retries=2` with `retry_exponential_backoff=True`.

Stretch goal: Modify the `dbt_run_staging` task to use `--full-refresh` only on Sundays (use Airflow's `execution_date.day_of_week` in a Jinja template or Python callable).
This usually means the bash_command exited with code 0 even though dbt did nothing. Check two things. First, does the --select filter match any models? When a selector matches nothing, dbt logs a "Nothing to do" warning and still exits 0, so run the same command locally to confirm the selector resolves to actual models. Second, are --profiles-dir and --project-dir correct? If you omit --profiles-dir, dbt looks for profiles.yml via the DBT_PROFILES_DIR environment variable, then the current directory, then ~/.dbt/, so it may quietly pick up a different profile or target than you intended.
You forgot mode="reschedule". Change it. Also check that your sensor's SQL query actually returns a non-zero value when the data is present — test the SQL directly in your warehouse.
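You're probably loading the dbt manifest at module parse time in a dynamic DAG. Airflow re-parses DAG files frequently (every min_file_process_interval seconds, 30 by default), so any module-level work runs on every parse. Limit parse-time work to reading a manifest.json that is already on local disk (never download it or run dbt at parse time), and move everything else into task callables. Or switch to the layered approach, which doesn't require manifest parsing at all.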
This shouldn't happen: dbt test exits with code 1 when a test fails at error severity. The usual explanation is tests configured with severity: warn (or with error_if thresholds that the failure count doesn't meet). Those failures are reported as warnings, and dbt exits 0. Check the task logs for messages like Completed with 1 warning.
The most common reason: your Airflow worker uses a different version of dbt or different environment variables than your local machine. Always run the exact bash_command from the failing task in a shell on the worker machine (or worker Docker container) to reproduce it faithfully. For Kubernetes executor environments, kubectl exec into the pod before it terminates.
This is the partial failure problem. Implement the result:error selector pattern described in the error handling section. Store run_results.json from the failed run and use --select result:error --state ./last_run_artifacts on retry.
Callbacks that fail silently are worse than no callback — you'll miss alerts precisely when you need them. Wrap your callback logic in a try/except and log the exception if the notification itself fails:
```python
import logging


def notify_slack_on_failure(context):
    try:
        ...  # your Slack notification code goes here
    except Exception:
        # Log the notification failure (with traceback), but don't re-raise:
        # let the original task failure propagate cleanly
        logging.exception("Failed to send Slack alert")
```
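If you have several callbacks (failure, retry, SLA miss), a small decorator applies the same guard to all of them. A sketch using only the standard library: `never_raise` is a hypothetical helper, not an Airflow API, and `demo_callback` stands in for a real notification that fails.

```python
import functools
import logging

log = logging.getLogger(__name__)


def never_raise(callback):
    """Wrap an Airflow callback so an alerting failure is logged, never raised."""
    @functools.wraps(callback)
    def wrapper(context):
        try:
            return callback(context)
        except Exception:
            # Logger.exception logs at ERROR level and includes the traceback
            log.exception("Callback %s failed", callback.__name__)
            return None
    return wrapper


@never_raise
def demo_callback(context):
    # Stand-in for a real Slack call; raises to show the guard working
    raise ConnectionError("Slack webhook unreachable")
```

Apply it as `@never_raise` above each callback definition; `functools.wraps` keeps the original function name, so the log line still tells you which callback broke.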
You now have a production-grade pattern for orchestrating dbt with Airflow. Let's recap the key decisions and their reasoning:
Architectural choices:
- `SqlSensor` with `mode="reschedule"` for upstream data dependencies; never hardcode time buffers

Error handling:

- `retry_exponential_backoff=True` for transient warehouse errors
- `result:error` selectors with `--state` to avoid re-running successful models on partial failure retries

Observability:

- `run_results.json` after each run for detailed model-level status
- `--defer` and historical analysis

Where to go next:

- `DbtCloudRunJobOperator` from `apache-airflow-providers-dbt-cloud`. It handles run monitoring, artifact retrieval, and cloud-based scheduling natively.
- `manifest.json` into a lineage tool like OpenLineage/Marquez or DataHub to get full end-to-end visibility from raw source to dashboard.

The patterns in this lesson are production-tested, but no orchestration system is set-and-forget. As your dbt project grows, revisit your task groupings, your sensor timeouts, and your artifact archival strategy. A pipeline that works well at 50 models often needs restructuring at 200.