Run executor limits
While concurrency pool limits cap the number of ops executing across all runs, limiting the number of ops or assets executing within a single run requires configuring the run executor. To do so, set max_concurrent in the run config, either in Python or in the Launchpad in the Dagster UI.
The default limit for op execution within a run depends on which run executor you are using. For example, the multiprocess_executor by default limits the number of concurrently executing ops to the value of multiprocessing.cpu_count() in the launched run.
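As a sketch of the Launchpad approach: for a job using the default multi_or_in_process_executor, a run config like the following caps in-run op concurrency at 2 (the exact `execution.config` schema depends on which executor the job uses, so treat this shape as an assumption to check against your executor's config schema):

```yaml
execution:
  config:
    multiprocess:
      max_concurrent: 2
```

Pasting this into the Launchpad before launching a run applies the limit to that run only, without changing the job definition.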
Limit concurrent execution for a specific job
```python
import time

import dagster as dg


@dg.asset
def first_asset(context: dg.AssetExecutionContext):
    time.sleep(75)
    context.log.info("first asset executing")


@dg.asset
def second_asset(context: dg.AssetExecutionContext):
    time.sleep(75)
    context.log.info("second asset executing")


@dg.asset
def third_asset(context: dg.AssetExecutionContext):
    time.sleep(75)
    context.log.info("third asset executing")


# Limits concurrent asset execution for `my_job` runs to 2; overrides the
# limit set on the Definitions object
my_job = dg.define_asset_job(
    name="my_job",
    selection=[first_asset, second_asset, third_asset],
    executor_def=dg.multiprocess_executor.configured({"max_concurrent": 2}),
)
```
Limit concurrent execution for all runs in a code location
```python
import dagster as dg


@dg.definitions
def executor() -> dg.Definitions:
    return dg.Definitions(
        executor=dg.multiprocess_executor.configured({"max_concurrent": 4})
    )
```
Limit by tag within a single run
Tag-based concurrency limits restrict how many ops with a specific tag can execute simultaneously, allowing fine-grained control within a run: multiple ops can run in parallel overall, but only N with a given tag. This is useful when different ops in the same run have different resource requirements.
```python
import time

import dagster as dg


@dg.asset(op_tags={"database": "warehouse"})
def warehouse_sync(context: dg.AssetExecutionContext):
    """Tagged asset for tag-based concurrency control."""
    context.log.info("Syncing to warehouse...")
    time.sleep(10)


@dg.asset(op_tags={"database": "warehouse"})
def warehouse_aggregate(context: dg.AssetExecutionContext):
    """Another asset with the same tag, for concurrency grouping."""
    context.log.info("Aggregating warehouse data...")
    time.sleep(10)


# Job with tag concurrency limits: only 1 asset with the database=warehouse
# tag can run at a time, even though up to 4 ops may run concurrently overall
warehouse_job = dg.define_asset_job(
    name="warehouse_job",
    selection=[warehouse_sync, warehouse_aggregate],
    executor_def=dg.multiprocess_executor.configured(
        {
            "max_concurrent": 4,
            "tag_concurrency_limits": [
                {"key": "database", "value": "warehouse", "limit": 1}
            ],
        }
    ),
)
```