A graph is a set of interconnected ops or sub-graphs. While individual ops typically perform simple tasks, ops can be assembled into a graph to accomplish complex tasks.
Graphs can be used in three different ways:
To back assets - Basic assets are computed using a single op, but if computing one of your assets requires multiple discrete steps, you can compute it using a graph instead.
In the following example, we return one output from the root op (return_one) and pass data along through single inputs and outputs:
from dagster import OpExecutionContext, graph, op
@opdefreturn_one(context: OpExecutionContext)->int:return1@opdefadd_one(context: OpExecutionContext, number:int)->int:return number +1@graphdeflinear():
add_one(add_one(add_one(return_one())))
A single output can be passed to multiple inputs on downstream ops. In this example, the output from the first op is passed to two different ops. The outputs of those ops are combined and passed to the final op:
from dagster import OpExecutionContext, graph, op
@opdefreturn_one(context: OpExecutionContext)->int:return1@opdefadd_one(context: OpExecutionContext, number:int):return number +1@opdefadder(context: OpExecutionContext, a:int, b:int)->int:return a + b
@graphdefinputs_and_outputs():
value = return_one()
a = add_one(value)
b = add_one(value)
adder(a, b)
As an op only starts to execute once all its inputs have been resolved, you can use this behavior to model conditional execution.
In this example, the branching_op outputs either the branch_1 result or branch_2 result. Since op execution is skipped for ops that have unresolved inputs, only one of the downstream ops will execute:
import random
from dagster import Out, Output, graph, op
@op(out={"branch_1": Out(is_required=False),"branch_2": Out(is_required=False)})defbranching_op():
num = random.randint(0,1)if num ==0:yield Output(1,"branch_1")else:yield Output(2,"branch_2")@opdefbranch_1_op(_input):pass@opdefbranch_2_op(_input):pass@graphdefbranching():
branch_1, branch_2 = branching_op()
branch_1_op(branch_1)
branch_2_op(branch_2)
Note: When using conditional branching, Output objects must be yielded instead of returned.
If you have a fixed set of ops that all return the same output type, you can collect the outputs into a list and pass them into a single downstream op.
The downstream op executes only if all of the outputs were successfully created by the upstream op:
from typing import List
from dagster import graph, op
@opdefreturn_one()->int:return1@opdefsum_fan_in(nums:list[int])->int:returnsum(nums)@graphdeffan_in():
fan_outs =[]for i inrange(0,10):
fan_outs.append(return_one.alias(f"return_one_{i}")())
sum_fan_in(fan_outs)
In this example, we have 10 ops that all output the number 1. The sum_fan_in op takes all of these outputs as a list and sums them.
Dependencies in Dagster are primarily data dependencies. Using data dependencies means each input of an op depends on the output of an upstream op.
If you have an op, say Op A, that does not depend on any outputs of another op, say Op B, there theoretically shouldn't be a reason for Op A to run after Op B. In most cases, these two ops should be parallelizable. However, there are some cases where an explicit ordering is required, but it doesn't make sense to pass data through inputs and outputs to model the dependency.
If you need to model an explicit ordering dependency, you can use the Nothing Dagster type on the input definition of the downstream op. This type specifies that you are passing "nothing" via Dagster between the ops, while still using inputs and outputs to model the dependency between the two ops.
from dagster import In, Nothing, graph, op
@opdefcreate_table_1():
get_database_connection().execute("create table_1 as select * from some_source_table")@op(ins={"start": In(Nothing)})defcreate_table_2():
get_database_connection().execute("create table_2 as select * from table_1")@graphdefnothing_dependency():
create_table_2(start=create_table_1())
In this example, create_table_2 has an input of type Nothing meaning that it doesn't expect any data to be provided by the upstream op. This lets us connect them in the graph definition so that create_table_2 executes only after create_table_1 successfully executes.
Nothing type inputs do not have a corresponding parameter in the function since there is no data to pass. When connecting the dependencies, it is recommended to use keyword args to prevent mix-ups with other positional inputs.
Note that in most cases, it is usually possible to pass some data dependency. In the example above, even though we probably don't want to pass the table data itself between the ops, we could pass table pointers. For example, create_table_1 could return a table_pointer output of type str with a value of table_1, and this table name can be used in create_table_2 to more accurately model the data dependency.
Dagster also provides more advanced abstractions to handle dependencies and IO. If you find that you are finding it difficult to model data dependencies when using external storage, check out IO managers.
You can supply an asset as an input to one of the ops in a graph. Dagster can then use the I/O manager on the asset to load the input value for the op.
from dagster import asset, job, op
@assetdefemails_to_send():...@opdefsend_emails(emails)->None:...@jobdefsend_emails_job():
send_emails(emails_to_send.get_asset_spec())
We must use the AssetsDefinition.get_asset_spec, because AssetSpecs are used to represent assets that other assets or jobs depend on, in settings where they won't be materialized themselves.
If the asset is partitioned, then:
If the job is partitioned, the corresponding partition of the asset will be loaded.
If the job is not partitioned, then all partitions of the asset will be loaded. The type that they will be loaded into depends on the I/O manager implementation.
You may run into a situation where you need to programmatically construct the dependencies for a graph. In that case, you can directly define the GraphDefinition object.
To construct a GraphDefinition, you need to pass the constructor a graph name, a list of op or graph definitions, and a dictionary defining the dependency structure. The dependency structure declares the dependencies of each op’s inputs on the outputs of other ops in the graph. The top-level keys of the dependency dictionary are the string names of ops or graphs. If you are using op aliases, be sure to use the aliased name. Values of the top-level keys are also dictionary, which maps input names to a DependencyDefinition.
Sometimes you may want to construct the dependencies of an op graph definition from a YAML file or similar. This is useful when migrating to Dagster from other workflow systems.
For example, you can have a YAML like this:
name: some_example
description: blah blah blah
ops:-def: add_one
alias: A
-def: add_one
alias: B
deps:num:op: A
-def: add_two
alias: C
deps:num:op: A
-def: subtract
deps:left:op: B
right:op: C
You can programmatically generate a GraphDefinition from this YAML:
import os
from dagster import DependencyDefinition, GraphDefinition, NodeInvocation, op
@opdefadd_one(num:int)->int:return num +1@opdefadd_two(num:int)->int:return num +2@opdefsubtract(left:int, right:int)->int:return left + right
defconstruct_graph_with_yaml(yaml_file, op_defs)-> GraphDefinition:
yaml_data = load_yaml_from_path(yaml_file)assertisinstance(yaml_data,dict)
deps ={}for op_yaml_data in yaml_data["ops"]:
def_name = op_yaml_data["def"]
alias = op_yaml_data.get("alias", def_name)
op_deps_entry ={}for input_name, input_data in op_yaml_data.get("deps",{}).items():
op_deps_entry[input_name]= DependencyDefinition(
node=input_data["op"], output=input_data.get("output","result"))
deps[NodeInvocation(name=def_name, alias=alias)]= op_deps_entry
return GraphDefinition(
name=yaml_data["name"],
description=yaml_data.get("description"),
node_defs=op_defs,
dependencies=deps,)defdefine_dep_dsl_graph()-> GraphDefinition:
path = os.path.join(os.path.dirname(__file__),"my_graph.yaml")return construct_graph_with_yaml(path,[add_one, add_two, subtract])
Op graphs can be used to create asset definitions. Graph-backed assets are useful if you have an existing op graph that produces and consumes assets.
Wrapping your graph inside an asset definition gives you all the benefits of software-defined assets — like cross-job lineage — without requiring you to change the code inside your graph. Refer to the graph-backed assets documentation for more info and examples.