After creating some asset definitions, you may want to automate checks that test those assets for data quality.
In this guide, we'll show you a few approaches to defining asset checks, how to use check results to include helpful information, and how to execute checks.
There are a few ways you can define an asset check:
Separately from the assets the checks target - In this approach, asset materialization and asset checks are executed in their own separate operations. If using the multiprocess_executor, this allows you to launch runs that will use separate processes to materialize the asset and execute its check.
Together with assets - In this approach, checks execute in the same operation that materializes the asset.
Using an asset check factory - This approach allows you to define multiple, similar asset checks at once.
Loading dbt tests into Dagster - This approach allows you to model your dbt tests as asset checks.
In this example, we'll demonstrate how to define separate functions for an asset and the corresponding check.
The following code defines an asset named orders and an asset check named orders_id_has_no_nulls. When executed, the check verifies that the orders asset's primary key column contains no null values.
The @asset_check decorator decorates the orders_id_has_no_nulls function, which returns an AssetCheckResult object.
The orders_id_has_no_nulls check runs in its own op. That means that if you launch a run that uses the multiprocess_executor and includes both the orders asset and the orders_id_has_no_nulls check, the check will execute in a separate process from the process that materializes the asset.
Using @multi_asset_check, you can define multiple checks that execute in a single operation without an asset. This approach avoids the overhead of running a separate operation for every check and may enable reusing computations across checks.
from collections.abc import Iterable

from dagster import (
    AssetCheckExecutionContext,
    AssetCheckResult,
    AssetCheckSeverity,
    AssetCheckSpec,
    multi_asset_check,
)


@multi_asset_check(
    specs=[
        AssetCheckSpec(name="asset_check_one", asset="my_asset_one"),
        AssetCheckSpec(name="asset_check_two", asset="my_asset_two"),
    ]
)
def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]:
    # Each result names both the check and the asset that the check targets.
    yield AssetCheckResult(
        passed=False,
        severity=AssetCheckSeverity.WARN,
        description="The asset is over 0.5",
        check_name="asset_check_one",
        asset_key="my_asset_one",
    )

    yield AssetCheckResult(
        passed=True,
        description="The asset is fresh.",
        check_name="asset_check_two",
        asset_key="my_asset_two",
    )
Sometimes, it makes sense for a single function to materialize an asset and execute a check on it.
In this example, we'll demonstrate how to do this by using the check_specs argument. This argument is available when using the @asset or @multi_asset decorators. Each provided AssetCheckSpec declares a check that the decorated function should yield an AssetCheckResult for.
import pandas as pd

from dagster import (
    AssetCheckResult,
    AssetCheckSpec,
    AssetExecutionContext,
    Definitions,
    Output,
    asset,
)


@asset(check_specs=[AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders")])
def orders(context: AssetExecutionContext):
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})

    # save the output and indicate that it's been saved
    orders_df.to_csv("orders")
    yield Output(value=None)

    # check it
    num_null_order_ids = orders_df["order_id"].isna().sum()
    yield AssetCheckResult(
        passed=bool(num_null_order_ids == 0),
    )


defs = Definitions(assets=[orders])
To define multiple, similar asset checks, use a factory pattern. In the following example, the factory accepts a list of SQL statements and turns them into asset checks:
from collections.abc import Mapping, Sequence
from typing import Any
from unittest.mock import MagicMock

from dagster import (
    AssetCheckResult,
    AssetChecksDefinition,
    Definitions,
    asset,
    asset_check,
)


@asset
def orders(): ...


@asset
def items(): ...


def make_check(check_blob: Mapping[str, str]) -> AssetChecksDefinition:
    @asset_check(
        name=check_blob["name"],
        asset=check_blob["asset"],
        required_resource_keys={"db_connection"},
    )
    def _check(context):
        rows = context.resources.db_connection.execute(check_blob["sql"])
        return AssetCheckResult(passed=len(rows) == 0, metadata={"num_rows": len(rows)})

    return _check


check_blobs = [
    {
        "name": "orders_id_has_no_nulls",
        "asset": "orders",
        "sql": "select * from orders where order_id is null",
    },
    {
        "name": "items_id_has_no_nulls",
        "asset": "items",
        "sql": "select * from items where item_id is null",
    },
]

defs = Definitions(
    assets=[orders, items],
    asset_checks=[make_check(check_blob) for check_blob in check_blobs],
    resources={"db_connection": MagicMock()},
)
Using AssetCheckSeverity, you can define a severity on check results. The default severity is ERROR.
The severity determines how the check result will display in the UI. For example, if a check fails with ERROR severity, the asset will appear red in the lineage graph in the UI.
Including details about a check result can provide helpful context to others who view it in the UI. Using the metadata argument on AssetCheckResult, you can include information about why a check passed or failed.
In the following example, we added num_null_order_ids as metadata to the orders_id_has_no_nulls check:
To block downstream assets from executing when checks fail, set the blocking argument to True in the @asset_check decorator. In the following example, check_upstream_asset will block downstream_asset from executing.
Materializing an asset from the UI will also execute any checks defined for that asset. To execute a check without materializing the asset, use the Checks tab of the asset's details page.
To define jobs that execute sets of assets and checks, you can use define_asset_job and then trigger the jobs via sensors or schedules. By default, checks are included with the assets they check. You can also define jobs that include only checks, or only assets.
from dagster import (
    AssetSelection,
    Definitions,
    ScheduleDefinition,
    asset,
    asset_check,
    define_asset_job,
)


@asset
def my_asset(): ...


@asset_check(asset=my_asset)
def check_1(): ...


@asset_check(asset=my_asset)
def check_2(): ...


# includes my_asset and both checks
my_job = define_asset_job("my_job", selection=AssetSelection.assets(my_asset))

# includes only my_asset
my_asset_only_job = define_asset_job(
    "my_asset_only_job",
    selection=AssetSelection.assets(my_asset).without_checks(),
)

# includes check_1 and check_2, but not my_asset
checks_only_job = define_asset_job(
    "checks_only_job", selection=AssetSelection.checks_for_assets(my_asset)
)

# includes only check_1
check_1_job = define_asset_job("check_1_job", selection=AssetSelection.checks(check_1))

# schedule my_job to run every day at midnight
basic_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *")

defs = Definitions(
    assets=[my_asset],
    asset_checks=[check_1, check_2],
    jobs=[my_job, my_asset_only_job, checks_only_job, check_1_job],
    schedules=[basic_schedule],
)
In Dagster+, you can set up alerts to notify you when asset checks fail. To alert on failed checks, create an alert policy with the following settings:
Alert policy type - Asset alert
Target - The asset keys or groups you've defined checks for. You can also target all assets.
Events - Under Asset checks, check the box for the severity you've defined for failed checks: Failed (WARN) or Failed (ERROR).