Skip to main content

Customizing existing component types

info

dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.

You can customize the behavior of a component beyond what is available in the component.yaml file by creating a subclass of the component type. There are two ways you can customize a component:

  • For one-off customizations, you can create a local component type, defined in a Python file in the same directory as your component.yaml file. Customarily, this local component type is defined in a file named component.py in the component directory.
  • For customizations which may be reused across multiple components, you can create a global component type, defined in a Python file in the lib directory. This requires that your project is a dg plugin (projects scaffolded using the dg CLI are automatically plugins).

Creating a customized component type

We'll use the SlingReplicationCollectionComponent as an example. First, we'll scaffold a project with the dg CLI:

dg scaffold project my-project \
&& cd my-project/src \
&& uv add dagster-sling \
&& dg scaffold dagster_sling.SlingReplicationCollectionComponent my_sling_sync
tree my_project/defs
my_project/defs
├── __init__.py
└── my_sling_sync
├── component.yaml
└── replication.yaml

2 directories, 3 files

To define a local component type, you can create a subclass of your desired component in a file named component.py in the same directory as your component.yaml file:

my_project/defs/my_sling_sync/component.py
from dagster_sling import SlingReplicationCollectionComponent


class CustomSlingReplicationComponent(SlingReplicationCollectionComponent):
"""Customized Sling component."""

Then, we update the type field in our component.yaml file to reference this new component type. It should be the fully qualified name of the type:

my_project/defs/my_sling_sync/component.yaml
type: my_project.defs.my_sling_sync.component.CustomSlingReplicationComponent

attributes:
replications:
- path: replication.yaml
tree my_project
my_project
├── __init__.py
├── definitions.py
├── defs
│   ├── __init__.py
│   └── my_sling_sync
│   ├── component.py
│   ├── component.yaml
│   └── replication.yaml
└── lib
└── __init__.py

4 directories, 7 files

Once we have created our component type subclass, we can customize its behavior by overriding methods from the parent class.

Customizing execution

For components which define executable assets, it is customary for the component type to implement an execute method, which can be overridden to customize execution behavior.

For example, we can modify our custom subclass of SlingReplicationCollectionComponent to add a debug log message during execution:

from collections.abc import Iterator

from dagster_sling import (
SlingReplicationCollectionComponent,
SlingReplicationSpecModel,
SlingResource,
)

import dagster as dg


class CustomSlingReplicationComponent(SlingReplicationCollectionComponent):
def execute(
self,
context: dg.AssetExecutionContext,
sling: SlingResource,
replication_spec_model: SlingReplicationSpecModel,
) -> Iterator:
context.log.info("*******************CUSTOM*************************")
return sling.replicate(context=context, debug=True)

Adding component-level templating scope

By default, the Jinja scopes available for use in a component's YAML file are:

  • env: A function that allows you to access environment variables.
  • automation_condition: A scope allowing you to access all static constructors of the AutomationCondition class.

It can be useful to add additional scope options to your component type. For example, you may have a custom automation condition that you'd like to use in your component.

To do so, you can define a function that returns an AutomationCondition and define a get_additional_scope method on your subclass:

from collections.abc import Mapping
from typing import Any

from dagster_sling import SlingReplicationCollectionComponent

import dagster as dg


class CustomSlingReplicationComponent(SlingReplicationCollectionComponent):
@classmethod
def get_additional_scope(cls) -> Mapping[str, Any]:
def _custom_cron(cron_schedule: str) -> dg.AutomationCondition:
return (
dg.AutomationCondition.on_cron(cron_schedule)
& ~dg.AutomationCondition.in_progress()
)

return {"custom_cron": _custom_cron}

This can then be used in your component.yaml file:

type: my_project.defs.my_sling_sync.component.CustomSlingReplicationComponent

attributes:
replications:
- path: replication.yaml
asset_post_processors:
- attributes:
automation_condition: "{{ custom_cron('@daily') }}"