Wicked Smart Data
LearnArticlesAbout
Sign InSign Up
LearnArticlesAboutContact
Sign InSign Up
Wicked Smart Data

The go-to platform for professionals who want to master data, automation, and AI — from Excel fundamentals to cutting-edge machine learning.

Platform

  • Learning Paths
  • Articles
  • About
  • Contact

Connect

  • Contact Us
  • RSS Feed

© 2026 Wicked Smart Data. All rights reserved.

Privacy PolicyTerms of Service
All Articles
Secrets Management and Credential Rotation for Data Pipelines in Production

Secrets Management and Credential Rotation for Data Pipelines in Production

Data Engineering⚡ Practitioner25 min readJun 27, 2026Updated Jun 27, 2026
Table of Contents
  • Introduction
  • Prerequisites
  • Why Hardcoding Credentials Is a Production Liability, Not Just a Security Risk
  • Choosing Your Secrets Backend
  • AWS Secrets Manager
  • HashiCorp Vault
  • GCP Secret Manager
  • The Anatomy of a Secret in Production
  • Building a Production-Grade Secret Client
  • Integrating Secrets with Airflow
  • Configuring the Secrets Backend
  • Using Secrets in a Production DAG
  • Wiring Secrets into dbt

Secrets Management and Credential Rotation for Data Pipelines in Production

Introduction

It's 2:47 AM and your on-call phone rings. The data pipeline that feeds your company's revenue dashboard has been down for six hours. You dig into the logs and find it: a database password that expired at midnight, hardcoded in a config file that nobody remembered to update. The pipeline was silently failing, writing nothing, while leadership prepared for a board meeting that starts in four hours. You've just learned, at significant personal cost, why secrets management is a first-class engineering concern — not an afterthought.

This scenario plays out constantly in data engineering teams of every size. Credentials get committed to Git repositories, stored in plaintext environment variables, shared over Slack, or rotated manually on a calendar reminder that someone inevitably misses. The result is a class of production incidents that are entirely preventable but remain stubbornly common, because the tooling and patterns for doing this correctly aren't always obvious when you're moving fast.

By the end of this lesson, you will have a production-grade approach to managing credentials in data pipelines — from the initial storage of a secret through automated rotation, to integration with orchestration systems like Airflow and dbt. You'll build real code that reads secrets dynamically at runtime, handles rotation gracefully without downtime, and alerts you when something goes wrong before it becomes an incident.

What you'll learn:

  • How to evaluate and choose a secrets management backend (AWS Secrets Manager, HashiCorp Vault, GCP Secret Manager) for your specific pipeline architecture
  • The mechanics of dynamic credential injection so pipelines never read credentials from flat files or static environment variables
  • How to implement zero-downtime credential rotation using versioned secrets and dual-credential patterns
  • How to wire secrets management into Airflow, dbt, and Python-based pipelines with complete, working code
  • How to detect and respond to rotation failures before they cause outages

Prerequisites

You should be comfortable with:

  • Python at an intermediate level (classes, context managers, decorators)
  • Basic AWS or GCP concepts (IAM roles, service accounts, APIs)
  • The structure of a data pipeline — you understand what an Airflow DAG is, what dbt does, and what it means to run a pipeline in production
  • Basic understanding of environment variables and how they're used in application configuration

Why Hardcoding Credentials Is a Production Liability, Not Just a Security Risk

Before we get into tooling, let's be precise about what we're actually protecting against. The security concerns are well-documented elsewhere, so we'll focus on the operational failures that secrets mismanagement causes in data pipelines specifically.

Rotation friction causes outages. When credentials live in config files or environment variables, rotating a password means coordinating a deployment. Someone has to update the secret value, redeploy the service, and verify everything works — ideally without downtime. In practice, teams avoid this friction, so passwords stay active far beyond their intended lifespan. When a security team eventually forces rotation, the pipeline breaks.

Sprawl makes incident response painful. A single database password might exist in your Airflow connection config, a dbt profiles.yml, three Glue job scripts, a Lambda function, and a local .env file on two developers' laptops. When that password needs to rotate, you're doing archaeology. The more copies a secret has, the longer your blast radius.

Static secrets can't be audited meaningfully. When every pipeline run uses the same credential, you can't answer "which pipeline accessed the warehouse at 3 AM and ran that expensive query?" because they all look identical. Dynamic, short-lived credentials solve this — each pipeline run gets a unique credential, and your audit log becomes meaningful.

The goal of modern secrets management is not just security — it's operational agility: the ability to rotate credentials at any time without downtime, understand who accessed what, and reduce the operational cost of credential lifecycle management.


Choosing Your Secrets Backend

You have three realistic choices for production data pipelines, and the right answer depends on where your infrastructure lives.

AWS Secrets Manager

If your pipelines run on AWS (ECS, EKS, Lambda, Glue, or even EC2), AWS Secrets Manager is the path of least resistance. It integrates natively with IAM, supports automatic rotation for RDS databases, and has a straightforward API.

Strengths:

  • Native RDS rotation built in (no code required)
  • IAM-based access control with fine-grained resource policies
  • Cross-account access via IAM roles
  • SDK available in every major language

Weaknesses:

  • $0.40/secret/month plus $0.05/10,000 API calls — not negligible at scale
  • Vendor lock-in
  • No dynamic secrets (you store static credentials; it doesn't generate credentials on demand)

HashiCorp Vault

Vault is the gold standard for multi-cloud or on-premises environments. Its killer feature is dynamic secrets: Vault connects to your database and generates a unique username and password for each pipeline run, with a configurable TTL. After the TTL expires, the credential is automatically revoked.

Strengths:

  • Dynamic secrets for PostgreSQL, MySQL, Snowflake, MongoDB, and more
  • Provider-agnostic (runs anywhere)
  • Extremely flexible access policies
  • Open source core with enterprise options

Weaknesses:

  • Operational overhead — you're running a distributed system that needs to be highly available
  • Steeper learning curve
  • Requires your own infrastructure

GCP Secret Manager

The GCP equivalent of AWS Secrets Manager. If you're on BigQuery, Dataflow, or Cloud Composer, this is the natural choice.

Strengths:

  • Deep integration with GCP IAM and service accounts
  • Simple, clean API
  • Supports versioning natively

Weaknesses:

  • No native rotation automation (you implement it yourself or use Cloud Functions)
  • Fewer database-native integrations than Vault

For the rest of this lesson, we'll use AWS Secrets Manager for static-credential examples and Vault dynamic secrets for the rotation patterns, since these represent the two most common real-world scenarios. The patterns translate directly to GCP Secret Manager and other backends.


The Anatomy of a Secret in Production

Before writing code, let's establish what a well-structured secret looks like. A common mistake is storing a single password string as the secret value. Instead, store a JSON object with all the connection details:

{
  "engine": "postgresql",
  "host": "analytics-db.cluster-abc123.us-east-1.rds.amazonaws.com",
  "port": 5432,
  "dbname": "analytics",
  "username": "pipeline_user",
  "password": "s3cr3t-value-here",
  "ssl_mode": "require"
}

Storing the full connection spec in one secret gives you several advantages: you can update the host or port alongside the password during migrations without touching pipeline code, and your secret-fetching code has a clean, stable interface regardless of what changes underneath.

In AWS Secrets Manager, create this secret via the CLI:

aws secretsmanager create-secret \
  --name "prod/analytics-pipeline/warehouse-db" \
  --description "PostgreSQL credentials for the analytics data warehouse" \
  --secret-string '{
    "engine": "postgresql",
    "host": "analytics-db.cluster-abc123.us-east-1.rds.amazonaws.com",
    "port": 5432,
    "dbname": "analytics",
    "username": "pipeline_user",
    "password": "s3cr3t-value-here",
    "ssl_mode": "require"
  }' \
  --region us-east-1

Name your secrets with a consistent hierarchy: {environment}/{service}/{resource}. This isn't just organizational tidiness — it lets you write IAM policies that grant access to all secrets in prod/analytics-pipeline/* without listing individual secret ARNs.


Building a Production-Grade Secret Client

Rather than calling boto3 inline everywhere a pipeline needs a credential, build a dedicated secret client module. This gives you a single place to add caching, error handling, and fallback logic.

# secrets_client.py
import json
import logging
import time
from functools import lru_cache
from typing import Any, Optional

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class SecretsManagerClient:
    """
    A caching wrapper around AWS Secrets Manager that handles
    version-aware retrieval and provides connection-dict output
    suitable for direct use with SQLAlchemy or psycopg2.
    """

    def __init__(
        self,
        region_name: str = "us-east-1",
        cache_ttl_seconds: int = 300,
    ):
        self._client = boto3.client("secretsmanager", region_name=region_name)
        self._cache: dict[str, tuple[Any, float]] = {}
        self._cache_ttl = cache_ttl_seconds

    def get_secret(
        self,
        secret_name: str,
        version_stage: str = "AWSCURRENT",
        force_refresh: bool = False,
    ) -> dict:
        """
        Retrieve a secret by name. Results are cached for cache_ttl_seconds
        to avoid hammering the Secrets Manager API on every pipeline task.

        During credential rotation, pass version_stage="AWSPENDING" to
        retrieve the new credential before it becomes AWSCURRENT.
        """
        cache_key = f"{secret_name}:{version_stage}"

        if not force_refresh and cache_key in self._cache:
            cached_value, cached_at = self._cache[cache_key]
            if time.monotonic() - cached_at < self._cache_ttl:
                logger.debug("Returning cached secret for %s", secret_name)
                return cached_value

        try:
            logger.info("Fetching secret %s (stage=%s)", secret_name, version_stage)
            response = self._client.get_secret_value(
                SecretId=secret_name,
                VersionStage=version_stage,
            )
        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            if error_code == "ResourceNotFoundException":
                raise ValueError(f"Secret '{secret_name}' not found") from e
            elif error_code == "AccessDeniedException":
                raise PermissionError(
                    f"IAM role lacks access to secret '{secret_name}'"
                ) from e
            else:
                # For transient errors, log and re-raise so the caller
                # can implement retry logic at the orchestration layer
                logger.error(
                    "Unexpected error fetching secret %s: %s",
                    secret_name,
                    error_code,
                )
                raise

        secret_value = json.loads(response["SecretString"])
        self._cache[cache_key] = (secret_value, time.monotonic())
        return secret_value

    def get_connection_url(self, secret_name: str) -> str:
        """
        Returns a SQLAlchemy-compatible connection URL built from secret fields.
        Useful for direct use with pandas.read_sql or SQLAlchemy create_engine.
        """
        s = self.get_secret(secret_name)
        return (
            f"{s['engine']}+psycopg2://{s['username']}:{s['password']}"
            f"@{s['host']}:{s['port']}/{s['dbname']}"
            f"?sslmode={s.get('ssl_mode', 'require')}"
        )

    def invalidate_cache(self, secret_name: str) -> None:
        """Call this when you know a rotation has occurred."""
        keys_to_remove = [k for k in self._cache if k.startswith(secret_name)]
        for key in keys_to_remove:
            del self._cache[key]
        logger.info("Invalidated cache for %s", secret_name)


# Module-level singleton — import this in your pipeline code
secrets = SecretsManagerClient()

A few design decisions worth explaining:

The TTL cache matters more than you think. In a complex Airflow DAG with 50 tasks, each task that needs a database connection will call get_secret. Without caching, that's 50 API calls per DAG run. AWS Secrets Manager charges per API call, and more importantly, it has rate limits. A 5-minute TTL is usually safe — short enough that a rotation is picked up within one DAG run interval, long enough to avoid rate limit issues.

force_refresh is your escape hatch. When your rotation Lambda explicitly tells your pipeline "I just rotated, invalidate your cache," the force_refresh=True parameter ensures the pipeline picks up the new credential immediately rather than serving stale data for up to 5 minutes.

Distinguish error types explicitly. ResourceNotFoundException and AccessDeniedException are configuration errors — retrying won't fix them, and they need different remediation. Raising them as ValueError and PermissionError respectively lets your orchestration layer handle them differently from transient network failures.


Integrating Secrets with Airflow

Airflow has its own secrets backend system, and wiring it to AWS Secrets Manager means your connections and variables are fetched dynamically at task execution time — no Airflow metadata database, no plaintext in your DAG code.

Configuring the Secrets Backend

In your airflow.cfg or via environment variables:

[secrets]
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
backend_kwargs = {
    "connections_prefix": "airflow/connections",
    "variables_prefix": "airflow/variables",
    "region_name": "us-east-1"
}

With this configuration, when your DAG references conn_id="warehouse_postgres", Airflow looks up airflow/connections/warehouse_postgres in Secrets Manager and constructs the connection object dynamically. The secret format needs to match Airflow's expected schema:

{
  "conn_type": "postgres",
  "host": "analytics-db.cluster-abc123.us-east-1.rds.amazonaws.com",
  "schema": "analytics",
  "login": "pipeline_user",
  "password": "s3cr3t-value-here",
  "port": 5432,
  "extra": "{\"sslmode\": \"require\"}"
}

Using Secrets in a Production DAG

Here's a realistic DAG that loads Stripe payment data into a PostgreSQL warehouse. Notice how credentials are never touched directly in the DAG code:

# dags/stripe_to_warehouse.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task
import logging

logger = logging.getLogger(__name__)

default_args = {
    "owner": "data-engineering",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "retry_exponential_backoff": True,
}

with DAG(
    dag_id="stripe_payments_to_warehouse",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@hourly",
    default_args=default_args,
    catchup=False,
    tags=["payments", "stripe", "ingestion"],
) as dag:

    @task()
    def extract_stripe_events(execution_date=None):
        """
        Fetch Stripe payment events. The Stripe API key is stored in
        Secrets Manager at prod/stripe/api-key and fetched here at
        task runtime, not at DAG parse time.
        """
        from secrets_client import secrets
        import stripe

        stripe_config = secrets.get_secret("prod/stripe/api-key")
        stripe.api_key = stripe_config["api_key"]

        start_ts = int(execution_date.timestamp())
        end_ts = int((execution_date + timedelta(hours=1)).timestamp())

        events = stripe.Event.list(
            type="payment_intent.succeeded",
            created={"gte": start_ts, "lt": end_ts},
            limit=100,
        )

        return [e.to_dict() for e in events.auto_paging_iter()]

    @task()
    def load_events_to_staging(events: list):
        """
        Load extracted events into the staging table.
        Connection 'warehouse_postgres' is resolved by Airflow's
        SecretsManagerBackend — no credentials in this code.
        """
        if not events:
            logger.info("No events to load for this execution window")
            return 0

        hook = PostgresHook(postgres_conn_id="warehouse_postgres")

        records = [
            (
                event["id"],
                event["data"]["object"]["amount"],
                event["data"]["object"]["currency"],
                event["created"],
                event["data"]["object"].get("customer"),
            )
            for event in events
        ]

        hook.insert_rows(
            table="staging.stripe_payment_events",
            rows=records,
            target_fields=["event_id", "amount_cents", "currency", "event_ts", "customer_id"],
            replace=True,
            replace_index="event_id",
        )

        logger.info("Loaded %d events to staging", len(records))
        return len(records)

    events = extract_stripe_events()
    load_events_to_staging(events)

Important: Notice that from secrets_client import secrets happens inside the task function body, not at the module level. This is critical. Airflow parses DAG files frequently to discover tasks. If you import and call secrets.get_secret() at module level, every DAG parse triggers an API call — often hundreds of times per minute. Import and use secrets only inside task callables.


Wiring Secrets into dbt

dbt reads database credentials from profiles.yml, which many teams check into Git. Don't do this. Instead, use environment variables that are populated from your secrets manager at runtime.

The correct profiles.yml uses Jinja template syntax to reference environment variables:

# profiles.yml — safe to commit to Git
analytics:
  target: "{{ env_var('DBT_TARGET', 'dev') }}"
  outputs:
    prod:
      type: postgres
      host: "{{ env_var('DBT_HOST') }}"
      port: "{{ env_var('DBT_PORT', '5432') | int }}"
      user: "{{ env_var('DBT_USER') }}"
      password: "{{ env_var('DBT_PASSWORD') }}"
      dbname: "{{ env_var('DBT_DBNAME') }}"
      schema: "{{ env_var('DBT_SCHEMA', 'public') }}"
      sslmode: require
      threads: 4
    dev:
      type: postgres
      host: localhost
      port: 5432
      user: "{{ env_var('DBT_USER', 'dev_user') }}"
      password: "{{ env_var('DBT_PASSWORD', 'dev_password') }}"
      dbname: analytics_dev
      schema: "{{ env_var('DBT_USER', 'dev') }}"
      threads: 2

Then write a wrapper script that populates those environment variables from Secrets Manager before invoking dbt:

#!/usr/bin/env python3
# run_dbt.py — used by CI/CD and Airflow to execute dbt with live credentials

import json
import os
import subprocess
import sys
import boto3
from botocore.exceptions import ClientError


def inject_secrets_to_env(secret_name: str, region: str = "us-east-1") -> None:
    """
    Fetch warehouse credentials from Secrets Manager and export them
    as environment variables for the dbt process.
    """
    client = boto3.client("secretsmanager", region_name=region)

    try:
        response = client.get_secret_value(SecretId=secret_name)
        creds = json.loads(response["SecretString"])
    except ClientError as e:
        print(f"ERROR: Failed to fetch secret '{secret_name}': {e}", file=sys.stderr)
        sys.exit(1)

    # Map secret fields to the env vars dbt profiles.yml expects
    env_mapping = {
        "host": "DBT_HOST",
        "port": "DBT_PORT",
        "username": "DBT_USER",
        "password": "DBT_PASSWORD",
        "dbname": "DBT_DBNAME",
    }

    for secret_key, env_var in env_mapping.items():
        if secret_key in creds:
            os.environ[env_var] = str(creds[secret_key])

    os.environ["DBT_TARGET"] = os.getenv("ENVIRONMENT", "prod")
    os.environ["DBT_SCHEMA"] = os.getenv("DBT_SCHEMA", "public")


def main():
    secret_name = os.environ.get(
        "WAREHOUSE_SECRET_NAME",
        "prod/analytics-pipeline/warehouse-db"
    )

    print(f"Fetching credentials from: {secret_name}")
    inject_secrets_to_env(secret_name)

    # Pass all remaining arguments directly to dbt
    dbt_command = ["dbt"] + sys.argv[1:]
    print(f"Running: {' '.join(dbt_command)}")

    result = subprocess.run(dbt_command, check=False)
    sys.exit(result.returncode)


if __name__ == "__main__":
    main()

Usage in CI/CD or an Airflow BashOperator:

python run_dbt.py run --select staging.+ --target prod

The credentials live in memory only for the duration of the process and are never written to disk or logs.


Zero-Downtime Credential Rotation

This is where most teams get it wrong. Naive rotation looks like this:

  1. Generate new password
  2. Update the database user's password
  3. Update the secret in Secrets Manager
  4. Pray that no pipeline ran between steps 2 and 3

This approach has a window of failure. If any pipeline reads the old password from cache between the database update and the Secrets Manager update, it fails. The correct approach is dual-credential rotation, which AWS Secrets Manager's built-in rotation implements natively.

The Four-Phase Rotation Protocol

AWS Secrets Manager rotation Lambdas implement this protocol, and understanding it helps you build robust pipelines:

Phase 1 — createSecret: Generate a new credential. Write it to the secret as AWSPENDING, alongside the existing AWSCURRENT. The database still only accepts the old credential.

Phase 2 — setSecret: Apply the new credential to the database. Now the database accepts both the old and new credentials simultaneously (this requires your database to support multiple active passwords, which PostgreSQL does via ALTER USER ... PASSWORD).

Phase 3 — testSecret: Verify the AWSPENDING credential actually works by opening a real database connection with it.

Phase 4 — finishSecret: Promote AWSPENDING to AWSCURRENT. The old credential becomes AWSPREVIOUS and remains valid temporarily. After a grace period, remove the old credential from the database.

During this entire process, running pipelines that read AWSCURRENT continue working — they get the old credential until Phase 4, and the new credential after. Pipelines that cache credentials for up to 5 minutes have a grace period during which AWSPREVIOUS is still valid.

Implementing a Custom Rotation Lambda

For databases that aren't natively supported by AWS Secrets Manager (Snowflake, for example), you write your own rotation Lambda:

# rotation_lambda.py — deploys as a Lambda function triggered by Secrets Manager

import json
import logging
import boto3
import snowflake.connector
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

secrets_client = boto3.client("secretsmanager")


def lambda_handler(event, context):
    """
    Entry point for AWS Secrets Manager rotation.
    The event contains the secret ARN, the rotation step, and a token
    identifying this specific rotation attempt.
    """
    arn = event["SecretId"]
    token = event["ClientRequestToken"]
    step = event["Step"]

    # Verify the secret exists and the token matches a pending version
    metadata = secrets_client.describe_secret(SecretId=arn)
    if not metadata.get("RotationEnabled"):
        raise ValueError(f"Secret {arn} is not configured for rotation")

    versions = metadata.get("VersionIdsToStages", {})
    if token not in versions:
        raise ValueError(f"Token {token} not found in secret versions")

    if "AWSCURRENT" in versions[token]:
        logger.info("Token %s is already AWSCURRENT — rotation complete", token)
        return

    if "AWSPENDING" not in versions[token]:
        raise ValueError(f"Token {token} is not AWSPENDING")

    # Dispatch to the appropriate phase handler
    handlers = {
        "createSecret": create_secret,
        "setSecret": set_secret,
        "testSecret": test_secret,
        "finishSecret": finish_secret,
    }

    if step not in handlers:
        raise ValueError(f"Unknown rotation step: {step}")

    handlers[step](arn, token)


def create_secret(arn: str, token: str) -> None:
    """Phase 1: Generate a new password and write it as AWSPENDING."""
    try:
        # If AWSPENDING already exists (e.g., Lambda retried), use it as-is
        secrets_client.get_secret_value(SecretId=arn, VersionStage="AWSPENDING")
        logger.info("AWSPENDING already exists, skipping creation")
        return
    except ClientError as e:
        if e.response["Error"]["Code"] != "ResourceNotFoundException":
            raise

    current = json.loads(
        secrets_client.get_secret_value(SecretId=arn, VersionStage="AWSCURRENT")[
            "SecretString"
        ]
    )

    # Generate a new password — use Secrets Manager's password generator
    # to get a cryptographically secure, policy-compliant password
    new_password = secrets_client.get_random_password(
        PasswordLength=32,
        ExcludeCharacters='/@"\' ',  # Exclude chars that break connection strings
        RequireEachIncludedType=True,
    )["RandomPassword"]

    pending_secret = {**current, "password": new_password}

    secrets_client.put_secret_value(
        SecretId=arn,
        ClientRequestToken=token,
        SecretString=json.dumps(pending_secret),
        VersionStages=["AWSPENDING"],
    )
    logger.info("Created AWSPENDING secret for %s", arn)


def set_secret(arn: str, token: str) -> None:
    """Phase 2: Apply the new password to Snowflake."""
    current = _get_secret(arn, "AWSCURRENT")
    pending = _get_secret(arn, "AWSPENDING")

    # Connect using the CURRENT credential (we know it works)
    conn = snowflake.connector.connect(
        account=current["account"],
        user=current["admin_user"],  # A separate admin user with ALTER USER privilege
        password=current["admin_password"],
        warehouse=current["warehouse"],
    )

    try:
        cursor = conn.cursor()
        cursor.execute(
            f"ALTER USER {pending['username']} SET PASSWORD = %s",
            (pending["password"],),
        )
        logger.info("Updated Snowflake password for user %s", pending["username"])
    finally:
        conn.close()


def test_secret(arn: str, token: str) -> None:
    """Phase 3: Verify the AWSPENDING credential actually connects."""
    pending = _get_secret(arn, "AWSPENDING")

    try:
        conn = snowflake.connector.connect(
            account=pending["account"],
            user=pending["username"],
            password=pending["password"],
            warehouse=pending["warehouse"],
            database=pending["database"],
        )
        conn.cursor().execute("SELECT CURRENT_USER()")
        conn.close()
        logger.info("AWSPENDING credential test passed")
    except Exception as e:
        logger.error("AWSPENDING credential test FAILED: %s", e)
        raise


def finish_secret(arn: str, token: str) -> None:
    """Phase 4: Promote AWSPENDING to AWSCURRENT."""
    metadata = secrets_client.describe_secret(SecretId=arn)
    current_version = next(
        version_id
        for version_id, stages in metadata["VersionIdsToStages"].items()
        if "AWSCURRENT" in stages
    )

    if current_version == token:
        logger.info("Version %s is already AWSCURRENT", token)
        return

    secrets_client.update_secret_version_stage(
        SecretId=arn,
        VersionStage="AWSCURRENT",
        MoveToVersionId=token,
        RemoveFromVersionId=current_version,
    )
    logger.info("Promoted %s to AWSCURRENT", token)


def _get_secret(arn: str, stage: str) -> dict:
    response = secrets_client.get_secret_value(SecretId=arn, VersionStage=stage)
    return json.loads(response["SecretString"])

Key insight: The rotation Lambda must be idempotent. Secrets Manager may call each phase multiple times if the Lambda times out or errors. That's why create_secret checks whether AWSPENDING already exists before generating a new password — otherwise, a retry would create a second new password and invalidate the one already set in the database.


Handling Rotation in Long-Running Pipelines

Short-lived tasks (< 5 minutes) are generally fine — they read the credential at start, finish before rotation could occur. Long-running pipelines (bulk historical loads, large Spark jobs) need explicit rotation awareness.

The pattern is to use a connection factory rather than a single long-lived connection:

# connection_factory.py
import time
import logging
import psycopg2
from secrets_client import secrets

logger = logging.getLogger(__name__)


class RotationAwareConnectionFactory:
    """
    Creates new database connections that automatically refresh credentials
    if a rotation has occurred since the last connection was opened.

    Use this for batch jobs that run longer than the rotation schedule.
    """

    def __init__(
        self,
        secret_name: str,
        connection_refresh_interval: int = 3600,  # Refresh connection every hour
    ):
        self._secret_name = secret_name
        self._refresh_interval = connection_refresh_interval
        self._conn = None
        self._conn_opened_at = 0.0

    def get_connection(self) -> psycopg2.extensions.connection:
        """
        Return an active connection. If the connection is older than
        refresh_interval, close it and open a new one with fresh credentials.
        """
        now = time.monotonic()
        age = now - self._conn_opened_at

        if self._conn is None or self._conn.closed or age > self._refresh_interval:
            if self._conn and not self._conn.closed:
                self._conn.close()
                logger.info("Closed connection aged %.0f seconds", age)

            # Force-refresh the secret cache to pick up any rotation
            secrets.invalidate_cache(self._secret_name)
            cred = secrets.get_secret(self._secret_name)

            self._conn = psycopg2.connect(
                host=cred["host"],
                port=cred["port"],
                dbname=cred["dbname"],
                user=cred["username"],
                password=cred["password"],
                sslmode=cred.get("ssl_mode", "require"),
                connect_timeout=10,
            )
            self._conn_opened_at = now
            logger.info("Opened fresh connection with credentials from %s", self._secret_name)

        return self._conn

    def execute_batch(self, sql: str, batch: list) -> int:
        """Execute a batch with automatic retry on authentication failure."""
        conn = self.get_connection()
        try:
            with conn.cursor() as cur:
                from psycopg2.extras import execute_values
                execute_values(cur, sql, batch)
                conn.commit()
                return cur.rowcount
        except psycopg2.OperationalError as e:
            if "authentication" in str(e).lower() or "password" in str(e).lower():
                logger.warning(
                    "Authentication error — credential may have rotated. "
                    "Forcing refresh and retrying."
                )
                # Force connection refresh on next call
                self._conn = None
                self._conn_opened_at = 0.0
                # Retry once with fresh credentials
                conn = self.get_connection()
                with conn.cursor() as cur:
                    from psycopg2.extras import execute_values
                    execute_values(cur, sql, batch)
                    conn.commit()
                    return cur.rowcount
            raise

Hands-On Exercise

You're going to build a complete credential rotation test harness for a hypothetical analytics pipeline. This exercise can run locally with LocalStack or against real AWS if you have an account.

Setup

Install dependencies:

pip install boto3 localstack awscli-local psycopg2-binary

Start LocalStack (Docker required):

docker run --rm -d \
  -p 4566:4566 \
  -e SERVICES=secretsmanager \
  --name localstack \
  localstack/localstack

Exercise Steps

Step 1: Create a secret in LocalStack:

awslocal secretsmanager create-secret \
  --name "prod/exercise/warehouse-db" \
  --secret-string '{
    "engine": "postgresql",
    "host": "localhost",
    "port": 5432,
    "dbname": "analytics",
    "username": "pipeline_user",
    "password": "initial-password-abc123",
    "ssl_mode": "disable"
  }'

Step 2: Write a script that:

  1. Reads the secret from LocalStack
  2. Simulates a "rotation" by updating the password field
  3. Reads the secret again and verifies the new password is returned
# exercise_rotation.py
import json
import boto3
import time

# Point the client at LocalStack
client = boto3.client(
    "secretsmanager",
    region_name="us-east-1",
    endpoint_url="http://localhost:4566",
    aws_access_key_id="test",
    aws_secret_access_key="test",
)

SECRET_NAME = "prod/exercise/warehouse-db"


def read_current_password() -> str:
    response = client.get_secret_value(SecretId=SECRET_NAME)
    return json.loads(response["SecretString"])["password"]


def simulate_rotation(new_password: str) -> None:
    """
    Simulate what the rotation Lambda does:
    write new value as AWSPENDING, then promote to AWSCURRENT.
    """
    # Get current secret content
    current = json.loads(
        client.get_secret_value(
            SecretId=SECRET_NAME, VersionStage="AWSCURRENT"
        )["SecretString"]
    )

    # Write pending version
    pending_secret = {**current, "password": new_password}
    put_response = client.put_secret_value(
        SecretId=SECRET_NAME,
        SecretString=json.dumps(pending_secret),
        VersionStages=["AWSPENDING"],
    )
    pending_version_id = put_response["VersionId"]

    print(f"Wrote AWSPENDING version: {pending_version_id}")

    # Simulate test phase (in reality you'd connect to the DB here)
    print("Testing pending credential... (simulated) OK")

    # Promote AWSPENDING to AWSCURRENT
    metadata = client.describe_secret(SecretId=SECRET_NAME)
    current_version_id = next(
        vid
        for vid, stages in metadata["VersionIdsToStages"].items()
        if "AWSCURRENT" in stages
    )

    client.update_secret_version_stage(
        SecretId=SECRET_NAME,
        VersionStage="AWSCURRENT",
        MoveToVersionId=pending_version_id,
        RemoveFromVersionId=current_version_id,
    )
    print(f"Promoted {pending_version_id} to AWSCURRENT")


if __name__ == "__main__":
    print("=== Credential Rotation Exercise ===")

    password_before = read_current_password()
    print(f"Password before rotation: {password_before}")

    simulate_rotation("rotated-password-xyz789")

    password_after = read_current_password()
    print(f"Password after rotation:  {password_after}")

    assert password_before != password_after, "Rotation failed — password unchanged!"
    print("\n✓ Rotation successful. Your pipeline client would now fetch the new credential.")

Step 3: Modify the SecretsManagerClient from earlier to point at LocalStack by passing endpoint_url="http://localhost:4566", then verify that:

  • Before rotation, the cached value is served
  • After calling invalidate_cache(), the new credential is returned
  • The cache TTL correctly prevents a second API call within 5 minutes

This exercise mirrors exactly what happens during production rotation — implement it correctly here, and the production version is just a matter of removing the LocalStack endpoint.


Common Mistakes & Troubleshooting

Mistake 1: Fetching secrets at DAG parse time

Symptom: Airflow scheduler CPU spikes. Secrets Manager rate limit errors in logs (ThrottlingException).

Cause: secrets.get_secret() called at module level in DAG files. Airflow parses every DAG file every 30 seconds by default.

Fix: Move all secret fetches inside task callables. If you need the secret to construct an operator argument, use a deferred approach or store it as an Airflow Variable (which is also backed by Secrets Manager if configured correctly).


Mistake 2: Rotation failures silently leave AWSPENDING orphaned

Symptom: The database password was updated, but Secrets Manager still shows the old value as AWSCURRENT. Pipelines eventually fail when the DBA manually cleans up the orphaned database user.

Cause: The rotation Lambda's testSecret phase failed (usually a network timeout), but the database password was already changed in setSecret. The rotation was rolled back at the Secrets Manager level, but the database doesn't know that.

Fix: Always implement testSecret with comprehensive validation. Log failures verbosely. Consider adding a CloudWatch alarm on rotation Lambda error rates. For critical pipelines, keep the AWSPREVIOUS credential valid for at least 24 hours by not removing it until you've confirmed the AWSCURRENT credential has been used successfully.


Mistake 3: IAM role doesn't have `secretsmanager:GetSecretValue` on the specific resource

Symptom: AccessDeniedException in pipeline logs, but the IAM policy shows secretsmanager:GetSecretValue is allowed.

Cause: The IAM policy grants access to arn:aws:secretsmanager:us-east-1:123456789:secret:prod/* but the actual secret ARN includes a random 6-character suffix: arn:aws:secretsmanager:us-east-1:123456789:secret:prod/analytics-pipeline/warehouse-db-AbCdEf. The wildcard in a resource ARN must account for this suffix.

Fix: Use arn:aws:secretsmanager:us-east-1:123456789:secret:prod/* with a trailing wildcard, or better, use a condition key that matches on the secret name rather than the full ARN.


Mistake 4: Logging the secret value

Symptom: Passwords visible in CloudWatch Logs, Datadog, or Airflow task logs.

Cause: logger.debug("Fetched secret: %s", secret_dict) — the entire dict gets serialized.

Fix: Never log secret values. Log the secret name and version ID only. Add a pre-commit hook that scans for patterns like password, secret_string, or api_key appearing as log arguments.


Mistake 5: Not testing rotation before it matters

Symptom: You haven't rotated credentials in 18 months because you're not sure if the rotation Lambda works, and the security team is now requiring 90-day rotation.

Fix: Treat credential rotation like a database migration — it needs to be tested in staging before running in production. Use the LocalStack exercise above to build confidence. Then run a manual rotation in your staging environment during business hours with engineers watching, before you enable automated rotation on the production schedule.


Summary & Next Steps

You've covered a substantial amount of ground. Let's consolidate what you now know how to do:

You can evaluate and choose a secrets backend based on your infrastructure: AWS Secrets Manager for AWS-native pipelines, Vault for multi-cloud or on-premises, and GCP Secret Manager for GCP workloads. The choice isn't just about features — it's about where your IAM lives and who manages the operational overhead.

You've built a production-grade Python secrets client with TTL caching, explicit error handling, and a cache invalidation mechanism. This pattern reduces API calls, handles transient errors gracefully, and gives you a single place to add observability.

You understand the four-phase rotation protocol and why it matters. Zero-downtime rotation isn't magic — it's a carefully sequenced set of operations that keeps both old and new credentials valid simultaneously during the transition window. You've seen how to implement this for a non-native database like Snowflake.

You've wired secrets into Airflow and dbt in a way that avoids credentials in DAG code, config files, or Git history. Your pipelines now fetch credentials dynamically at runtime, which means rotation happens transparently.

You have a troubleshooting playbook for the five most common secrets management failures — the kinds of things that cause 3 AM incidents.

Where to Go Next

  • Dynamic secrets with Vault: If you're ready to move beyond static credential rotation to genuinely ephemeral credentials, explore Vault's database secrets engine. Your pipeline gets a unique username/password that expires in 15 minutes, making credential compromise nearly meaningless.

  • IRSA and Workload Identity: For Kubernetes-based pipelines, investigate IAM Roles for Service Accounts (AWS) or Workload Identity Federation (GCP) to eliminate static credentials entirely. The pipeline's identity is its Kubernetes service account, and credentials are never stored at all.

  • Secrets scanning in CI/CD: Tools like detect-secrets, truffleHog, and GitHub's built-in secret scanning prevent credentials from ever reaching Git. Add this to your pipeline CI as a mandatory gate.

  • Centralized secrets audit logging: With credentials managed centrally, you can build a meaningful audit trail. CloudTrail logs every GetSecretValue call with the IAM principal, timestamp, and source IP. Build a dashboard that surfaces anomalies: unexpected principals, unusual hours, high call volumes from a single pipeline run.

The 3 AM incident at the beginning of this lesson is preventable. The tools exist, the patterns are proven, and you now have the knowledge to implement them.

Learning Path: Data Pipeline Fundamentals

Previous

Batch vs. Stream Processing: Choosing the Right Ingestion Pattern for Your Pipeline

Related Articles

Data Engineering🌱 Foundation

Slowly Changing Dimensions in Practice: Handling Historical Data Changes in Your Warehouse

15 min
Data Engineering🌱 Foundation

Batch vs. Stream Processing: Choosing the Right Ingestion Pattern for Your Pipeline

17 min
Data Engineering🔥 Expert

Designing Idempotent Data Pipelines: Guaranteeing Exactly-Once Semantics in Production

28 min

On this page

  • Introduction
  • Prerequisites
  • Why Hardcoding Credentials Is a Production Liability, Not Just a Security Risk
  • Choosing Your Secrets Backend
  • AWS Secrets Manager
  • HashiCorp Vault
  • GCP Secret Manager
  • The Anatomy of a Secret in Production
  • Building a Production-Grade Secret Client
  • Integrating Secrets with Airflow
  • Configuring the Secrets Backend
  • Zero-Downtime Credential Rotation
  • The Four-Phase Rotation Protocol
  • Implementing a Custom Rotation Lambda
  • Handling Rotation in Long-Running Pipelines
  • Hands-On Exercise
  • Setup
  • Exercise Steps
  • Common Mistakes & Troubleshooting
  • Mistake 1: Fetching secrets at DAG parse time
  • Mistake 2: Rotation failures silently leave AWSPENDING orphaned
  • Mistake 3: IAM role doesn't have `secretsmanager:GetSecretValue` on the specific resource
  • Mistake 4: Logging the secret value
  • Mistake 5: Not testing rotation before it matters
  • Summary & Next Steps
  • Where to Go Next
  • Using Secrets in a Production DAG
  • Wiring Secrets into dbt
  • Zero-Downtime Credential Rotation
  • The Four-Phase Rotation Protocol
  • Implementing a Custom Rotation Lambda
  • Handling Rotation in Long-Running Pipelines
  • Hands-On Exercise
  • Setup
  • Exercise Steps
  • Common Mistakes & Troubleshooting
  • Mistake 1: Fetching secrets at DAG parse time
  • Mistake 2: Rotation failures silently leave AWSPENDING orphaned
  • Mistake 3: IAM role doesn't have `secretsmanager:GetSecretValue` on the specific resource
  • Mistake 4: Logging the secret value
  • Mistake 5: Not testing rotation before it matters
  • Summary & Next Steps
  • Where to Go Next