
Dagster & dbt Cloud


This feature is in beta. It is still being tested and may change. For more information, see the API lifecycle stages documentation.

Dagster allows you to run dbt Cloud jobs alongside other technologies. You can schedule them to run as a step in a larger pipeline and manage them as data assets.

Our updated dbt Cloud integration offers two capabilities:

  • Observability - You can view your dbt Cloud assets in the Dagster Asset Graph and drill into their run and materialization history.
  • Orchestration - You can use Dagster to schedule runs and materializations of your dbt Cloud assets, either on a cron schedule or based on upstream dependencies.

Installation

uv add dagster-dbt

Observability example

To make use of the observability capability, you will need to add code to your Dagster project that does the following:

  1. Defines your dbt Cloud credentials and workspace.
  2. Uses the integration to create asset specs for models in the workspace.
  3. Builds a sensor that polls dbt Cloud for updates to the run and materialization history of your dbt Cloud assets.
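Step 1 in both examples on this page reads the dbt Cloud settings with `os.getenv`, which silently returns `None` when a variable is unset. The sketch below is one way to fail fast with a clear message before those values reach `DbtCloudCredentials` and `DbtCloudWorkspace`. The helper `read_dbt_cloud_settings` and the `DbtCloudSettings` dataclass are hypothetical, not part of `dagster-dbt`, and parsing the account, project, and environment IDs as integers is an assumption based on dbt Cloud IDs being numeric.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

# Environment variable names used by the examples on this page.
REQUIRED_VARS = (
    "DBT_CLOUD_ACCOUNT_ID",
    "DBT_CLOUD_ACCESS_URL",
    "DBT_CLOUD_TOKEN",
    "DBT_CLOUD_PROJECT_ID",
    "DBT_CLOUD_ENVIRONMENT_ID",
)


@dataclass(frozen=True)
class DbtCloudSettings:
    """Hypothetical container for the values passed to the dbt Cloud resources."""

    account_id: int
    access_url: str
    token: str
    project_id: int
    environment_id: int


def read_dbt_cloud_settings(env: Optional[Mapping[str, str]] = None) -> DbtCloudSettings:
    """Read the dbt Cloud settings, raising if any of them is missing or empty."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(
            "Missing dbt Cloud environment variables: " + ", ".join(missing)
        )
    # Assumption: account, project, and environment IDs are numeric strings.
    return DbtCloudSettings(
        account_id=int(env["DBT_CLOUD_ACCOUNT_ID"]),
        access_url=env["DBT_CLOUD_ACCESS_URL"],
        token=env["DBT_CLOUD_TOKEN"],
        project_id=int(env["DBT_CLOUD_PROJECT_ID"]),
        environment_id=int(env["DBT_CLOUD_ENVIRONMENT_ID"]),
    )
```

You could then pass `settings.account_id`, `settings.token`, and so on to the credentials and workspace objects instead of calling `os.getenv` directly. A misconfigured deployment then fails at load time with the names of the missing variables, rather than later with a less obvious authentication or lookup error.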
defs/dbt_cloud_observability.py

import os

import dagster as dg
from dagster_dbt.cloud_v2.resources import (
    DbtCloudCredentials,
    DbtCloudWorkspace,
    load_dbt_cloud_asset_specs,
)
from dagster_dbt.cloud_v2.sensor_builder import build_dbt_cloud_polling_sensor

# Define credentials
creds = DbtCloudCredentials(
    account_id=os.getenv("DBT_CLOUD_ACCOUNT_ID"),
    access_url=os.getenv("DBT_CLOUD_ACCESS_URL"),
    token=os.getenv("DBT_CLOUD_TOKEN"),
)

# Define the workspace
workspace = DbtCloudWorkspace(
    credentials=creds,
    project_id=os.getenv("DBT_CLOUD_PROJECT_ID"),
    environment_id=os.getenv("DBT_CLOUD_ENVIRONMENT_ID"),
)

# Use the integration to create asset specs for models in the workspace
dbt_cloud_asset_specs = load_dbt_cloud_asset_specs(workspace=workspace)

# Build a sensor which will poll dbt Cloud for updates on runs/materialization history
# and dbt Cloud Assets
dbt_cloud_polling_sensor = build_dbt_cloud_polling_sensor(workspace=workspace)
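A polling sensor like the one built above runs on a recurring tick and needs to remember what it has already reported. The sketch below shows the general cursor-based polling pattern in plain Python. It is a conceptual illustration, not how `build_dbt_cloud_polling_sensor` is implemented. `RunRecord` and `poll_once` are hypothetical names, and the `runs` sequence stands in for whatever the real sensor fetches from the dbt Cloud API.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence, Tuple

# The cursor is (finished_at, run_id): ties on timestamp are broken by run ID,
# so two runs finishing in the same second are not skipped.
Cursor = Tuple[float, int]


@dataclass(frozen=True)
class RunRecord:
    """Hypothetical, simplified view of a finished dbt Cloud run."""

    run_id: int
    finished_at: float  # Unix timestamp of completion
    succeeded: bool


def _key(run: RunRecord) -> Cursor:
    return (run.finished_at, run.run_id)


def poll_once(
    runs: Sequence[RunRecord], cursor: Optional[Cursor]
) -> Tuple[List[RunRecord], Optional[Cursor]]:
    """One polling tick: return runs newer than `cursor`, oldest first, plus the new cursor."""
    new_runs = sorted(
        (run for run in runs if cursor is None or _key(run) > cursor),
        key=_key,
    )
    # Advance the cursor to the newest run seen so the next tick skips these runs;
    # if nothing new arrived, keep the previous cursor unchanged.
    new_cursor = _key(new_runs[-1]) if new_runs else cursor
    return new_runs, new_cursor
```

Persisting the cursor between ticks is what lets each tick report only runs it has not seen before, even after a restart.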

Orchestration example

To make use of the orchestration capability, you will need to add code to your Dagster project that does the following:

  1. Defines your dbt Cloud credentials and workspace.
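  2. Defines your dbt Cloud models as materializable assets with the @dbt_cloud_assets decorator.
  3. Applies an automation condition to these assets and defines a Declarative Automation sensor that evaluates it.
  4. Builds a sensor that polls dbt Cloud for updates to the run and materialization history of your dbt Cloud assets.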
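The asset graph in step 2 mirrors the dependency graph of your dbt models, and `dagster-dbt` builds it for you. As a rough illustration of the idea, the sketch below reads model-to-model dependencies from a dbt `manifest.json`-style dictionary, in which `nodes` is keyed by unique ID and each node lists its parents under `depends_on.nodes`. It then derives a valid build order with `graphlib`. The function name `model_dependencies` and the toy manifest are hypothetical. Keying assets by bare model name is a simplification, and sources and tests are ignored.

```python
from graphlib import TopologicalSorter
from typing import Any, Dict, List, Mapping


def model_dependencies(manifest: Mapping[str, Any]) -> Dict[str, List[str]]:
    """Map each dbt model name to the names of the models it depends on."""
    nodes = manifest["nodes"]
    deps: Dict[str, List[str]] = {}
    for node in nodes.values():
        if node.get("resource_type") != "model":
            continue  # skip tests, seeds, snapshots, etc.
        parents = node.get("depends_on", {}).get("nodes", [])
        deps[node["name"]] = sorted(
            nodes[p]["name"]
            for p in parents
            if p in nodes and nodes[p].get("resource_type") == "model"
        )
    return deps


# Hypothetical three-model project: orders depends on two staging models.
toy_manifest = {
    "nodes": {
        "model.shop.stg_orders": {"resource_type": "model", "name": "stg_orders"},
        "model.shop.stg_customers": {"resource_type": "model", "name": "stg_customers"},
        "model.shop.orders": {
            "resource_type": "model",
            "name": "orders",
            "depends_on": {
                "nodes": ["model.shop.stg_orders", "model.shop.stg_customers"]
            },
        },
    }
}

deps = model_dependencies(toy_manifest)
# Upstream models always come before the models that depend on them.
build_order = list(TopologicalSorter(deps).static_order())
```

Every edge in this graph becomes an upstream dependency between assets, which is what lets Dagster trigger downstream models when their parents update.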
defs/dbt_cloud_orchestration.py

import os

import dagster as dg
from dagster_dbt.cloud_v2.asset_decorator import dbt_cloud_assets
from dagster_dbt.cloud_v2.resources import DbtCloudCredentials, DbtCloudWorkspace
from dagster_dbt.cloud_v2.sensor_builder import build_dbt_cloud_polling_sensor

# Define credentials
creds = DbtCloudCredentials(
    account_id=os.getenv("DBT_CLOUD_ACCOUNT_ID"),
    access_url=os.getenv("DBT_CLOUD_ACCESS_URL"),
    token=os.getenv("DBT_CLOUD_TOKEN"),
)

# Define the workspace
workspace = DbtCloudWorkspace(
    credentials=creds,
    project_id=os.getenv("DBT_CLOUD_PROJECT_ID"),
    environment_id=os.getenv("DBT_CLOUD_ENVIRONMENT_ID"),
)


# Builds your asset graph in a materializable way
@dbt_cloud_assets(workspace=workspace)
def my_dbt_cloud_assets(
    context: dg.AssetExecutionContext, dbt_cloud: DbtCloudWorkspace
):
    yield from dbt_cloud.cli(args=["build"], context=context).wait()


# Automates your assets using Declarative Automation
# https://docs.dagster.io/guides/automate/declarative-automation
my_dbt_cloud_assets = my_dbt_cloud_assets.map_asset_specs(
    lambda spec: spec.replace_attributes(
        automation_condition=dg.AutomationCondition.eager()
    )
)
# Adds these assets to the Declarative Automation Sensor
automation_sensor = dg.AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=dg.DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)

# Build a sensor which will poll dbt Cloud for updates on runs/materialization history
# and dbt Cloud Assets
dbt_cloud_polling_sensor = build_dbt_cloud_polling_sensor(workspace=workspace)
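`AutomationCondition.eager()` requests an asset when its dependencies update, but holds off while any dependency is missing or in progress, or while the asset itself is already running. The sketch below is a deliberately simplified model of that decision for a small graph. It compares materialization timestamps, whereas Dagster tracks updates between evaluation ticks. It also requests any asset that has never been materialized, which simplifies eager()'s "newly missing" rule. `eager_requests` and its inputs are hypothetical and not part of the Dagster API.

```python
from typing import List, Mapping, Optional, Set


def eager_requests(
    deps: Mapping[str, List[str]],
    last_materialized: Mapping[str, Optional[float]],
    in_progress: Set[str],
) -> List[str]:
    """Return the assets a simplified 'eager' policy would request on this tick."""
    requested = []
    for asset, upstream in deps.items():
        # Never start a second run of an asset that is already running.
        if asset in in_progress:
            continue
        # Wait while any dependency is missing or still being materialized.
        if any(last_materialized.get(d) is None or d in in_progress for d in upstream):
            continue
        mine = last_materialized.get(asset)
        # Request if this asset is missing, or a dependency is newer than it.
        if mine is None or any(last_materialized[d] > mine for d in upstream):
            requested.append(asset)
    return sorted(requested)
```

For example, with `stg_orders -> orders -> revenue` and `stg_orders` freshly rebuilt, only `orders` is requested on this tick. Once `orders` finishes, a later tick would pick up `revenue`, which is how updates propagate down the graph one layer at a time.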

About dbt Cloud

dbt Cloud is a hosted service for running dbt jobs. It helps data analysts and engineers productionize dbt deployments. Beyond what open-source dbt offers, dbt Cloud provides scheduling, CI/CD, documentation hosting, and monitoring and alerting.

If you currently use dbt Cloud, you can also use Dagster to run dbt Core in its place. For details, see the Dagster integration documentation for dbt Core.