Dagster Pipes is a toolkit for integrating Dagster with an arbitrary external compute environment. While many users will be well-served by the simplified interface offered by Pipes client objects (e.g. PipesSubprocessClient, PipesDatabricksClient), others will need a greater level of control over Pipes. This is particularly the case for users seeking to connect large existing codebases to Dagster.
This guide covers the lower-level Pipes APIs and how you can compose them to provide a custom solution for your data platform.
External environment
An environment external to Dagster, for example: Databricks, Kubernetes, Docker.
Orchestration process
A process running Dagster code to materialize an asset or execute an op.
External process
A process running in an external environment, from which log output and Dagster events can be reported back to the orchestration process. The orchestration process must launch the external process.
Bootstrap payload
A small bundle of key/value pairs that is written by the orchestration process to some globally accessible key-value store in the external process. Typically the bootstrap payload will be written in environment variables, but another mechanism may be used for external environments that do not support setting environment variables.
Context payload
A JSON object containing information derived from the execution context (OpExecutionContext or AssetExecutionContext) in the orchestration process. This includes in-scope asset keys, partition keys, etc. The context payload is written by the orchestration process to some location accessible to the external process. The external process obtains the location of the context payload (e.g. an object URL on Amazon S3) from the bootstrap payload and reads the context payload.
Messages
JSON objects written by the external process for consumption by the orchestration process. Messages can report asset materializations and check results as well as trigger orchestration-side logging.
Logs
Log files generated by the external process, including but not limited to logged stdout/stderr streams.
Params loader
An entity in the external process that reads the bootstrap payload from some globally accessible key-value store. The default params loader reads the bootstrap payload from environment variables.
Context injector
An entity in the orchestration process that writes the context payload to an externally accessible location and yields a set of parameters encoding this location for inclusion in the bootstrap payload.
Context loader
An entity in the external process that loads the context payload from the location specified in the bootstrap payload.
Message reader
An entity in the orchestration process that reads messages (and optionally log files) from an externally accessible location and yields a set of parameters encoding this location in the bootstrap payload.
Message writer
An entity in the external process that writes messages to the location specified in the bootstrap payload.
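The way these pieces fit together can be sketched without any Dagster imports. The following is a minimal simulation, not the dagster-pipes implementation: the variable names `DEMO_CONTEXT_PARAMS` and `DEMO_MESSAGES_PARAMS`, the message fields, and the three helper functions are all hypothetical and chosen only for illustration. It shows that the bootstrap payload carries locations only, while the context payload and messages travel through those locations.

```python
import json
import os
import tempfile


def orchestration_side(context_payload: dict, workdir: str) -> dict:
    """Play the context injector and message reader: write the context payload,
    reserve a messages file, and return the bootstrap payload."""
    context_path = os.path.join(workdir, "context.json")
    messages_path = os.path.join(workdir, "messages.jsonl")
    with open(context_path, "w") as f:
        json.dump(context_payload, f)
    open(messages_path, "w").close()
    # The bootstrap payload only carries *locations*, never the data itself.
    return {
        "DEMO_CONTEXT_PARAMS": json.dumps({"path": context_path}),
        "DEMO_MESSAGES_PARAMS": json.dumps({"path": messages_path}),
    }


def external_side(env: dict) -> None:
    """Play the params loader, context loader, and message writer."""
    context_params = json.loads(env["DEMO_CONTEXT_PARAMS"])  # params loader
    messages_params = json.loads(env["DEMO_MESSAGES_PARAMS"])
    with open(context_params["path"]) as f:  # context loader
        context = json.load(f)
    message = {"method": "log", "params": {"message": f"saw extras {context['extras']}"}}
    with open(messages_params["path"], "a") as f:  # message writer
        f.write(json.dumps(message) + "\n")


def read_messages(env: dict) -> list:
    """Orchestration side again: read back what the external process wrote."""
    path = json.loads(env["DEMO_MESSAGES_PARAMS"])["path"]
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]


with tempfile.TemporaryDirectory() as workdir:
    payload = {"asset_keys": ["some_pipes_asset"], "extras": {"foo": "bar"}}
    env = orchestration_side(payload, workdir)
    external_side(env)  # in practice this runs in a separate process
    messages = read_messages(env)
```

In a real deployment the two sides run in different processes, and the dictionary returned by `orchestration_side` would be passed as environment variables (or through another mechanism) when launching the external process.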
A Pipes session is the time spanning:
The creation of communications channels between the orchestration and external process.
The launching and terminating of the external process.
The reading of all messages reported by the external process and the closing of communications channels.
There are separate APIs for interacting with a Pipes session in the orchestration and external processes. The orchestration process API is defined in dagster. The external process API is defined by a Pipes integration library that is loaded by user code in the external process. This library knows how to interpret the bootstrap payload and spin up a context loader and message writer.
At present the only official Dagster Pipes integration library is Python’s dagster-pipes, available on PyPI. The library has no dependencies and fits in a single file, so it may also be trivially vendored.
Pipes sessions are represented in the orchestration process by the PipesSession class. A session is started with the open_pipes_session context manager, which yields a PipesSession. open_pipes_session should be called inside an asset or op compute function, where an OpExecutionContext or AssetExecutionContext is available:
### ORCHESTRATION PROCESS

from collections.abc import Iterator

# `third_party_api` is a fictional package representing a third-party library (or user code)
# providing APIs for launching and polling a process in some external environment.
from third_party_api import (
    is_external_process_done,
    launch_external_process,
)

from dagster import (
    AssetExecutionContext,
    PipesExecutionResult,
    PipesTempFileContextInjector,
    PipesTempFileMessageReader,
    asset,
    open_pipes_session,
)


@asset
def some_pipes_asset(context: AssetExecutionContext) -> Iterator[PipesExecutionResult]:
    with open_pipes_session(
        context=context,
        extras={"foo": "bar"},
        context_injector=PipesTempFileContextInjector(),
        message_reader=PipesTempFileMessageReader(),
    ) as pipes_session:
        # Get the bootstrap payload encoded as a Dict[str, str] suitable for passage as environment
        # variables.
        env_vars = pipes_session.get_bootstrap_env_vars()

        # `launch_external_process` is responsible for including the passed `env_vars` in the
        # launched external process.
        external_process = launch_external_process(env_vars)

        # Continually poll the external process and stream any incrementally received messages back
        # to Dagster
        while not is_external_process_done(external_process):
            yield from pipes_session.get_results()

    # Yield any remaining results received from the external process.
    yield from pipes_session.get_results()
extras: A bundle of key-value pairs in the form of a JSON-serializable dictionary. This is slotted into the context payload. Users can pass arbitrary data here that they want to expose to the external process.
context_injector: A context injector responsible for writing the serialized context payload to some location and expressing that location as bootstrap parameters for consumption by the external process. Above we used the built-in (and default) PipesTempFileContextInjector, which writes the serialized context payload to an automatically created local temp file and exposes the path to that file as a bootstrap parameter.
message_reader: A message reader responsible for reading streaming messages and log files written to some location, and expressing that location as bootstrap parameters for consumption by the external process. Above we used the built-in (and default) PipesTempFileMessageReader, which tails an automatically created local temp file and exposes the path to that file as a bootstrap parameter.
Python context manager invocations have three parts:
An opening routine (__enter__, executed at the start of a with block).
A body (user code nested in a with block).
A closing routine (__exit__, executed at the end of a with block).
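A minimal, Dagster-independent illustration of these three parts, using a hypothetical generator-based context manager named `demo_session`:

```python
from contextlib import contextmanager

events = []


@contextmanager
def demo_session():
    events.append("open")  # opening routine (__enter__)
    try:
        yield "session"  # control passes to the body of the `with` block
    finally:
        events.append("close")  # closing routine (__exit__), runs even on error


with demo_session() as session:
    events.append(f"body uses {session}")  # body
```

After the `with` block exits, `events` records the three parts in order: opening, body, closing.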
Opening routine: Writes the context payload and spins up the message reader (which usually involves starting a thread to continually read messages). These steps may involve the creation of resources, such as a temporary file (locally or on some remote system) for the context payload or a temporary directory to which messages will be written.
Body: User code should handle launching, polling, and termination of the external process here. While the external process is executing, any intermediate results that have been received can be reported to Dagster with yield from pipes_session.get_results().
Closing routine: Ensures that all messages written by the external process have been read into the orchestration process and cleans up any resources used by the context injector and message reader.
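To see why results should be collected once more after the with block exits, consider the following simplified simulation. It is a sketch only: `DemoSession`, `open_demo_session`, and the in-memory `source` list are hypothetical stand-ins for a Pipes session, its context manager, and the external message location, and they do not reflect how dagster implements its message readers.

```python
import queue
import threading
import time
from contextlib import contextmanager


class DemoSession:
    def __init__(self) -> None:
        self._results: "queue.Queue[str]" = queue.Queue()

    def get_results(self) -> list:
        """Return whatever has been received since the last call."""
        drained = []
        while True:
            try:
                drained.append(self._results.get_nowait())
            except queue.Empty:
                return drained


@contextmanager
def open_demo_session(source: list):
    session = DemoSession()
    stop = threading.Event()

    def read_messages() -> None:
        # Reader thread: move messages from `source` into the session.
        while True:
            stopping = stop.is_set()  # check first, so one final drain always happens
            while source:
                session._results.put(source.pop(0))
            if stopping:
                return
            time.sleep(0.01)

    reader = threading.Thread(target=read_messages)
    reader.start()  # opening routine: spin up the reader
    try:
        yield session  # body runs here
    finally:
        stop.set()  # closing routine: read remaining messages, then clean up
        reader.join()


source = ["m1"]
collected = []
with open_demo_session(source) as session:
    time.sleep(0.05)
    collected += session.get_results()
    source.append("m2")  # arrives just as the body finishes
# The closing routine has run, so the late message is now available.
collected += session.get_results()
```

The message that arrives at the very end of the body is only guaranteed to be picked up by the closing routine, which is why the final `get_results()` call happens after the block.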
As noted above, the only existing Pipes integration library is currently Python’s dagster-pipes, so the example below uses Python and dagster-pipes. In the future we will release dagster-pipes equivalents for selected other languages, and the concepts illustrated here should map straightforwardly to those integration libraries.
A Pipes session is represented in the external process by a PipesContext object. A session created by the launching orchestration process can be connected to with open_dagster_pipes from dagster-pipes:
### EXTERNAL PROCESS

from dagster_pipes import (
    PipesDefaultContextLoader,
    PipesDefaultMessageWriter,
    PipesEnvVarParamsLoader,
    open_dagster_pipes,
)

# `user_code` is a fictional package providing pre-existing business logic for assets.
from user_code import get_data_version, get_metric

with open_dagster_pipes(
    params_loader=PipesEnvVarParamsLoader(),
    context_loader=PipesDefaultContextLoader(),
    message_writer=PipesDefaultMessageWriter(),
) as pipes:
    # Equivalent of calling `context.log.info` on the orchestration side.
    # Streams log message back to orchestration process.
    pipes.log.info(f"materializing asset {pipes.asset_key}")

    # ... business logic

    # Creates a `MaterializeResult` on the orchestration side. Notice no value for the asset is
    # included. Pipes only supports reporting that a materialization occurred and associated
    # metadata.
    pipes.report_asset_materialization(
        metadata={"some_metric": {"raw_value": get_metric(), "type": "text"}},
        data_version=get_data_version(),
    )
params_loader: A params loader responsible for loading the bootstrap payload injected into the external process at launch. The standard approach is to inject the bootstrap payload into predetermined environment variables that the PipesEnvVarParamsLoader knows how to read. However, a different params loader can be substituted in environments where it is not possible to modify environment variables.
context_loader: A context loader responsible for loading the context payload from a location specified in the bootstrap payload. Above we use PipesDefaultContextLoader, which will look for a path key in the bootstrap params for a file path to target. The PipesTempFileContextInjector used earlier on the orchestration side writes this path key, but the PipesDefaultContextLoader does not otherwise depend on a specific context injector.
message_writer: A message writer responsible for writing streaming messages to a location specified in the bootstrap payload. Above we use PipesDefaultMessageWriter, which will look for a path key in the bootstrap params for a file path to target. The PipesTempFileMessageReader used earlier on the orchestration side writes this path key, but the PipesDefaultMessageWriter does not otherwise depend on a specific message reader.
Opening routine: Reads the bootstrap payload from the environment and then the context payload. Spins up the message writer, which may involve starting a thread to periodically write buffered messages.
Body: Business logic goes here, and can use the yielded PipesContext (in the pipes variable above) to read context information or write messages.
Closing routine: Ensures that any messages submitted by business logic have been written before the process exits. This is necessary because some message writers buffer messages between writes.
Users may implement custom params loaders, context loader/injector pairs, and message reader/writer pairs. Any of the above may be necessary if you’d like to use Dagster Pipes in an environment for which Dagster does not currently ship a compatible context loader/injector or message reader/writer.
Params loaders need to inherit from PipesParamsLoader. Here is an example that loads parameters from an object called METADATA imported from a fictional package called cloud_service. It is assumed that "cloud service" represents some compute platform, that the cloud_service package is available in the environment, and that the API for launching processes in “cloud service” allows you to set arbitrary key-value pairs in a payload that is exposed as cloud_service.METADATA.
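A minimal sketch of such a params loader follows. Several things here are assumptions rather than confirmed API: the three method names mirror what we understand PipesParamsLoader to require and should be checked against your installed dagster-pipes version; the metadata keys `pipes_context` and `pipes_messages` are hypothetical; and `cloud_service` is replaced by an in-memory stand-in so the sketch runs on its own. For the same reason the class does not inherit from PipesParamsLoader here, although a real implementation would.

```python
from collections.abc import Mapping
from types import SimpleNamespace
from typing import Any

# Stand-in for the fictional `cloud_service` package so the sketch runs on its own.
# In a real deployment this would be `import cloud_service`.
cloud_service = SimpleNamespace(
    METADATA={
        "pipes_context": {"key": "abc123"},
        "pipes_messages": {"key_prefix": "xyz789"},
    }
)

# Hypothetical metadata keys chosen for this sketch.
CONTEXT_KEY = "pipes_context"
MESSAGES_KEY = "pipes_messages"


class MyCustomParamsLoader:  # a real loader would subclass dagster_pipes.PipesParamsLoader
    def is_dagster_pipes_process(self) -> bool:
        # Only treat the process as Pipes-launched if the payload is present.
        return CONTEXT_KEY in cloud_service.METADATA

    def load_context_params(self) -> Mapping[str, Any]:
        # Parameters telling the context loader where the context payload lives.
        return cloud_service.METADATA[CONTEXT_KEY]

    def load_messages_params(self) -> Mapping[str, Any]:
        # Parameters telling the message writer where to write messages.
        return cloud_service.METADATA[MESSAGES_KEY]


loader = MyCustomParamsLoader()
```

An instance of the loader would then be passed as the params_loader argument of open_dagster_pipes in the external process.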
In general, if you are implementing a custom context injector or context loader, you will want to implement a matching variant of the other. Below is a simple example that uses a fictional cloud_service key/value store to write the context. First, the context injector:
### ORCHESTRATION PROCESS

import json
import random
import string
from collections.abc import Iterator
from contextlib import contextmanager

import cloud_service
from dagster_pipes import PipesContextData, PipesParams

from dagster import PipesContextInjector


class MyCustomCloudServiceContextInjector(PipesContextInjector):
    # Note that `PipesContextData` corresponds to what this document
    # calls the "context payload"-- a JSON-serializable dictionary with context info.
    @contextmanager
    def inject_context(self, context_data: "PipesContextData") -> Iterator[PipesParams]:
        key = "".join(random.choices(string.ascii_letters, k=30))
        cloud_service.write(key, json.dumps(context_data))
        yield {"key": key}

    def no_messages_debug_text(self) -> str:
        return (
            "Attempted to inject context using a `cloud_service`. Expected"
            " `MyCustomCloudServiceContextLoader` to be explicitly passed to `open_dagster_pipes`"
            " in the external process."
        )
And the context loader:
### EXTERNAL PROCESS

import json
from collections.abc import Iterator
from contextlib import contextmanager

import cloud_service
from dagster_pipes import PipesContextData, PipesContextLoader, PipesParams


class MyCustomCloudServiceContextLoader(PipesContextLoader):
    @contextmanager
    def load_context(self, params: PipesParams) -> Iterator[PipesContextData]:
        # params were yielded by the above context injector and sourced from the bootstrap payload
        key = params["key"]
        data = cloud_service.read(key)
        yield json.loads(data)
The message reader/writer is responsible for handling log files written by the external process as well as messages. However, the APIs for customizing log file handling are still in flux, so they are not covered in this guide. We will update the guide with instructions for customizing log handling as soon as these questions are resolved.
In general, if you are implementing a custom message reader or message writer, you will want to implement a matching variant of the other. Furthermore, message writers internally create a PipesMessageWriterChannel subcomponent, for which you will likely also need to implement a custom variant; see below for details.
Below is a simple example that uses a fictional cloud_service key/value store as a storage layer for message chunks. This example is a little more sophisticated than the context injector/loader example because we are going to inherit from PipesBlobStoreMessageReader and PipesBlobStoreMessageWriter instead of the plain abstract base classes. The blob store reader/writer provide infrastructure for chunking messages. Messages are buffered on the writer and uploaded in chunks at a fixed interval (defaulting to 10 seconds). The reader similarly attempts to download message chunks at a fixed interval (defaulting to 10 seconds). This prevents the need to read/write a cloud service blob store for every message (which could get expensive).
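The chunking behavior described above can be illustrated with a small, dependency-free simulation. This is a sketch under stated assumptions, not the dagster-pipes implementation: `ChunkingWriter`, `ChunkingReader`, the in-memory `blob_store` dictionary, the key layout, and the starting chunk index of 1 are all hypothetical, and timers are replaced by explicit `flush` and `poll` calls.

```python
import json
from typing import Dict, List, Optional

blob_store: Dict[str, str] = {}  # in-memory stand-in for a cloud blob store


class ChunkingWriter:
    """Buffers messages and uploads them as numbered chunks."""

    def __init__(self, key_prefix: str) -> None:
        self.key_prefix = key_prefix
        self.buffer: List[dict] = []
        self.index = 1

    def write(self, message: dict) -> None:
        self.buffer.append(message)  # no blob store access per message

    def flush(self) -> None:
        # A real writer would call this on a timer and once more at close.
        if not self.buffer:
            return
        payload = "\n".join(json.dumps(m) for m in self.buffer)
        blob_store[f"{self.key_prefix}/{self.index}.json"] = payload
        self.buffer.clear()
        self.index += 1


class ChunkingReader:
    """Polls for the next chunk index and decodes the messages in it."""

    def __init__(self, key_prefix: str) -> None:
        self.key_prefix = key_prefix
        self.index = 1

    def poll(self) -> List[dict]:
        chunk: Optional[str] = blob_store.get(f"{self.key_prefix}/{self.index}.json")
        if chunk is None:
            return []  # nothing uploaded yet; try again on the next interval
        self.index += 1
        return [json.loads(line) for line in chunk.split("\n")]


writer, reader = ChunkingWriter("demo"), ChunkingReader("demo")
writer.write({"method": "log", "params": {"message": "a"}})
writer.write({"method": "log", "params": {"message": "b"}})
before_flush = reader.poll()  # buffered messages are not visible yet
writer.flush()
after_flush = reader.poll()  # both messages arrive in a single chunk
```

Two messages cost one blob store write and one successful read, at the price of some latency between a message being written and being seen by the reader.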
First, the message reader:
### ORCHESTRATION PROCESS

import random
import string
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Optional

import cloud_service
from dagster_pipes import PipesParams

from dagster import PipesBlobStoreMessageReader


class MyCustomCloudServiceMessageReader(PipesBlobStoreMessageReader):
    @contextmanager
    def get_params(self) -> Iterator[PipesParams]:
        # generate a random key prefix to write message chunks under on the cloud service
        key_prefix = "".join(random.choices(string.ascii_letters, k=30))
        yield {"key_prefix": key_prefix}

    def download_messages_chunk(self, index: int, params: PipesParams) -> Optional[str]:
        # Build the same key that the message writer uploads to: "<key_prefix>/<index>.json"
        message_path = f"{params['key_prefix']}/{index}.json"
        raw_message = cloud_service.read(message_path)
        return raw_message

    def no_messages_debug_text(self) -> str:
        return (
            "Attempted to read messages from a `cloud_service`. Expected"
            " MyCustomCloudServiceMessageWriter to be explicitly passed to `open_dagster_pipes` in"
            " the external process."
        )
And the message writer:
### EXTERNAL PROCESS

from typing import IO

import cloud_service
from dagster_pipes import (
    PipesBlobStoreMessageWriter,
    PipesBlobStoreMessageWriterChannel,
    PipesParams,
)


class MyCustomCloudServiceMessageWriter(PipesBlobStoreMessageWriter):
    def make_channel(
        self, params: PipesParams
    ) -> "MyCustomCloudServiceMessageWriterChannel":
        # params were yielded by the above message reader and sourced from the bootstrap payload
        key_prefix = params["key_prefix"]
        return MyCustomCloudServiceMessageWriterChannel(key_prefix=key_prefix)


class MyCustomCloudServiceMessageWriterChannel(PipesBlobStoreMessageWriterChannel):
    def __init__(self, key_prefix: str):
        super().__init__()
        self.key_prefix = key_prefix

    # This will be called periodically to upload any buffered messages
    def upload_messages_chunk(self, payload: IO, index: int) -> None:
        key = f"{self.key_prefix}/{index}.json"
        # `payload` already holds the serialized messages, so it is written without
        # re-encoding; the reader above returns the stored text unchanged.
        cloud_service.write(key, payload.read())