Dagster & Fivetran with components
dg and Dagster Components are under active development. You may encounter feature gaps, and the APIs may change. To report issues or give feedback, please join the #dg-components channel in the Dagster Community Slack.
The dagster-fivetran library provides a FivetranAccountComponent, which represents Fivetran connectors as assets in Dagster.
Preparing a Dagster project
To begin, you'll need a Dagster project. You can use an existing components-ready project or scaffold a new one:
uvx create-dagster project my-project && cd my-project/src
Next, add the dagster-fivetran library to the project:
uv add dagster-fivetran
Scaffolding a Fivetran component
Now that you have a Dagster project, you can scaffold a Fivetran component. You'll need to provide your Fivetran account ID and API credentials:
dg scaffold defs dagster_fivetran.FivetranAccountComponent fivetran_ingest \
--account-id test_account --api-key "{{ env('FIVETRAN_API_KEY') }}" --api-secret "{{ env('FIVETRAN_API_SECRET') }}"
Your package defines a `dagster_dg.plugin` entry point, but this module was not
found in the plugin manifest for the current environment. This means either that
your project is not installed in the current environment, or that the entry point
metadata was added after your module was installed. Python entry points are
registered at package install time. Please reinstall your package into the current
environment to ensure the entry point is registered.
Entry point module: `my_project.components`
To suppress this warning, add "missing_dg_plugin_module_in_manifest" to the `cli.suppress_warnings` list in your configuration.
Creating a component at /.../my-project/src/my_project/defs/fivetran_ingest.
The scaffold call generates a defs.yaml file:
tree my_project/defs
my_project/defs
├── __init__.py
└── fivetran_ingest
    └── defs.yaml

2 directories, 2 files
In its scaffolded form, the defs.yaml file contains the configuration for your Fivetran workspace:
type: dagster_fivetran.FivetranAccountComponent
attributes:
  workspace:
    account_id: test_account
    api_key: '{{ env(''FIVETRAN_API_KEY'') }}'
    api_secret: '{{ env(''FIVETRAN_API_SECRET'') }}'
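In the scaffolded file, the doubled single quotes are YAML's escape for a literal single quote, so after parsing the value of api_key is the string {{ env('FIVETRAN_API_KEY') }}. That template is then replaced with the value of the named environment variable, which means both variables must be set wherever the definitions are loaded. The sketch below illustrates the idea with a small regular expression; it is a simplified stand-in, not Dagster's actual template resolver, and the function name resolve_env_templates is hypothetical.

```python
import os
import re

# Matches {{ env('NAME') }} or {{ env("NAME") }}. This pattern is a
# simplification for illustration, not the resolver Dagster uses.
ENV_TEMPLATE = re.compile(
    r"""\{\{\s*env\(\s*['"]([A-Za-z_][A-Za-z0-9_]*)['"]\s*\)\s*\}\}"""
)


def resolve_env_templates(value, environ=None):
    """Replace each env(...) template in `value` with the variable's value.

    Hypothetical helper: raises KeyError if a referenced variable is unset.
    """
    env = os.environ if environ is None else environ

    def substitute(match):
        name = match.group(1)
        if name not in env:
            raise KeyError(f"environment variable {name} is not set")
        return env[name]

    return ENV_TEMPLATE.sub(substitute, value)


if __name__ == "__main__":
    # Placeholder value only; never hard-code real credentials.
    fake_env = {"FIVETRAN_API_KEY": "example-key"}
    print(resolve_env_templates("{{ env('FIVETRAN_API_KEY') }}", fake_env))
```

Passing the environment as a parameter keeps the sketch easy to test without touching the real process environment.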
You can check the configuration of your component:
dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets │ ┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
│ │ ┃ Key ┃ Group ┃ Deps ┃ Kinds ┃ Description ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
│ │ │ hubspot/company │ hubspot_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ hubspot/contact │ hubspot_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/campaign │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/opportunity │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/task │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/user │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ └────────────────────────┴───────────────────────────┴──────┴───────────┴─────────────┘ │
└─────────┴─────────────────────────────────────────────────────────────────────────────────────────┘
Selecting specific connectors
You can select specific Fivetran connectors to include in your component using the connector_selector key. This filters which connectors are represented as assets:
type: dagster_fivetran.FivetranAccountComponent
attributes:
  workspace:
    account_id: test_account
    api_key: "{{ env('FIVETRAN_API_KEY') }}"
    api_secret: "{{ env('FIVETRAN_API_SECRET') }}"
  connector_selector:
    by_name:
      - salesforce_warehouse_sync
dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets │ ┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
│ │ ┃ Key ┃ Group ┃ Deps ┃ Kinds ┃ Description ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
│ │ │ salesforce/campaign │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/opportunity │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/task │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ ├────────────────────────┼───────────────────────────┼──────┼───────────┼─────────────┤ │
│ │ │ salesforce/user │ salesforce_warehouse_sync │ │ fivetran │ │ │
│ │ │ │ │ │ snowflake │ │ │
│ │ └────────────────────────┴───────────────────────────┴──────┴───────────┴─────────────┘ │
└─────────┴─────────────────────────────────────────────────────────────────────────────────────────┘
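Conceptually, by_name keeps only the connectors whose names appear in the list and drops the rest, which is why the hubspot assets no longer show up in the listing. The sketch below models that filter in plain Python. The Connector class and select_by_name function are hypothetical stand-ins rather than dagster-fivetran APIs, and the connector name hubspot_warehouse_sync is assumed from the group names in the earlier listing.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Connector:
    """Hypothetical stand-in for a Fivetran connector; only the name matters here."""

    name: str


def select_by_name(connectors, names):
    """Keep the connectors whose name is in `names`, preserving input order."""
    wanted = set(names)
    return [connector for connector in connectors if connector.name in wanted]


# Connector names assumed from the group names shown by `dg list defs`.
connectors = [
    Connector("hubspot_warehouse_sync"),
    Connector("salesforce_warehouse_sync"),
]
selected = select_by_name(connectors, ["salesforce_warehouse_sync"])
print([connector.name for connector in selected])  # ['salesforce_warehouse_sync']
```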
Customizing Fivetran assets
Properties of the assets emitted by each connector can be customized in the defs.yaml file using the translation key:
type: dagster_fivetran.FivetranAccountComponent
attributes:
  workspace:
    account_id: test_account
    api_key: "{{ env('FIVETRAN_API_KEY') }}"
    api_secret: "{{ env('FIVETRAN_API_SECRET') }}"
  connector_selector:
    by_name:
      - salesforce_warehouse_sync
  translation:
    group_name: fivetran_data
    description: "Loads data from Fivetran connector {{ props.name }}"
dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets │ ┏━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
│ │ ┃ Key ┃ Group ┃ Deps ┃ Kinds ┃ Description ┃ │
│ │ ┡━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩ │
│ │ │ salesforce/campaign │ fivetran_data │ │ fivetran │ Loads data from Fivetran connector │ │
│ │ │ │ │ │ snowflake │ salesforce_warehouse_sync │ │
│ │ ├────────────────────────┼───────────────┼──────┼───────────┼────────────────────────────────────────────┤ │
│ │ │ salesforce/opportunity │ fivetran_data │ │ fivetran │ Loads data from Fivetran connector │ │
│ │ │ │ │ │ snowflake │ salesforce_warehouse_sync │ │
│ │ ├────────────────────────┼───────────────┼──────┼───────────┼────────────────────────────────────────────┤ │
│ │ │ salesforce/task │ fivetran_data │ │ fivetran │ Loads data from Fivetran connector │ │
│ │ │ │ │ │ snowflake │ salesforce_warehouse_sync │ │
│ │ ├────────────────────────┼───────────────┼──────┼───────────┼────────────────────────────────────────────┤ │
│ │ │ salesforce/user │ fivetran_data │ │ fivetran │ Loads data from Fivetran connector │ │
│ │ │ │ │ │ snowflake │ salesforce_warehouse_sync │ │
│ │ └────────────────────────┴───────────────┴──────┴───────────┴────────────────────────────────────────────┘ │
└─────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
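The listing above shows the effect of the translation key: each asset's group becomes fivetran_data, and {{ props.name }} in the description is filled in with the connector name. A minimal sketch of that behavior follows. It models assets as plain dictionaries and handles only the props.name placeholder; the apply_translation helper is hypothetical and is not how the component is implemented.

```python
import re

# Handles only the {{ props.name }} placeholder used in this guide.
PROPS_NAME = re.compile(r"\{\{\s*props\.name\s*\}\}")


def apply_translation(asset, connector_name, translation):
    """Return a copy of `asset` with the translation overrides applied.

    Hypothetical helper: every translation value is treated as a string
    template in which {{ props.name }} becomes the connector name.
    """
    updated = dict(asset)
    for field, template in translation.items():
        updated[field] = PROPS_NAME.sub(lambda _match: connector_name, template)
    return updated


translation = {
    "group_name": "fivetran_data",
    "description": "Loads data from Fivetran connector {{ props.name }}",
}
asset = {
    "key": "salesforce/campaign",
    "group_name": "salesforce_warehouse_sync",
    "description": "",
}
translated = apply_translation(asset, "salesforce_warehouse_sync", translation)
print(translated["group_name"])
print(translated["description"])
```

Fields that the translation does not mention, such as the asset key, are left unchanged, which matches the Key column in the listing.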