Dagster & Fivetran with components

info

dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.

The dagster-fivetran library provides a FivetranAccountComponent, which represents Fivetran connectors as assets in Dagster.

Preparing a Dagster project

To begin, you'll need a Dagster project. You can use an existing project that is already set up for components, or scaffold a new one:

uvx create-dagster project my-project && cd my-project/src

Next, you will need to add the dagster-fivetran library to the project:

uv add dagster-fivetran

Scaffolding a Fivetran component

Now that you have a Dagster project, you can scaffold a Fivetran component. You'll need to provide your Fivetran account ID and API credentials:

dg scaffold defs dagster_fivetran.FivetranAccountComponent fivetran_ingest \
--account-id test_account --api-key "{{ env('FIVETRAN_API_KEY') }}" --api-secret "{{ env('FIVETRAN_API_SECRET') }}"
Your package defines a `dagster_dg.plugin` entry point, but this module was not
found in the plugin manifest for the current environment. This means either that
your project is not installed in the current environment, or that the entry point
metadata was added after your module was installed. Python entry points are
registered at package install time. Please reinstall your package into the current
environment to ensure the entry point is registered.

Entry point module: `my_project.components`

To suppress this warning, add "missing_dg_plugin_module_in_manifest" to the `cli.suppress_warnings` list in your configuration.

Creating a component at /.../my-project/src/my_project/defs/fivetran_ingest.

The scaffold call generates a defs.yaml file inside the new component directory:

tree my_project/defs
my_project/defs
├── __init__.py
└── fivetran_ingest
    └── defs.yaml

2 directories, 2 files

In its scaffolded form, the defs.yaml file contains the configuration for your Fivetran workspace:

my_project/defs/fivetran_ingest/defs.yaml
type: dagster_fivetran.FivetranAccountComponent

attributes:
  workspace:
    account_id: test_account
    api_key: '{{ env(''FIVETRAN_API_KEY'') }}'
    api_secret: '{{ env(''FIVETRAN_API_SECRET'') }}'
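Because the configuration reads both credentials from the environment, it can help to confirm that they are set before running any dg command. The following is a minimal sketch using only the Python standard library; the helper name missing_fivetran_vars is hypothetical and is not part of dagster-fivetran.

```python
import os

# Variable names taken from the env(...) calls in the YAML above.
REQUIRED_VARS = ("FIVETRAN_API_KEY", "FIVETRAN_API_SECRET")


def missing_fivetran_vars(environ=None):
    """Return the required variable names that are unset or empty.

    Hypothetical helper for illustration; pass a dict to check something
    other than the current process environment.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling missing_fivetran_vars() with no arguments checks the current process environment; an empty list means both variables are available to the component.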

To check that the component loads and to see the assets it defines, list the project's definitions:

dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets │ ┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
│ │ ┃ Key ┃ Group ┃ Deps ┃ Kinds ┃ Description ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
│ │ │ hubspot/company │ hubspot_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ hubspot/contact │ hubspot_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/campaign │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/opportunity │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/task │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/user │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ └────────────────────────┴───────────────────────────┴──────┴───────────┴─────────────┘ │
└─────────┴─────────────────────────────────────────────────────────────────────────────────────────┘

Selecting specific connectors

You can select specific Fivetran connectors to include in your component using the connector_selector key. This allows you to filter which connectors are represented as assets:

my_project/defs/fivetran_ingest/defs.yaml
type: dagster_fivetran.FivetranAccountComponent

attributes:
  workspace:
    account_id: test_account
    api_key: "{{ env('FIVETRAN_API_KEY') }}"
    api_secret: "{{ env('FIVETRAN_API_SECRET') }}"
  connector_selector:
    by_name:
      - salesforce_warehouse_sync

dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets │ ┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
│ │ ┃ Key ┃ Group ┃ Deps ┃ Kinds ┃ Description ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
│ │ │ salesforce/campaign │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/opportunity │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/task │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/user │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ └────────────────────────┴───────────────────────────┴──────┴───────────┴─────────────┘ │
└─────────┴─────────────────────────────────────────────────────────────────────────────────────────┘
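The effect of by_name can be pictured as a simple filter over the account's connectors. The sketch below is only an illustration of that idea, not the component's actual implementation: the Connector class and select_by_name function are hypothetical, and the connector names and tables are assumed from the listings in this guide.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Connector:
    """Hypothetical stand-in for a Fivetran connector; not a dagster-fivetran class."""

    name: str
    tables: tuple


def select_by_name(connectors, names):
    """Keep only connectors whose name appears in `names`, preserving order."""
    wanted = set(names)
    return [c for c in connectors if c.name in wanted]


# Connector names and tables assumed from the `dg list defs` output in this guide.
connectors = [
    Connector("hubspot_warehouse_sync", ("hubspot/company", "hubspot/contact")),
    Connector(
        "salesforce_warehouse_sync",
        ("salesforce/campaign", "salesforce/opportunity", "salesforce/task", "salesforce/user"),
    ),
]

selected = select_by_name(connectors, ["salesforce_warehouse_sync"])
# Only the tables of the selected connector remain as asset keys.
asset_keys = [key for c in selected for key in c.tables]
```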

Customizing Fivetran assets

You can customize properties of the assets emitted by each connector in the defs.yaml file using the translation key:

my_project/defs/fivetran_ingest/defs.yaml
type: dagster_fivetran.FivetranAccountComponent

attributes:
  workspace:
    account_id: test_account
    api_key: "{{ env('FIVETRAN_API_KEY') }}"
    api_secret: "{{ env('FIVETRAN_API_SECRET') }}"
  connector_selector:
    by_name:
      - salesforce_warehouse_sync
  translation:
    group_name: fivetran_data
    description: "Loads data from Fivetran connector {{ props.name }}"

dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets │ ┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Key ┃ Group ┃ Deps ┃ Kinds ┃ Description ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ │
│ │ │ salesforce/campaign │ fivetran_data │ │ fivetran │ Loads data from Fivetran connector │ │
│ │ │ │ │ │ snowflake │ salesforce_warehouse_sync │ │
│ │ ├────────────────────────┼───────────────┼──────┼───────────┼────────────────────────────────────────────┤ │
│ │ │ salesforce/opportunity │ fivetran_data │ │ fivetran │ Loads data from Fivetran connector │ │
│ │ │ │ │ │ snowflake │ salesforce_warehouse_sync │ │
│ │ ├────────────────────────┼───────────────┼──────┼───────────┼────────────────────────────────────────────┤ │
│ │ │ salesforce/task │ fivetran_data │ │ fivetran │ Loads data from Fivetran connector │ │
│ │ │ │ │ │ snowflake │ salesforce_warehouse_sync │ │
│ │ ├────────────────────────┼───────────────┼──────┼───────────┼────────────────────────────────────────────┤ │
│ │ │ salesforce/user │ fivetran_data │ │ fivetran │ Loads data from Fivetran connector │ │
│ │ │ │ │ │ snowflake │ salesforce_warehouse_sync │ │
│ │ └────────────────────────┴───────────────┴──────┴───────────┴────────────────────────────────────────────┘ │
└─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
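The description in the listing above comes from substituting each connector's name into the {{ props.name }} placeholder. As a rough illustration of that substitution, the sketch below uses a small regular expression rather than Dagster's real template engine; render_description is a hypothetical helper, and the props dictionary is an assumed stand-in for the connector properties.

```python
import re

# Matches placeholders of the form {{ props.<field> }}.
_PLACEHOLDER = re.compile(r"\{\{\s*props\.(\w+)\s*\}\}")


def render_description(template, props):
    """Replace each `{{ props.<field> }}` placeholder with the value from `props`.

    Hypothetical helper for illustration only; the component performs its own
    template rendering.
    """
    return _PLACEHOLDER.sub(lambda m: str(props[m.group(1)]), template)


template = "Loads data from Fivetran connector {{ props.name }}"
description = render_description(template, {"name": "salesforce_warehouse_sync"})
```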