This guide provides instructions for using Dagster with Fivetran using the dagster-fivetran library. Your Fivetran connector tables can be represented as assets in the Dagster asset graph, allowing you to track lineage and dependencies between Fivetran assets and data assets you are already modeling in Dagster. You can also use Dagster to orchestrate Fivetran connectors, allowing you to trigger syncs for these on a cadence or based on upstream data changes.
To load Fivetran assets into the Dagster asset graph, you must first construct a FivetranWorkspace resource, which allows Dagster to communicate with your Fivetran workspace. You'll need to supply your account ID, API key and API secret. See Getting Started in the Fivetran REST API documentation for more information on how to create your API key and API secret.
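The examples in this guide read the three credentials from environment variables named FIVETRAN_ACCOUNT_ID, FIVETRAN_API_KEY and FIVETRAN_API_SECRET. As a minimal sketch, a pre-flight check like the one below can confirm they are set before Dagster loads your definitions. The helper name `missing_fivetran_env_vars` is hypothetical and not part of dagster-fivetran.

```python
import os
from typing import List, Mapping, Optional

# Environment variable names used by the examples in this guide.
REQUIRED_ENV_VARS = (
    "FIVETRAN_ACCOUNT_ID",
    "FIVETRAN_API_KEY",
    "FIVETRAN_API_SECRET",
)


def missing_fivetran_env_vars(environ: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the required variable names that are unset or empty.

    Hypothetical helper for illustration; pass a mapping to check something
    other than the current process environment.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]
```

Calling `missing_fivetran_env_vars()` with no arguments checks the current process environment; an empty list means all three values are available.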
Dagster can automatically load all connector tables from your Fivetran workspace as asset specs. Call the load_fivetran_asset_specs function, which returns a list of AssetSpecs representing your Fivetran assets. You can then include these asset specs in your Definitions object.
You can use Dagster to sync Fivetran connectors and materialize Fivetran connector tables. The build_fivetran_assets_definitions factory creates asset definitions for every connector in your Fivetran workspace.
If you want to customize the sync of your connectors, you can use the fivetran_assets decorator. This allows you to execute custom code before and after the call to the Fivetran sync.
from dagster_fivetran import FivetranWorkspace, fivetran_assets
import dagster as dg

fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

@fivetran_assets(
    connector_id="fivetran_connector_id",
    name="fivetran_connector_id",
    group_name="fivetran_connector_id",
    workspace=fivetran_workspace,
)
def fivetran_connector_assets(
    context: dg.AssetExecutionContext, fivetran: FivetranWorkspace
):
    # Do something before the materialization...
    yield from fivetran.sync_and_poll(context=context)
    # Do something after the materialization...

defs = dg.Definitions(
    assets=[fivetran_connector_assets],
    resources={"fivetran": fivetran_workspace},
)
Customize asset definition metadata for Fivetran assets
By default, Dagster will generate asset specs for each Fivetran asset and populate default metadata. You can further customize asset properties by passing an instance of a custom DagsterFivetranTranslator subclass to the load_fivetran_asset_specs function.
from dagster_fivetran import (
    DagsterFivetranTranslator,
    FivetranConnectorTableProps,
    FivetranWorkspace,
    load_fivetran_asset_specs,
)
import dagster as dg

fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

# A translator class lets us customize properties of the built
# Fivetran assets, such as the owners or asset key
class MyCustomFivetranTranslator(DagsterFivetranTranslator):
    def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
        # We create the default asset spec using super()
        default_spec = super().get_asset_spec(props)
        # We customize the metadata and asset key prefix for all assets
        return default_spec.replace_attributes(
            key=default_spec.key.with_prefix("prefix"),
        ).merge_attributes(metadata={"custom": "metadata"})

fivetran_specs = load_fivetran_asset_specs(
    fivetran_workspace, dagster_fivetran_translator=MyCustomFivetranTranslator()
)

defs = dg.Definitions(assets=fivetran_specs, resources={"fivetran": fivetran_workspace})
Note that super() is called in the overridden get_asset_spec method to generate the default asset spec. It is best practice to generate the default asset spec first and then customize it.
Definitions from multiple Fivetran workspaces can be combined by instantiating multiple FivetranWorkspace resources and merging their specs. This lets you view all your Fivetran assets in a single asset graph:
from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs
import dagster as dg

sales_fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_SALES_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_SALES_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_SALES_API_SECRET"),
)
marketing_fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_MARKETING_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_MARKETING_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_MARKETING_API_SECRET"),
)

sales_fivetran_specs = load_fivetran_asset_specs(sales_fivetran_workspace)
marketing_fivetran_specs = load_fivetran_asset_specs(marketing_fivetran_workspace)

# Merge the specs into a single set of definitions
defs = dg.Definitions(
    assets=[*sales_fivetran_specs, *marketing_fivetran_specs],
    resources={
        "marketing_fivetran": marketing_fivetran_workspace,
        "sales_fivetran": sales_fivetran_workspace,
    },
)