Run executor limits

Concurrency pool limits cap the number of ops executing across all runs. To limit the number of ops or assets executing within a single run, you instead configure the run executor. You can limit in-run concurrency by setting max_concurrent in the executor config, either in Python or via the Launchpad in the Dagster UI.

info

The default limit for op execution within a run depends on which run executor you are using. For example, the multiprocess_executor by default limits the number of ops executing to the value of multiprocessing.cpu_count() in the launched run.
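In the Launchpad, the limit can be supplied as run config for the executor. A minimal sketch, assuming the default multiprocess executor (the exact schema depends on which executor your job uses):

```yaml
execution:
  config:
    max_concurrent: 2
```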

Limit concurrent execution for a specific job

src/<project_name>/defs/assets.py
import time

import dagster as dg


@dg.asset
def first_asset(context: dg.AssetExecutionContext):
    time.sleep(75)
    context.log.info("first asset executing")


@dg.asset
def second_asset(context: dg.AssetExecutionContext):
    time.sleep(75)
    context.log.info("second asset executing")


@dg.asset
def third_asset(context: dg.AssetExecutionContext):
    time.sleep(75)
    context.log.info("third asset executing")


# Limits concurrent asset execution for `my_job` runs to 2;
# overrides the executor set on the Definitions object.
my_job = dg.define_asset_job(
    name="my_job",
    selection=[first_asset, second_asset, third_asset],
    executor_def=dg.multiprocess_executor.configured({"max_concurrent": 2}),
)

Limit concurrent execution for all runs in a code location

src/<project_name>/defs/executor.py
import dagster as dg


@dg.definitions
def executor() -> dg.Definitions:
    return dg.Definitions(
        executor=dg.multiprocess_executor.configured({"max_concurrent": 4})
    )

Limit by tag within a single run

Tag-based concurrency limits apply to ops with specific tags, allowing fine-grained control within a run. Multiple ops can run in parallel, but at most N ops with a given tag can run at once. This is useful when different ops in the same run have different resource requirements.
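The semantics can be sketched in plain Python (an illustration only, not Dagster's implementation): a semaphore per tag caps how many tagged tasks run at once, while the pool's worker count plays the role of max_concurrent.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Illustration only -- not Dagster's implementation. One semaphore per
# (key, value) tag pair caps how many tagged tasks run at once.
tag_limits = {("database", "warehouse"): threading.Semaphore(1)}

active = 0
peak = 0
lock = threading.Lock()


def run_op(tags: dict) -> None:
    global active, peak
    sems = [tag_limits[t] for t in tags.items() if t in tag_limits]
    for sem in sems:
        sem.acquire()  # block until a slot for this tag frees up
    try:
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # stand-in for the op body
        with lock:
            active -= 1
    finally:
        for sem in sems:
            sem.release()


with ThreadPoolExecutor(max_workers=4) as pool:  # max_concurrent=4
    for _ in range(6):
        pool.submit(run_op, {"database": "warehouse"})

print(peak)  # the limit of 1 means tagged ops never overlap
```

Even though the pool allows four workers, the tag semaphore keeps the peak number of simultaneously executing tagged tasks at one.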

src/<project_name>/defs/assets.py
import time

import dagster as dg


@dg.asset(op_tags={"database": "warehouse"})
def warehouse_sync(context: dg.AssetExecutionContext):
    """Tagged asset for tag-based concurrency control."""
    context.log.info("Syncing to warehouse...")
    time.sleep(10)


@dg.asset(op_tags={"database": "warehouse"})
def warehouse_aggregate(context: dg.AssetExecutionContext):
    """Another asset with the same tag for concurrency grouping."""
    context.log.info("Aggregating warehouse data...")
    time.sleep(10)


# Job with tag concurrency limits: only 1 asset with the
# database=warehouse tag can run at a time.
warehouse_job = dg.define_asset_job(
    name="warehouse_job",
    selection=[warehouse_sync, warehouse_aggregate],
    executor_def=dg.multiprocess_executor.configured(
        {
            "max_concurrent": 4,
            "tag_concurrency_limits": [
                {"key": "database", "value": "warehouse", "limit": 1}
            ],
        }
    ),
)
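The same executor config can also be supplied as run config in the Launchpad rather than in Python. A sketch, assuming the multiprocess executor:

```yaml
execution:
  config:
    max_concurrent: 4
    tag_concurrency_limits:
      - key: database
        value: warehouse
        limit: 1
```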