Ask AI

You are viewing an unreleased or outdated version of the documentation

Using Dagster with the Fivetran library#

This guide provides instructions for using Dagster with Fivetran using the dagster-fivetran library. Your Fivetran connector tables can be represented as assets in the Dagster asset graph, allowing you to track lineage and dependencies between Fivetran assets and data assets you are already modeling in Dagster. You can also use Dagster to orchestrate Fivetran connectors, allowing you to trigger syncs for these on a cadence or based on upstream data changes.

What you'll learn#

  • How to represent Fivetran assets in the Dagster asset graph, including lineage to other Dagster assets.
  • How to customize asset definition metadata for these Fivetran assets.
  • How to materialize Fivetran connector tables from Dagster.
  • How to customize how Fivetran connector tables are materialized.
Prerequisites
  • The dagster and dagster-fivetran libraries installed in your environment
  • Familiarity with asset definitions and the Dagster asset graph
  • Familiarity with Dagster resources
  • Familiarity with Fivetran concepts, like connectors and connector tables
  • A Fivetran workspace
  • A Fivetran API key and API secret. For more information, see Getting Started in the Fivetran REST API documentation.

Set up your environment#

To get started, you'll need to install the dagster and dagster-fivetran Python packages:

pip install dagster dagster-fivetran

Represent Fivetran assets in the asset graph#

To load Fivetran assets into the Dagster asset graph, you must first construct a FivetranWorkspace resource, which allows Dagster to communicate with your Fivetran workspace. You'll need to supply your account ID, API key and API secret. See Getting Started in the Fivetran REST API documentation for more information on how to create your API key and API secret.

Dagster can automatically load all connector tables from your Fivetran workspace as asset specs. Call the undefined.load_fivetran_asset_specs function, which returns list of AssetSpecs representing your Fivetran assets. You can then include these asset specs in your Definitions object:

from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

import dagster as dg

fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

fivetran_specs = load_fivetran_asset_specs(fivetran_workspace)
defs = dg.Definitions(assets=fivetran_specs, resources={"fivetran": fivetran_workspace})

Sync and materialize Fivetran assets#

You can use Dagster to sync Fivetran connectors and materialize Fivetran connector tables. You can use the undefined.build_fivetran_assets_definitions factory to create all assets definitions for your Fivetran workspace.

from dagster_fivetran import FivetranWorkspace, build_fivetran_assets_definitions

import dagster as dg

fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

all_fivetran_assets = build_fivetran_assets_definitions(workspace=fivetran_workspace)

defs = dg.Definitions(
    assets=all_fivetran_assets,
    resources={"fivetran": fivetran_workspace},
)

Customize the materialization of Fivetran assets#

If you want to customize the sync of your connectors, you can use the undefined.fivetran_assets decorator to do so. This allows you to execute custom code before and after the call to the fivetran sync.

from dagster_fivetran import FivetranWorkspace, fivetran_assets

import dagster as dg

fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)


@fivetran_assets(
    connector_id="fivetran_connector_id",
    name="fivetran_connector_id",
    group_name="fivetran_connector_id",
    workspace=fivetran_workspace,
)
def fivetran_connector_assets(
    context: dg.AssetExecutionContext, fivetran: FivetranWorkspace
):
    # Do something before the materialization...
    yield from fivetran.sync_and_poll(context=context)
    # Do something after the materialization...


defs = dg.Definitions(
    assets=[fivetran_connector_assets],
    resources={"fivetran": fivetran_workspace},
)

Customize asset definition metadata for Fivetran assets#

By default, Dagster will generate asset specs for each Fivetran asset and populate default metadata. You can further customize asset properties by passing an instance of the custom DagsterFivetranTranslator to the undefined.load_fivetran_asset_specs function.

from dagster_fivetran import (
    DagsterFivetranTranslator,
    FivetranConnectorTableProps,
    FivetranWorkspace,
    load_fivetran_asset_specs,
)

import dagster as dg

fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)


# A translator class lets us customize properties of the built
# Fivetran assets, such as the owners or asset key
class MyCustomFivetranTranslator(DagsterFivetranTranslator):
    def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
        # We create the default asset spec using super()
        default_spec = super().get_asset_spec(props)
        # We customize the metadata and asset key prefix for all assets
        return default_spec.replace_attributes(
            key=default_spec.key.with_prefix("prefix"),
        ).merge_attributes(metadata={"custom": "metadata"})


fivetran_specs = load_fivetran_asset_specs(
    fivetran_workspace, dagster_fivetran_translator=MyCustomFivetranTranslator()
)

defs = dg.Definitions(assets=fivetran_specs, resources={"fivetran": fivetran_workspace})

Note that super() is called in each of the overridden methods to generate the default asset spec. It is best practice to generate the default asset spec before customizing it.

You can pass an instance of the custom DagsterFivetranTranslator to the undefined.fivetran_assets decorator or the undefined.build_fivetran_assets_definitions factory.

Fetching column-level metadata for Fivetran assets#

Dagster allows you to emit column-level metadata, like column schema and column lineage, as materialization metadata.

With this metadata, you can view documentation in Dagster for all columns in your Fivetran connector tables.

To enable this feature, call fetch_column_metadata() on the FivetranEventIterator returned by the sync_and_poll() call on the FivetranWorkspace resource.

from dagster_fivetran import FivetranWorkspace, fivetran_assets

import dagster as dg

fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)


@fivetran_assets(
    connector_id="fivetran_connector_id",
    workspace=fivetran_workspace,
)
def fivetran_connector_assets(
    context: dg.AssetExecutionContext, fivetran: FivetranWorkspace
):
    yield from fivetran.sync_and_poll(context=context).fetch_column_metadata()

Load Fivetran assets from multiple workspaces#

Definitions from multiple Fivetran workspaces can be combined by instantiating multiple FivetranWorkspace resources and merging their specs. This lets you view all your Fivetran assets in a single asset graph:

from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

import dagster as dg

sales_fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_SALES_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_SALES_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_SALES_API_SECRET"),
)
marketing_fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_MARKETING_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_MARKETING_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_MARKETING_API_SECRET"),
)

sales_fivetran_specs = load_fivetran_asset_specs(sales_fivetran_workspace)
marketing_fivetran_specs = load_fivetran_asset_specs(marketing_fivetran_workspace)

# Merge the specs into a single set of definitions
defs = dg.Definitions(
    assets=[*sales_fivetran_specs, *marketing_fivetran_specs],
    resources={
        "marketing_fivetran": marketing_fivetran_workspace,
        "sales_fivetran": sales_fivetran_workspace,
    },
)