Subclassing components to customize behavior
dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.
You can customize the behavior of a component beyond what is available in the defs.yaml file by creating a subclass of the component.
There are two ways you can customize a component:
- For one-off customizations, you can create a local component, defined in a Python file in the same directory as your defs.yaml file. Customarily, this local component is defined in a file named component.py in the component directory.
- For customizations which may be reused across multiple components, you can create a global component, defined in a Python file in the components directory. This requires that your project is a dg plugin (projects scaffolded using the dg CLI are automatically plugins).
Creating a customized component
We'll use the SlingReplicationCollectionComponent as an example. First, we'll scaffold a project with the dg CLI:
uvx -U create-dagster project my-project \
  && cd my-project/src \
  && uv add dagster-sling \
  && dg scaffold defs dagster_sling.SlingReplicationCollectionComponent my_sling_sync
tree my_project/defs
my_project/defs
├── __init__.py
└── my_sling_sync
    ├── defs.yaml
    └── replication.yaml
2 directories, 3 files
Local component
To define a local component, you can create a subclass of your desired component in a file named component.py in the same directory as your defs.yaml file:
from dagster_sling import SlingReplicationCollectionComponent


class CustomSlingReplicationComponent(SlingReplicationCollectionComponent):
    """Customized Sling component."""
Next, update the type field in the defs.yaml file to reference this new component. It should be the fully qualified name of the type:
type: my_project.defs.my_sling_sync.component.CustomSlingReplicationComponent

attributes:
  replications:
    - path: replication.yaml
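The type value follows the usual dotted Python path convention: everything before the last dot names a module, and the final segment names the class defined in it. As a rough illustration of that convention (not Dagster's actual loader), the sketch below resolves a dotted path with the standard library. resolve_dotted_path is a hypothetical helper, and a standard-library class stands in for the component so the snippet runs outside a project.

```python
import importlib


def resolve_dotted_path(path: str) -> type:
    """Split 'pkg.module.ClassName' into module and attribute, then import it."""
    module_name, _, attr_name = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a fully qualified name, got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# A standard-library class stands in for the component class here.
resolved = resolve_dotted_path("collections.OrderedDict")
print(resolved.__name__)
```

Read the same way, my_project.defs.my_sling_sync.component is the module (component.py in the my_sling_sync directory) and CustomSlingReplicationComponent is the class inside it.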
tree my_project
my_project
├── __init__.py
└── defs
    ├── __init__.py
    └── my_sling_sync
        ├── component.py
        ├── defs.yaml
        └── replication.yaml
3 directories, 5 files
Global component
To define a global component, you can use the dg CLI to scaffold a new component:
dg scaffold component CustomSlingReplicationComponent
Creating module at: /.../my-project/src/my_project/components
Scaffolded Dagster component at /.../my-project/src/my_project/components/custom_sling_replication_component.py.
tree my_project
my_project
├── __init__.py
├── components
│   ├── __init__.py
│   └── custom_sling_replication_component.py
└── defs
    ├── __init__.py
    └── my_sling_sync
        ├── defs.yaml
        └── replication.yaml
4 directories, 6 files
You can modify the generated component by editing the custom_sling_replication_component.py file in the components directory:
from dagster_sling import SlingReplicationCollectionComponent


class CustomSlingReplicationComponent(SlingReplicationCollectionComponent):
    """Customized Sling component."""
Finally, update the type field in the defs.yaml file to reference the new component:
type: my_project.components.custom_sling_replication_component.CustomSlingReplicationComponent

attributes:
  replications:
    - path: replication.yaml
Once you have created your component subclass, you can customize its behavior by overriding methods from the parent class.
Customizing execution
For components that define executable assets, it is customary for the component to implement an execute method, which can be overridden to customize execution behavior.
For example, you can modify the custom subclass of SlingReplicationCollectionComponent to add a debug log message during execution:
from collections.abc import Iterator

from dagster_sling import (
    SlingReplicationCollectionComponent,
    SlingReplicationSpecModel,
    SlingResource,
)

import dagster as dg


class CustomSlingReplicationComponent(SlingReplicationCollectionComponent):
    def execute(
        self,
        context: dg.AssetExecutionContext,
        sling: SlingResource,
        replication_spec_model: SlingReplicationSpecModel,
    ) -> Iterator:
        context.log.info("*******************CUSTOM*************************")
        return sling.replicate(context=context, debug=True)
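An override like the one above replaces the inherited method body. If you would rather keep the parent behavior and only wrap it, a common Python pattern is to delegate to super(). The sketch below shows just that pattern using stand-in classes: BaseReplication and LoggingReplication are hypothetical and are not part of Dagster or dagster-sling, so the snippet runs without either installed.

```python
from collections.abc import Iterator


class BaseReplication:
    """Stand-in for a parent component whose execute() yields events."""

    def execute(self, stream: str) -> Iterator[str]:
        yield f"replicated {stream}"


class LoggingReplication(BaseReplication):
    """Subclass that adds messages around the inherited behavior."""

    def execute(self, stream: str) -> Iterator[str]:
        yield f"starting {stream}"
        # Delegate to the parent so its events are passed through unchanged.
        yield from super().execute(stream)
        yield f"finished {stream}"


events = list(LoggingReplication().execute("public.users"))
print(events)
```

Whether delegating or replacing is the better fit depends on how much of the parent's execution logic you want to preserve.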
Adding component-level templating scope
By default, the Jinja scopes available for use in a component's YAML file are:
- env: A function that allows you to access environment variables.
- automation_condition: A scope allowing you to access all static constructors of the AutomationCondition class.
It can be useful to add additional scope options to your component. For example, you may have a custom automation condition that you'd like to use in your component.
To do so, you can define a function that returns an AutomationCondition and define a get_additional_scope method on your subclass:
from collections.abc import Mapping
from typing import Any

from dagster_sling import SlingReplicationCollectionComponent

import dagster as dg


class CustomSlingReplicationComponent(SlingReplicationCollectionComponent):
    @classmethod
    def get_additional_scope(cls) -> Mapping[str, Any]:
        def _custom_cron(cron_schedule: str) -> dg.AutomationCondition:
            return (
                dg.AutomationCondition.on_cron(cron_schedule)
                & ~dg.AutomationCondition.in_progress()
            )

        return {"custom_cron": _custom_cron}
This can then be used in your defs.yaml file.
Local component:
type: my_project.defs.my_sling_sync.component.CustomSlingReplicationComponent

attributes:
  replications:
    - path: replication.yaml
  asset_post_processors:
    - attributes:
        automation_condition: "{{ custom_cron('@daily') }}"
Global component:
type: my_project.components.custom_sling_replication_component.CustomSlingReplicationComponent

attributes:
  replications:
    - path: replication.yaml
  asset_post_processors:
    - attributes:
        automation_condition: "{{ custom_cron('@daily') }}"
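To see why returning a mapping is enough, it can help to picture the scope as a dictionary of names that template expressions are evaluated against. The sketch below is a deliberately simplified stand-in, not Dagster's resolver: render_expression is hypothetical, it uses Python's eval in place of Jinja, the way scopes are merged is an assumption of the sketch, and custom_cron returns a plain string rather than an AutomationCondition.

```python
import re
from collections.abc import Mapping
from typing import Any


def custom_cron(cron_schedule: str) -> str:
    # Stand-in: the real function would return a dg.AutomationCondition.
    return f"on_cron({cron_schedule}) & ~in_progress()"


def render_expression(template: str, scope: Mapping[str, Any]) -> Any:
    """Evaluate a single '{{ ... }}' expression against the given scope."""
    match = re.fullmatch(r"\s*\{\{(.*)\}\}\s*", template)
    if match is None:
        return template  # No expression: treat the value as a literal.
    # Builtins are disabled so only names in the scope are visible.
    return eval(match.group(1).strip(), {"__builtins__": {}}, dict(scope))


# Placeholder for a default scope entry; not how Dagster implements env.
default_scope = {"env": lambda key: f"<value of {key}>"}
# In this sketch, additional names are simply merged on top of the defaults.
scope = {**default_scope, "custom_cron": custom_cron}

result = render_expression("{{ custom_cron('@daily') }}", scope)
print(result)
```

Evaluating the same expression against default_scope alone raises a NameError in this sketch, which mirrors why the name has to be added to the scope before the YAML can reference it.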