Skip to main content

Customizing on_missing

Ignoring dependencies

By default, AutomationCondition.on_missing() will wait for all upstream dependencies to be materialized before executing the asset it's attached to.

In some cases, it can be useful to ignore some upstream dependencies in this calculation. This can be done by passing in an AssetSelection to be ignored:

import dagster as dg

condition = dg.AutomationCondition.on_missing().ignore(dg.AssetSelection.assets("foo"))

Alternatively, you can pass in an AssetSelection to be allowed:

import dagster as dg

condition = dg.AutomationCondition.on_missing().allow(dg.AssetSelection.groups("abc"))

Waiting for all blocking asset checks to complete before executing

The AutomationCondition.all_deps_blocking_checks_passed() condition becomes true after all upstream blocking checks have passed.

This can be combined with AutomationCondition.on_missing() to ensure that your asset does not execute if upstream data is failing data quality checks:

import dagster as dg

condition = (
dg.AutomationCondition.on_missing()
& dg.AutomationCondition.all_deps_blocking_checks_passed()
)

Updating older time partitions

By default, AutomationCondition.on_missing() will only update the latest time partition of an asset.

This means that the condition will not automatically "catch" up if upstream data is delayed for longer than it takes for a new partition to appear. If desired, this sub-condition can be removed or replaced:

import datetime

import dagster as dg

# no limit on how far back to check for missing partitions
all_partitions_condition = dg.AutomationCondition.on_missing().without(
dg.AutomationCondition.in_latest_time_window()
)

# limit to the last 7 days
recent_partitions_condition = dg.AutomationCondition.on_missing().replace(
dg.AutomationCondition.in_latest_time_window(),
dg.AutomationCondition.in_latest_time_window(datetime.timedelta(days=7)),
)

Note that the above modifications will still not consider that were added to the asset before the condition was enabled. To change this behavior, you can modify your condition as follows:

import dagster as dg

condition = dg.AutomationCondition.on_missing().replace(
"handled",
dg.AutomationCondition.newly_requested() | dg.AutomationCondition.newly_updated(),
)