From SQL to Python and Back: Hybrid Pipelines


01. Why the Hybrid Exists

SQL is extraordinary at set operations — joins, aggregations, window functions. Declarative, optimizable by the engine, readable by analysts. But SQL has a ceiling. The moment you need to call an external API, run a trained scikit-learn model, perform image manipulation, or implement a recursive algorithm that isn't tail-recursive, you reach for Python.

Traditional solutions were ugly: dump data to S3, run a Spark job, reload. Or worse, maintain two separate dependency graphs — one in dbt, one in Airflow — and pray they stay in sync.

Snowpark changes the calculus. Python code runs inside Snowflake's execution engine. DataFrames are lazy — the computation graph is pushed down to Snowflake rather than pulling data to a Python process. Combined with dbt's Python model support, you get:

Pipeline Architecture — Hybrid dbt + Snowpark

Raw Source (Snowflake) ──▶ Staging (SQL model) ──▶ Enrichment (Python model) ──▶ Intermediate (SQL model) ──▶ Mart / Report (SQL model)

Legend: SQL transforms are declarative; the enrichment step is procedural Python via Snowpark; every stage runs on Snowflake compute.

02. Setting Up: dbt Cloud + Snowpark

Before writing hybrid models, you need to configure dbt Cloud to support Python. The core requirements are a virtual warehouse for Python workloads (Snowflake's Snowpark-optimized warehouses help for memory-heavy models) and a dbt project that points to it.

Project configuration

dbt_project.yml
name: hybrid_pipeline
version: '1.7.0'
require-dbt-version: ">=1.3.0"

profile: snowflake

models:
  hybrid_pipeline:
    staging:
      +materialized: view
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
    python_enrichment:
      +materialized: table   # Python models must be table or incremental
⚠ Constraint

Python models in dbt cannot be materialized as views or ephemeral. They must produce a table or incremental table. This is a constraint of dbt Python models generally, not just Snowpark — the Python code runs server-side and writes a physical result set.

Connection profile

profiles.yml
snowflake:
  target: dev
  outputs:
    dev:
      type:          snowflake
      account:       "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user:          "{{ env_var('SNOWFLAKE_USER') }}"
      authenticator: externalbrowser
      role:          TRANSFORMER
      database:      ANALYTICS
      warehouse:     SNOWPARK_WH   # Snowpark-optimized warehouse
      schema:        DEV_TRANSFORM
      threads:       4

In dbt Cloud, you set these connection details in the project's environment configuration. The managed runtime handles Python package availability — you declare Python packages inside each model's dbt.config() call or via the +packages config in your project yaml. (dbt's packages.yml is for dbt packages such as dbt_utils, not Python libraries.)
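As a sketch, Python packages can also be declared for a whole folder of models with the +packages model config (folder names follow this project's layout; versions are illustrative):

```yaml
# dbt_project.yml — declare Python packages for every model in the folder
models:
  hybrid_pipeline:
    python_enrichment:
      +packages:
        - "scikit-learn==1.3.2"
        - "pandas==2.1.4"
```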

03. Anatomy of a Python dbt Model

A Python dbt model is a .py file that lives in your models/ directory alongside .sql files. It must define a model() function that accepts a dbt object and a session (the Snowpark session), and returns a Snowpark DataFrame.

models/python_enrichment/enrich_customer_signals.py
from snowflake.snowpark.functions import col

def model(dbt, session):
    # Declare Python packages used by this model
    dbt.config(
        packages=["scikit-learn", "pandas", "numpy"],
        materialized="table",
    )

    # Reference upstream SQL model — returns a Snowpark DataFrame
    customers_df = dbt.ref("stg_customers")
    events_df    = dbt.ref("stg_events")

    # Join in Snowpark (lazy — runs inside Snowflake). Joining on the
    # column name avoids a duplicated, ambiguous customer_id column.
    joined = customers_df.join(events_df, on="customer_id", how="left")

    # Compute a simple engagement score
    result = joined.select(
        col("customer_id"),
        col("email"),
        col("event_count"),
        (col("page_views") * 0.3 + col("purchases") * 0.7)
            .alias("engagement_score"),
    )

    return result

Notice that dbt.ref() works in Python models exactly as it does in SQL models. The upstream dependency is registered in the DAG, so dbt ensures stg_customers and stg_events are built before this model runs.

✓ Key insight

The Snowpark DataFrame returned from model() is lazy — dbt materializes it by writing the result into the target table inside Snowflake (roughly a save_as_table under the hood, not a .collect() into client memory). You're writing a computation graph, not pulling data into a Python process.
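Snowpark implements this for real, but the contract (operations record a plan; nothing executes until you ask for results) can be illustrated with a toy stand-in. Plain Python, no Snowflake; all names here are invented:

```python
# Toy illustration of lazy evaluation: each operation appends to a plan,
# and work happens only in collect() — the same contract Snowpark's
# DataFrame offers, where collect() compiles the plan to SQL instead.
class LazyFrame:
    def __init__(self, rows, plan=None):
        self.rows = rows
        self.plan = plan or []          # recorded operations, not results

    def filter(self, pred):
        return LazyFrame(self.rows, self.plan + [("filter", pred)])

    def select(self, fn):
        return LazyFrame(self.rows, self.plan + [("select", fn)])

    def collect(self):                  # only here does any row get touched
        out = self.rows
        for op, f in self.plan:
            out = [r for r in out if f(r)] if op == "filter" else [f(r) for r in out]
        return out

df = LazyFrame([{"id": 1, "score": 0.9}, {"id": 2, "score": 0.2}])
high = df.filter(lambda r: r["score"] > 0.5).select(lambda r: r["id"])
# `high` is still just a two-step plan; no rows have been scanned yet.
print(high.collect())  # [1]
```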

04. Consuming Python Output in SQL

One of the most powerful patterns: use Python to produce something SQL can't (ML scores, API-enriched data, complex parsing) then return to SQL for final aggregation and mart construction.

models/marts/fct_customer_segments.sql
-- Reference the Python model as if it were any other SQL model
WITH enriched AS (
    SELECT * FROM {{ ref('enrich_customer_signals') }}
),

orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
)

SELECT
    e.customer_id,
    e.email,
    e.engagement_score,
    o.lifetime_value,
    CASE
        WHEN e.engagement_score >= 0.8 AND o.lifetime_value > 1000 THEN 'champion'
        WHEN e.engagement_score >= 0.5 THEN 'engaged'
        WHEN o.lifetime_value > 500 THEN 'high_value'
        ELSE 'standard'
    END AS customer_segment,
    CURRENT_TIMESTAMP() AS updated_at

FROM enriched e
LEFT JOIN orders o
    ON e.customer_id = o.customer_id

This is the central elegance of the hybrid approach: {{ ref('enrich_customer_signals') }} doesn't know or care that it references a Python model. From the SQL perspective, it's a table. The lineage engine tracks the full dependency chain.

05. Calling External APIs from Python Models

Perhaps the most compelling use case: enriching data with external services — geocoding, entity resolution, third-party scoring APIs — inside a managed dbt run, without separate orchestration.

Snowpark supports vectorized UDFs (Pandas UDFs), which are the right tool for batched API calls: a scalar UDF invokes your function once per row, while a vectorized UDF is invoked once per batch.
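The difference can be sketched with a plain-Python invocation counter (no real HTTP; the batch size is illustrative, since Snowpark chooses real batch sizes itself):

```python
# Counting function invocations for 1,000 rows under each UDF style.
rows = [f"addr_{i}" for i in range(1000)]
BATCH_SIZE = 200   # illustrative only

# Scalar UDF: the engine invokes the function once per row.
scalar_invocations = len(rows)

# Vectorized (Pandas) UDF: the engine invokes the function once per batch,
# so one HTTP request can cover a whole batch if the API accepts bulk input.
batches = [rows[i:i + BATCH_SIZE] for i in range(0, len(rows), BATCH_SIZE)]
vectorized_invocations = len(batches)

print(scalar_invocations, vectorized_invocations)  # 1000 5
```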

models/python_enrichment/geocode_addresses.py
import pandas as pd
import requests
from snowflake.snowpark.functions import pandas_udf
from snowflake.snowpark.types import PandasSeries, StringType

def model(dbt, session):
    dbt.config(
        packages=["pandas", "requests"],
        materialized="incremental",
        unique_key="address_id",
    )

    addresses_df = dbt.ref("stg_addresses")

    # Incremental: only geocode addresses not already in the target table.
    # dbt.this is the fully qualified relation for this model.
    if dbt.is_incremental:
        addresses_df = addresses_df.filter(
            f"address_id NOT IN (SELECT address_id FROM {dbt.this})"
        )

    @pandas_udf(
        session=session,
        return_type=StringType(),
        input_types=[StringType()],
        packages=["pandas", "requests"],  # the UDF needs its own package list
    )
    def geocode_batch(addresses: PandasSeries) -> PandasSeries:
        # NOTE: Jinja such as env_var() is not rendered inside Python model
        # files. Fetch the API key from a Snowflake SECRET attached to the
        # external access integration (see the egress note below); the
        # constant here is a placeholder.
        api_key = "<from-snowflake-secret>"
        results = []
        for address in addresses:
            resp = requests.get(
                "https://geocode.example.com/v1/geocode",
                params={"q": address, "key": api_key},
                timeout=5,
            )
            data = resp.json() if resp.ok else {}
            results.append(data.get("geojson"))
        return pd.Series(results)

    enriched = addresses_df.with_column(
        "geojson", geocode_batch(addresses_df["full_address"])
    )

    return enriched
⚠ Egress note

External network calls from Snowpark UDFs require enabling External Network Access in Snowflake. Create a NETWORK RULE and EXTERNAL ACCESS INTEGRATION and grant them to the execution role. Without this, HTTP calls will silently fail or raise an error depending on your Snowflake account settings.
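A sketch of the objects involved, using the hypothetical geocode.example.com host from the example and the TRANSFORMER role from the profile above; the object names are invented:

```sql
-- Allow egress to the geocoding host
CREATE OR REPLACE NETWORK RULE geocode_egress_rule
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('geocode.example.com:443');

-- Wrap the rule in an integration that can be granted to the execution role
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION geocode_access_integration
  ALLOWED_NETWORK_RULES = (geocode_egress_rule)
  ENABLED = TRUE;

GRANT USAGE ON INTEGRATION geocode_access_integration TO ROLE TRANSFORMER;
```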

06. ML Inference Inside the Pipeline

One of the strongest reasons to invest in Snowpark-Python dbt models: batch ML inference without moving data. Train a model externally, serialize it with joblib or pickle, upload to a Snowflake stage, and invoke it inside a Python model.

models/python_enrichment/predict_churn.py
import pandas as pd
import joblib

def model(dbt, session):
    dbt.config(
        packages=["scikit-learn==1.3.0", "pandas", "joblib"],
        materialized="table",
    )

    # Load model artifact from Snowflake internal stage
    session.file.get(
        "@ML_MODELS/churn_model_v3.pkl",
        "/tmp/"
    )
    clf = joblib.load("/tmp/churn_model_v3.pkl")

    # Pull feature table as Pandas for sklearn inference
    features_df = (
        dbt.ref("fct_customer_features")
           .to_pandas()   # materializes into Python memory
    )
    # Snowflake returns unquoted identifiers uppercased; normalize so the
    # lowercase column names below resolve.
    features_df.columns = features_df.columns.str.lower()

    feature_cols = [
        "days_since_last_order", "avg_order_value",
        "support_tickets_30d",   "engagement_score",
    ]

    features_df["churn_probability"] = clf.predict_proba(
        features_df[feature_cols]
    )[:, 1]
    features_df["churn_flag"] = features_df["churn_probability"] > 0.65

    # Convert back to Snowpark DataFrame to return
    return session.create_dataframe(
        features_df[["customer_id", "churn_probability", "churn_flag"]]
    )
📌 Architecture note

Notice the .to_pandas() call — this is where data leaves Snowflake and enters Python memory on the Snowpark virtual warehouse node. For large feature tables, this can be the bottleneck. Prefer Snowpark ML's Pipeline API for inference if your model is wrapped in it, since it stays lazy.

07. Testing Hybrid Models

dbt's test framework works uniformly across SQL and Python models. You write the same schema.yml tests — not_null, unique, relationships, custom generic tests — regardless of whether the upstream model is SQL or Python.

models/python_enrichment/schema.yml
version: 2

models:
  - name: enrich_customer_signals
    description: "Snowpark-computed engagement scores per customer."
    columns:
      - name: customer_id
        tests:
          - not_null
          - unique
      - name: engagement_score
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0.0
              max_value: 1.0

  - name: predict_churn
    description: "ML-predicted churn probability per customer."
    columns:
      - name: churn_probability
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0.0
              max_value: 1.0
      - name: customer_id
        tests:
          - relationships:
              to: ref('stg_customers')
              field: customer_id

08. Decision Framework: SQL or Python?

Not every transformation belongs in Python. Overusing Python models incurs real costs: slower execution, more complex debugging, and no pushdown optimization for arbitrary Pandas code. Here's a decision guide:

Use case                     | Recommended           | Reason
-----------------------------|-----------------------|--------------------------------------------------------------
Joins, filters, aggregations | SQL                   | Query optimizer handles this optimally; no reason to leave SQL
Window functions, rankings   | SQL                   | Snowflake's window function engine is faster than pandas equivalents
String parsing, regex        | SQL (prefer) / Python | SQL handles most regex; drop to Python for complex NLP parsing
ML model inference           | Python                | scikit-learn, XGBoost, LightGBM live in Python; use Snowpark ML if available
External API enrichment      | Python                | HTTP calls require Python; use vectorized UDFs for batching
Statistical modeling         | Python                | scipy, statsmodels, custom distributions
JSON / semi-structured data  | SQL (prefer)          | Snowflake's FLATTEN, LATERAL, and variant types handle most cases natively
Graph algorithms             | Python                | networkx or igraph; SQL recursion has depth limits
Image / audio processing     | Python                | PIL, librosa — no SQL equivalent
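To ground the graph row: a minimal connected-components pass in plain Python, the kind of logic a Python model handles easily while recursive SQL hits depth limits. Standard library only; the edge list is invented:

```python
from collections import defaultdict, deque

def connected_components(edges):
    """Label each node with a component id via BFS — no recursion depth limits."""
    adj = defaultdict(set)
    for a, b in edges:
        adj[a].add(b)
        adj[b].add(a)
    labels, comp = {}, 0
    for start in adj:
        if start in labels:
            continue
        labels[start] = comp
        queue = deque([start])
        while queue:
            node = queue.popleft()
            for nxt in adj[node]:
                if nxt not in labels:
                    labels[nxt] = comp
                    queue.append(nxt)
        comp += 1
    return labels

# Two clusters of customers linked by shared devices (hypothetical data)
labels = connected_components([("c1", "c2"), ("c2", "c3"), ("c4", "c5")])
print(labels["c1"] == labels["c3"], labels["c1"] == labels["c4"])  # True False
```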

09. Operational Patterns in dbt Cloud

Running hybrid pipelines in dbt Cloud introduces managed concerns: environment isolation, package caching, job scheduling, and CI/CD for both SQL and Python model changes.

Separating SQL and Python jobs

A common operational pattern is to separate Python-heavy transformations into their own dbt job, scheduled less frequently than SQL-only transformations. Python models are slower to execute and more expensive — you may not need to re-run ML inference on every hourly SQL refresh.

dbt Cloud job commands
# Job 1: SQL transforms — runs every hour
dbt run --exclude tag:python_heavy --target prod

# Job 2: Python enrichment — runs every 6 hours
dbt run --select tag:python_heavy --target prod

# Job 3: Tests — runs after both
dbt test --target prod
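These selectors assume the Python models actually carry the tag. One way to apply it is the +tags config in project yaml (a sketch; folder names follow this project's layout, and the tag matches the commands above):

```yaml
# dbt_project.yml — tag the Python folder so the job selectors work
models:
  hybrid_pipeline:
    python_enrichment:
      +tags: ["python_heavy"]
```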

Package pinning

Pin your Python packages to exact versions inside dbt.config(). Floating versions will drift between runs and cause silent behavior changes in ML models:

Best practice — pinned packages
# ✓ Good — exact versions, reproducible
dbt.config(
    packages=[
        "scikit-learn==1.3.2",
        "pandas==2.1.4",
        "numpy==1.26.2",
    ]
)

# ✗ Bad — will drift silently
dbt.config(packages=["scikit-learn", "pandas"])
💡 Tip

dbt Cloud caches installed Python packages per environment, so subsequent runs don't re-download packages. But changing a pinned version invalidates the cache for that model's execution context. Keep a requirements.txt alongside your dbt project for documentation and local dev parity.
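For instance, a requirements.txt that mirrors the pinned versions shown above (versions copied from the example):

```text
scikit-learn==1.3.2
pandas==2.1.4
numpy==1.26.2
```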

10. Known Sharp Edges

The hybrid model is powerful but not frictionless. These are the most common pain points teams encounter: