Ask AI

You are viewing an unreleased or outdated version of the documentation

GCP (dagster-gcp)

BigQuery

Related Guides:

BigQuery Resource

dagster_gcp.BigQueryResource ResourceDefinition[source]

Config Schema:
project (Union[dagster.StringSource, None], optional):

Project ID for the project which the client acts on behalf of. Will be passed when creating a dataset / job. If not passed, falls back to the default inferred from the environment.

Default Value: None

location (Union[dagster.StringSource, None], optional):

Default location for jobs / datasets / tables.

Default Value: None

gcp_credentials (Union[dagster.StringSource, None], optional):

GCP authentication credentials. If provided, a temporary file will be created with the credentials and GOOGLE_APPLICATION_CREDENTIALS will be set to the temporary file. To avoid issues with newlines in the keys, you must base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_AUTH_CREDENTIALS | base64

Default Value: None

Resource for interacting with Google BigQuery.

Examples

from dagster import Definitions, asset
from dagster_gcp import BigQueryResource

@asset
def my_table(bigquery: BigQueryResource):
    with bigquery.get_client() as client:
        client.query("SELECT * FROM my_dataset.my_table")

defs = Definitions(
    assets=[my_table],
    resources={
        "bigquery": BigQueryResource(project="my-project")
    }
)

BigQuery I/O Manager

dagster_gcp.BigQueryIOManager IOManagerDefinition[source]

Config Schema:
project (dagster.StringSource):

The GCP project to use.

dataset (Union[dagster.StringSource, None], optional):

Name of the BigQuery dataset to use. If not provided, the last prefix before the asset name will be used.

Default Value: None

location (Union[dagster.StringSource, None], optional):

The GCP location. Note: When using PySpark DataFrames, the default location of the project will be used. A custom location can be specified in your SparkSession configuration.

Default Value: None

gcp_credentials (Union[dagster.StringSource, None], optional):

GCP authentication credentials. If provided, a temporary file will be created with the credentials and GOOGLE_APPLICATION_CREDENTIALS will be set to the temporary file. To avoid issues with newlines in the keys, you must base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_AUTH_CREDENTIALS | base64

Default Value: None

temporary_gcs_bucket (Union[dagster.StringSource, None], optional):

When using PySpark DataFrames, optionally specify a temporary GCS bucket to store data. If not provided, data will be directly written to BigQuery.

Default Value: None

timeout (Union[Float, None], optional):

When using Pandas DataFrames, optionally specify a timeout for the BigQuery queries (loading and reading from tables).

Default Value: None

Base class for an I/O manager definition that reads inputs from and writes outputs to BigQuery.

Examples

from dagster_gcp import BigQueryIOManager
from dagster_bigquery_pandas import BigQueryPandasTypeHandler
from dagster import Definitions, EnvVar

class MyBigQueryIOManager(BigQueryIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [BigQueryPandasTypeHandler()]

@asset(
    key_prefix=["my_dataset"]  # my_dataset will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MyBigQueryIOManager(project=EnvVar("GCP_PROJECT"))
    }
)

You can set a default dataset to store the assets using the dataset configuration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
            "io_manager": MyBigQueryIOManager(project=EnvVar("GCP_PROJECT"), dataset="my_dataset")
        }
)

On individual assets, you an also specify the dataset where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    # note that the key needs to be "schema"
    metadata={"schema": "my_dataset"}  # will be used as the dataset in BigQuery
)
def my_other_table() -> pd.DataFrame:
    ...

For ops, the dataset can be specified by including a “schema” entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

If none of these is provided, the dataset will default to “public”.

To only use specific columns of a table as input to a downstream op or asset, add the metadata columns to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...

If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the gcp_credentials configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64

BigQuery Ops

dagster_gcp.bq_create_dataset(context)[source]

BigQuery Create Dataset.

This op encapsulates creating a BigQuery dataset.

Expects a BQ client to be provisioned in resources as context.resources.bigquery.

dagster_gcp.bq_delete_dataset(context)[source]

BigQuery Delete Dataset.

This op encapsulates deleting a BigQuery dataset.

Expects a BQ client to be provisioned in resources as context.resources.bigquery.

dagster_gcp.bq_op_for_queries(sql_queries)[source]

Executes BigQuery SQL queries.

Expects a BQ client to be provisioned in resources as context.resources.bigquery.

dagster_gcp.import_df_to_bq(context, df)[source]
dagster_gcp.import_file_to_bq(context, path)[source]
dagster_gcp.import_gcs_paths_to_bq(context, paths)[source]

Data Freshness

dagster_gcp.fetch_last_updated_timestamps(*, client, dataset_id, table_ids)[source]

Get the last updated timestamps of a list BigQuery table.

Note that this only works on BigQuery tables, and not views.

Parameters:
  • client (bigquery.Client) – The BigQuery client.

  • dataset_id (str) – The BigQuery dataset ID.

  • table_ids (Sequence[str]) – The table IDs to get the last updated timestamp for.

Returns:

A mapping of table IDs to their last updated timestamps (UTC).

Return type:

Mapping[str, datetime]

Other

class dagster_gcp.BigQueryError[source]

GCS

GCS Resource

dagster_gcp.GCSResource ResourceDefinition[source]

Config Schema:
project (Union[dagster.StringSource, None], optional):

Project name

Default Value: None

Resource for interacting with Google Cloud Storage.

Example

@asset
def my_asset(gcs: GCSResource):
    with gcs.get_client() as client:
        # client is a google.cloud.storage.Client
        ...

GCS I/O Manager

dagster_gcp.GCSPickleIOManager IOManagerDefinition[source]

Config Schema:
gcs_bucket (dagster.StringSource):

GCS bucket to store files

gcs_prefix (Union[dagster.StringSource, None], optional):

Prefix to add to all file paths

Default Value: ‘dagster’

Persistent IO manager using GCS for storage.

Serializes objects via pickling. Suitable for objects storage for distributed executors, so long as each execution node has network connectivity and credentials for GCS and the backing bucket.

Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at <base_dir>/<asset_key>. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.

Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of /my/base/path, an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called three in a directory with path /my/base/path/one/two/.

Example usage:

  1. Attach this IO manager to a set of assets.

from dagster import asset, Definitions
from dagster_gcp.gcs import GCSPickleIOManager, GCSResource

@asset
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    return asset1[:5]

defs = Definitions(
    assets=[asset1, asset2],
    resources={
        "io_manager": GCSPickleIOManager(
            gcs_bucket="my-cool-bucket",
            gcs_prefix="my-cool-prefix",
            gcs=GCSResource(project="my-cool-project")
        ),

    }
)
  1. Attach this IO manager to your job to make it available to your ops.

from dagster import job
from dagster_gcp.gcs import GCSPickleIOManager, GCSResource

@job(
    resource_defs={
        "io_manager": GCSPickleIOManager(
            gcs=GCSResource(project="my-cool-project")
            gcs_bucket="my-cool-bucket",
            gcs_prefix="my-cool-prefix"
        ),
    }
)
def my_job():
    ...

GCS Sensor

dagster_gcp.gcs.sensor.get_gcs_keys(bucket, prefix=None, since_key=None, gcs_session=None)[source]

Return a list of updated keys in a GCS bucket.

Parameters:
  • bucket (str) – The name of the GCS bucket.

  • prefix (Optional[str]) – The prefix to filter the keys by.

  • since_key (Optional[str]) – The key to start from. If provided, only keys updated after this key will be returned.

  • gcs_session (Optional[google.cloud.storage.client.Client]) – A GCS client session. If not provided, a new session will be created.

Returns:

A list of keys in the bucket, sorted by update time, that are newer than the since_key.

Return type:

List[str]

Example

@resource
def google_cloud_storage_client(context):
    return storage.Client().from_service_account_json("my-service-account.json")

@sensor(job=my_job, required_resource_keys={"google_cloud_storage_client"})
def my_gcs_sensor(context):
    since_key = context.cursor or None
    new_gcs_keys = get_gcs_keys(
        "my-bucket",
        prefix="data",
        since_key=since_key,
        gcs_session=context.resources.google_cloud_storage_client
    )

    if not new_gcs_keys:
        return SkipReason("No new gcs files found for bucket 'my-bucket'.")

    for gcs_key in new_gcs_keys:
        yield RunRequest(run_key=gcs_key, run_config={
            "ops": {
                "gcs_files": {
                    "config": {
                        "gcs_key": gcs_key
                    }
                }
            }
        })

    last_key = new_gcs_keys[-1]
    context.update_cursor(last_key)

File Manager (Experimental)

class dagster_gcp.GCSFileHandle(gcs_bucket, gcs_key)[source]

A reference to a file on GCS.

dagster_gcp.GCSFileManagerResource ResourceDefinition[source]

Config Schema:
project (Union[dagster.StringSource, None], optional):

Project name

Default Value: None

gcs_bucket (dagster.StringSource):

GCS bucket to store files

gcs_prefix (Union[dagster.StringSource, None], optional):

Prefix to add to all file paths

Default Value: ‘dagster’

FileManager that provides abstract access to GCS.

GCS Compute Log Manager

class dagster_gcp.gcs.GCSComputeLogManager(bucket, local_dir=None, inst_data=None, prefix='dagster', json_credentials_envvar=None, upload_interval=None, show_url_only=False)[source]

Logs op compute function stdout and stderr to GCS.

Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml such as the following:

compute_logs:
  module: dagster_gcp.gcs.compute_log_manager
  class: GCSComputeLogManager
  config:
    bucket: "mycorp-dagster-compute-logs"
    local_dir: "/tmp/cool"
    prefix: "dagster-test-"
    upload_interval: 30

There are more configuration examples in the instance documentation guide: https://docs.dagster.io/deployment/dagster-instance#compute-log-storage

Parameters:
  • bucket (str) – The name of the GCS bucket to which to log.

  • local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default: dagster._seven.get_system_temp_directory().

  • prefix (Optional[str]) – Prefix for the log file keys.

  • json_credentials_envvar (Optional[str]) – Environment variable that contains the JSON with a private key and other credentials information. If this is set, GOOGLE_APPLICATION_CREDENTIALS will be ignored. Can be used when the private key cannot be used as a file.

  • upload_interval – (Optional[int]): Interval in seconds to upload partial log files to GCS. By default, will only upload when the capture is complete.

  • show_url_only – (Optional[bool]): Only show the URL of the log file in the UI, instead of fetching and displaying the full content. Default False.

  • inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute log manager when instantiated from config.

Dataproc

Dataproc Resource

dagster_gcp.DataprocResource ResourceDefinition[source]

Config Schema:
project_id (dagster.StringSource):

Required. Project ID for the project which the client acts on behalf of. Will be passed when creating a dataset/job.

region (dagster.StringSource):

The GCP region.

cluster_name (dagster.StringSource):

Required. The cluster name. Cluster names within a project must be unique. Names of deleted clusters can be reused.

labels (Union[dict, None], optional):

Optional. The labels to associate with this cluster. Label keys must contain 1 to 63 characters, and must conform to RFC 1035 (https://www.ietf.org/rfc/rfc1035.txt). Label values may be empty, but, if present, must contain 1 to 63 characters, and must conform to RFC 1035 (https://www.ietf.org/rfc/rfc1035.txt). No more than 32 labels can be associated with a cluster.

Default Value: None

cluster_config_yaml_path (Union[dagster.StringSource, None], optional):

Full path to a YAML file containing cluster configuration. See https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig for configuration options. Only one of cluster_config_yaml_path, cluster_config_json_path, or cluster_config_dict may be provided.

Default Value: None

cluster_config_json_path (Union[dagster.StringSource, None], optional):

Full path to a JSON file containing cluster configuration. See https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig for configuration options. Only one of cluster_config_yaml_path, cluster_config_json_path, or cluster_config_dict may be provided.

Default Value: None

cluster_config_dict (Union[dict, None], optional):

Python dictionary containing cluster configuration. See https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig for configuration options. Only one of cluster_config_yaml_path, cluster_config_json_path, or cluster_config_dict may be provided.

Default Value: None

Resource for connecting to a Dataproc cluster.

Example

@asset
def my_asset(dataproc: DataprocResource):
    with dataproc.get_client() as client:
        # client is a dagster_gcp.DataprocClient
        ...

Dataproc Ops

dagster_gcp.dataproc_op = <dagster._core.definitions.op_definition.OpDefinition object>[source]

Config Schema:
job_timeout_in_seconds (Int, optional):

Optional. Maximum time in seconds to wait for the job being completed. Default is set to 1200 seconds (20 minutes).

Default Value: 1200

job_config (strict dict):
Config Schema:
job (strict dict, optional):

A Cloud Dataproc job resource.

Config Schema:
status (strict dict, optional):

Cloud Dataproc job status.

placement (strict dict, optional):

Cloud Dataproc job config.

Config Schema:
clusterName (String, optional):

Required. The name of the cluster where the job will be submitted.

scheduling (strict dict, optional):

Job scheduling options.

Config Schema:
maxFailuresPerHour (Int, optional):

Optional. Maximum number of times per hour a driver may be restarted as a result of driver terminating with non-zero code before job is reported failed.A job may be reported as thrashing if driver exits with non-zero code 4 times within 10 minute window.Maximum value is 10.

pigJob (strict dict, optional):

A Cloud Dataproc job for running Apache Pig (https://pig.apache.org/) queries on YARN.

Config Schema:
queryFileUri (String, optional):

The HCFS URI of the script that contains the Pig queries.

queryList (strict dict, optional):

A list of queries to run on a cluster.

Config Schema:
queries (List[String], optional):

Required. The queries to execute. You do not need to terminate a query with a semicolon. Multiple queries can be specified in one string by separating each with a semicolon. Here is an example of an Cloud Dataproc API snippet that uses a QueryList to specify a HiveJob: “hiveJob”: { “queryList”: { “queries”: [ “query1”, “query2”, “query3;query4”, ] } }

jarFileUris (List[String], optional):

Optional. HCFS URIs of jar files to add to the CLASSPATH of the Pig Client and Hadoop MapReduce (MR) tasks. Can contain Pig UDFs.

scriptVariables (permissive dict, optional):

Optional. Mapping of query variable names to values (equivalent to the Pig command: name=[value]).

loggingConfig (strict dict, optional):

The runtime logging config of the job.

Config Schema:
driverLogLevels (permissive dict, optional):

The per-package log levels for the driver. This may include “root” package name to configure rootLogger. Examples: ‘com.google = FATAL’, ‘root = INFO’, ‘org.apache = DEBUG’

properties (permissive dict, optional):

Optional. A mapping of property names to values, used to configure Pig. Properties that conflict with values set by the Cloud Dataproc API may be overwritten. Can include properties set in /etc/hadoop/conf/*-site.xml, /etc/pig/conf/pig.properties, and classes in user code.

continueOnFailure (Bool, optional):

Optional. Whether to continue executing queries if a query fails. The default value is false. Setting to true can be useful when executing independent parallel queries.

hiveJob (strict dict, optional):

A Cloud Dataproc job for running Apache Hive (https://hive.apache.org/) queries on YARN.

Config Schema:
continueOnFailure (Bool, optional):

Optional. Whether to continue executing queries if a query fails. The default value is false. Setting to true can be useful when executing independent parallel queries.

queryFileUri (String, optional):

The HCFS URI of the script that contains Hive queries.

queryList (strict dict, optional):

A list of queries to run on a cluster.

Config Schema:
queries (List[String], optional):

Required. The queries to execute. You do not need to terminate a query with a semicolon. Multiple queries can be specified in one string by separating each with a semicolon. Here is an example of an Cloud Dataproc API snippet that uses a QueryList to specify a HiveJob: “hiveJob”: { “queryList”: { “queries”: [ “query1”, “query2”, “query3;query4”, ] } }

jarFileUris (List[String], optional):

Optional. HCFS URIs of jar files to add to the CLASSPATH of the Hive server and Hadoop MapReduce (MR) tasks. Can contain Hive SerDes and UDFs.

scriptVariables (permissive dict, optional):

Optional. Mapping of query variable names to values (equivalent to the Hive command: SET name=”value”;).

properties (permissive dict, optional):

Optional. A mapping of property names and values, used to configure Hive. Properties that conflict with values set by the Cloud Dataproc API may be overwritten. Can include properties set in /etc/hadoop/conf/*-site.xml, /etc/hive/conf/hive-site.xml, and classes in user code.

labels (permissive dict, optional):

Optional. The labels to associate with this job. Label keys must contain 1 to 63 characters, and must conform to RFC 1035 (https://www.ietf.org/rfc/rfc1035.txt). Label values may be empty, but, if present, must contain 1 to 63 characters, and must conform to RFC 1035 (https://www.ietf.org/rfc/rfc1035.txt). No more than 32 labels can be associated with a job.

sparkJob (strict dict, optional):

A Cloud Dataproc job for running Apache Spark (http://spark.apache.org/) applications on YARN.

Config Schema:
archiveUris (List[String], optional):

Optional. HCFS URIs of archives to be extracted in the working directory of Spark drivers and tasks. Supported file types: .jar, .tar, .tar.gz, .tgz, and .zip.

mainJarFileUri (String, optional):

The HCFS URI of the jar file that contains the main class.

jarFileUris (List[String], optional):

Optional. HCFS URIs of jar files to add to the CLASSPATHs of the Spark driver and tasks.

loggingConfig (strict dict, optional):

The runtime logging config of the job.

Config Schema:
driverLogLevels (permissive dict, optional):

The per-package log levels for the driver. This may include “root” package name to configure rootLogger. Examples: ‘com.google = FATAL’, ‘root = INFO’, ‘org.apache = DEBUG’

properties (permissive dict, optional):

Optional. A mapping of property names to values, used to configure Spark. Properties that conflict with values set by the Cloud Dataproc API may be overwritten. Can include properties set in /etc/spark/conf/spark-defaults.conf and classes in user code.

args (List[String], optional):

Optional. The arguments to pass to the driver. Do not include arguments, such as –conf, that can be set as job properties, since a collision may occur that causes an incorrect job submission.

fileUris (List[String], optional):

Optional. HCFS URIs of files to be copied to the working directory of Spark drivers and distributed tasks. Useful for naively parallel tasks.

mainClass (String, optional):

The name of the driver’s main class. The jar file that contains the class must be in the default CLASSPATH or specified in jar_file_uris.

sparkSqlJob (strict dict, optional):

A Cloud Dataproc job for running Apache Spark SQL (http://spark.apache.org/sql/) queries.

Config Schema:
queryList (strict dict, optional):

A list of queries to run on a cluster.

Config Schema:
queries (List[String], optional):

Required. The queries to execute. You do not need to terminate a query with a semicolon. Multiple queries can be specified in one string by separating each with a semicolon. Here is an example of an Cloud Dataproc API snippet that uses a QueryList to specify a HiveJob: “hiveJob”: { “queryList”: { “queries”: [ “query1”, “query2”, “query3;query4”, ] } }

queryFileUri (String, optional):

The HCFS URI of the script that contains SQL queries.

scriptVariables (permissive dict, optional):

Optional. Mapping of query variable names to values (equivalent to the Spark SQL command: SET name=”value”;).

jarFileUris (List[String], optional):

Optional. HCFS URIs of jar files to be added to the Spark CLASSPATH.

loggingConfig (strict dict, optional):

The runtime logging config of the job.

Config Schema:
driverLogLevels (permissive dict, optional):

The per-package log levels for the driver. This may include “root” package name to configure rootLogger. Examples: ‘com.google = FATAL’, ‘root = INFO’, ‘org.apache = DEBUG’

properties (permissive dict, optional):

Optional. A mapping of property names to values, used to configure Spark SQL’s SparkConf. Properties that conflict with values set by the Cloud Dataproc API may be overwritten.

pysparkJob (strict dict, optional):

A Cloud Dataproc job for running Apache PySpark (https://spark.apache.org/docs/0.9.0/python-programming-guide.html) applications on YARN.

Config Schema:
jarFileUris (List[String], optional):

Optional. HCFS URIs of jar files to add to the CLASSPATHs of the Python driver and tasks.

loggingConfig (strict dict, optional):

The runtime logging config of the job.

Config Schema:
driverLogLevels (permissive dict, optional):

The per-package log levels for the driver. This may include “root” package name to configure rootLogger. Examples: ‘com.google = FATAL’, ‘root = INFO’, ‘org.apache = DEBUG’

properties (permissive dict, optional):

Optional. A mapping of property names to values, used to configure PySpark. Properties that conflict with values set by the Cloud Dataproc API may be overwritten. Can include properties set in /etc/spark/conf/spark-defaults.conf and classes in user code.

args (List[String], optional):

Optional. The arguments to pass to the driver. Do not include arguments, such as –conf, that can be set as job properties, since a collision may occur that causes an incorrect job submission.

fileUris (List[String], optional):

Optional. HCFS URIs of files to be copied to the working directory of Python drivers and distributed tasks. Useful for naively parallel tasks.

pythonFileUris (List[String], optional):

Optional. HCFS file URIs of Python files to pass to the PySpark framework. Supported file types: .py, .egg, and .zip.

mainPythonFileUri (String, optional):

Required. The HCFS URI of the main Python file to use as the driver. Must be a .py file.

archiveUris (List[String], optional):

Optional. HCFS URIs of archives to be extracted in the working directory of .jar, .tar, .tar.gz, .tgz, and .zip.

reference (strict dict, optional):

Encapsulates the full scoping used to reference a job.

Config Schema:
projectId (String, optional):

Required. The ID of the Google Cloud Platform project that the job belongs to.

jobId (String, optional):

Optional. The job ID, which must be unique within the project.The ID must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), or hyphens (-). The maximum length is 100 characters.If not specified by the caller, the job ID will be provided by the server.

hadoopJob (strict dict, optional):

A Cloud Dataproc job for running Apache Hadoop MapReduce (https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html) jobs on Apache Hadoop YARN (https://hadoop.apache.org/docs/r2.7.1/hadoop-yarn/hadoop-yarn-site/YARN.html).

Config Schema:
jarFileUris (List[String], optional):

Optional. Jar file URIs to add to the CLASSPATHs of the Hadoop driver and tasks.

loggingConfig (strict dict, optional):

The runtime logging config of the job.

Config Schema:
driverLogLevels (permissive dict, optional):

The per-package log levels for the driver. This may include “root” package name to configure rootLogger. Examples: ‘com.google = FATAL’, ‘root = INFO’, ‘org.apache = DEBUG’

properties (permissive dict, optional):

Optional. A mapping of property names to values, used to configure Hadoop. Properties that conflict with values set by the Cloud Dataproc API may be overwritten. Can include properties set in /etc/hadoop/conf/*-site and classes in user code.

args (List[String], optional):

Optional. The arguments to pass to the driver. Do not include arguments, such as -libjars or -Dfoo=bar, that can be set as job properties, since a collision may occur that causes an incorrect job submission.

fileUris (List[String], optional):

Optional. HCFS (Hadoop Compatible Filesystem) URIs of files to be copied to the working directory of Hadoop drivers and distributed tasks. Useful for naively parallel tasks.

mainClass (String, optional):

The name of the driver’s main class. The jar file containing the class must be in the default CLASSPATH or specified in jar_file_uris.

archiveUris (List[String], optional):

Optional. HCFS URIs of archives to be extracted in the working directory of Hadoop drivers and tasks. Supported file types: .jar, .tar, .tar.gz, .tgz, or .zip.

mainJarFileUri (String, optional):

The HCFS URI of the jar file containing the main class. Examples: ‘gs://foo-bucket/analytics-binaries/extract-useful-metrics-mr.jar’ ‘hdfs:/tmp/test-samples/custom-wordcount.jar’ ‘file:///home/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar’

projectId (dagster.StringSource):

Required. Project ID for the project which the client acts on behalf of. Will be passed when creating a dataset / job. If not passed, falls back to the default inferred from the environment.

region (dagster.StringSource):

job_scoped_cluster (Bool, optional):

whether to create a cluster or use an existing cluster

Default Value: True

Legacy

dagster_gcp.ConfigurablePickledObjectGCSIOManager IOManagerDefinition[source]

Config Schema:
gcs_bucket (dagster.StringSource):

GCS bucket to store files

gcs_prefix (Union[dagster.StringSource, None], optional):

Prefix to add to all file paths

Default Value: ‘dagster’

deprecated This API will be removed in version 2.0.

Please use GCSPickleIOManager instead..

Renamed to GCSPickleIOManager. See GCSPickleIOManager for documentation.

dagster_gcp.bigquery_resource ResourceDefinition[source]

Config Schema:
project (Union[dagster.StringSource, None], optional):

Project ID for the project which the client acts on behalf of. Will be passed when creating a dataset / job. If not passed, falls back to the default inferred from the environment.

Default Value: None

location (Union[dagster.StringSource, None], optional):

Default location for jobs / datasets / tables.

Default Value: None

gcp_credentials (Union[dagster.StringSource, None], optional):

GCP authentication credentials. If provided, a temporary file will be created with the credentials and GOOGLE_APPLICATION_CREDENTIALS will be set to the temporary file. To avoid issues with newlines in the keys, you must base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_AUTH_CREDENTIALS | base64

Default Value: None

dagster_gcp.build_bigquery_io_manager IOManagerDefinition[source]

Config Schema:
project (dagster.StringSource):

The GCP project to use.

dataset (Union[dagster.StringSource, None], optional):

Name of the BigQuery dataset to use. If not provided, the last prefix before the asset name will be used.

Default Value: None

location (Union[dagster.StringSource, None], optional):

The GCP location. Note: When using PySpark DataFrames, the default location of the project will be used. A custom location can be specified in your SparkSession configuration.

Default Value: None

gcp_credentials (Union[dagster.StringSource, None], optional):

GCP authentication credentials. If provided, a temporary file will be created with the credentials and GOOGLE_APPLICATION_CREDENTIALS will be set to the temporary file. To avoid issues with newlines in the keys, you must base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_AUTH_CREDENTIALS | base64

Default Value: None

temporary_gcs_bucket (Union[dagster.StringSource, None], optional):

When using PySpark DataFrames, optionally specify a temporary GCS bucket to store data. If not provided, data will be directly written to BigQuery.

Default Value: None

timeout (Union[Float, None], optional):

When using Pandas DataFrames, optionally specify a timeout for the BigQuery queries (loading and reading from tables).

Default Value: None

experimental This API may break in future versions, even between dot releases.

Builds an I/O manager definition that reads inputs from and writes outputs to BigQuery.

Parameters:
  • type_handlers (Sequence[DbTypeHandler]) – Each handler defines how to translate between slices of BigQuery tables and an in-memory type - e.g. a Pandas DataFrame. If only one DbTypeHandler is provided, it will be used as the default_load_type.

  • default_load_type (Type) – When an input has no type annotation, load it as this type.

Returns:

IOManagerDefinition

Examples

from dagster_gcp import build_bigquery_io_manager
from dagster_bigquery_pandas import BigQueryPandasTypeHandler
from dagster import Definitions

@asset(
    key_prefix=["my_prefix"],
    metadata={"schema": "my_dataset"} # will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

@asset(
    key_prefix=["my_dataset"]  # my_dataset will be used as the dataset in BigQuery
)
def my_second_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

bigquery_io_manager = build_bigquery_io_manager([BigQueryPandasTypeHandler()])

defs = Definitions(
    assets=[my_table, my_second_table],
    resources={
        "io_manager": bigquery_io_manager.configured({
            "project" : {"env": "GCP_PROJECT"}
        })
    }
)

You can set a default dataset to store the assets using the dataset configuration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
            "io_manager": bigquery_io_manager.configured({
                "project" : {"env": "GCP_PROJECT"}
                "dataset": "my_dataset"
            })
        }
)

On individual assets, you an also specify the dataset where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    # note that the key needs to be "schema"
    metadata={"schema": "my_dataset"}  # will be used as the dataset in BigQuery
)
def my_other_table() -> pd.DataFrame:
    ...

For ops, the dataset can be specified by including a “schema” entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

If none of these is provided, the dataset will default to “public”.

To only use specific columns of a table as input to a downstream op or asset, add the metadata columns to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...

If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the gcp_credentials configuration. Dagster willstore this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64

dagster_gcp.gcs_resource ResourceDefinition[source]

Config Schema:
project (Union[dagster.StringSource, None], optional):

Project name

Default Value: None

dagster_gcp.gcs_pickle_io_manager IOManagerDefinition[source]

Config Schema:
gcs_bucket (dagster.StringSource):

GCS bucket to store files

gcs_prefix (Union[dagster.StringSource, None], optional):

Prefix to add to all file paths

Default Value: ‘dagster’

Persistent IO manager using GCS for storage.

Serializes objects via pickling. Suitable for objects storage for distributed executors, so long as each execution node has network connectivity and credentials for GCS and the backing bucket.

Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at <base_dir>/<asset_key>. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.

Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of /my/base/path, an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called three in a directory with path /my/base/path/one/two/.

Example usage:

  1. Attach this IO manager to a set of assets.

from dagster import Definitions, asset
from dagster_gcp.gcs import gcs_pickle_io_manager, gcs_resource

@asset
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    return asset1[:5]

defs = Definitions(
    assets=[asset1, asset2],
    resources={
            "io_manager": gcs_pickle_io_manager.configured(
                {"gcs_bucket": "my-cool-bucket", "gcs_prefix": "my-cool-prefix"}
            ),
            "gcs": gcs_resource.configured({"project": "my-cool-project"}),
        },
)
  1. Attach this IO manager to your job to make it available to your ops.

from dagster import job
from dagster_gcp.gcs import gcs_pickle_io_manager, gcs_resource

@job(
    resource_defs={
        "io_manager": gcs_pickle_io_manager.configured(
            {"gcs_bucket": "my-cool-bucket", "gcs_prefix": "my-cool-prefix"}
        ),
        "gcs": gcs_resource.configured({"project": "my-cool-project"}),
    },
)
def my_job():
    ...
dagster_gcp.gcs_file_manager ResourceDefinition[source]

FileManager that provides abstract access to GCS.

Implements the FileManager API.

dagster_gcp.dataproc_resource ResourceDefinition[source]

Config Schema:
projectId (dagster.StringSource):

Required. Project ID for the project which the client acts on behalf of. Will be passed when creating a dataset / job. If not passed, falls back to the default inferred from the environment.

region (dagster.StringSource):

clusterName (dagster.StringSource):

Required. The cluster name. Cluster names within a project must be unique. Names of deleted clusters can be reused.

cluster_config (strict dict, optional):

The cluster config.

Config Schema:
masterConfig (strict dict, optional):

Optional. The config settings for Compute Engine resources in an instance group, such as a master or worker group.

Config Schema:
accelerators (List[strict dict], optional):

Optional. The Compute Engine accelerator configuration for these instances.Beta Feature: This feature is still under development. It may be changed before final release.

numInstances (Int, optional):

Optional. The number of VM instances in the instance group. For master instance groups, must be set to 1.

diskConfig (strict dict, optional):

Specifies the config of disk options for a group of VM instances.

Config Schema:
numLocalSsds (Int, optional):

Optional. Number of attached SSDs, from 0 to 4 (default is 0). If SSDs are not attached, the boot disk is used to store runtime logs and HDFS (https://hadoop.apache.org/docs/r1.2.1/hdfs_user_guide.html) data. If one or more SSDs are attached, this runtime bulk data is spread across them, and the boot disk contains only basic config and installed binaries.

bootDiskSizeGb (Int, optional):

Optional. Size in GB of the boot disk (default is 500GB).

bootDiskType (String, optional):

Optional. Type of the boot disk (default is “pd-standard”). Valid values: “pd-ssd” (Persistent Disk Solid State Drive) or “pd-standard” (Persistent Disk Hard Disk Drive).

managedGroupConfig (strict dict, optional):

Specifies the resources used to actively manage an instance group.

isPreemptible (Bool, optional):

Optional. Specifies that this instance group contains preemptible instances.

imageUri (String, optional):

Optional. The Compute Engine image resource used for cluster instances. It can be specified or may be inferred from SoftwareConfig.image_version.

machineTypeUri (String, optional):

Optional. The Compute Engine machine type used for cluster instances.A full URL, partial URI, or short name are valid. Examples: https://www.googleapis.com/compute/v1/projects/[project_id]/zones/us-east1-a/machineTypes/n1-standard-2 projects/[project_id]/zones/us-east1-a/machineTypes/n1-standard-2 n1-standard-2Auto Zone Exception: If you are using the Cloud Dataproc Auto Zone Placement feature, you must use the short name of the machine type resource, for example, n1-standard-2.

secondaryWorkerConfig (strict dict, optional):

Optional. The config settings for Compute Engine resources in an instance group, such as a master or worker group.

Config Schema:
accelerators (List[strict dict], optional):

Optional. The Compute Engine accelerator configuration for these instances.Beta Feature: This feature is still under development. It may be changed before final release.

numInstances (Int, optional):

Optional. The number of VM instances in the instance group. For master instance groups, must be set to 1.

diskConfig (strict dict, optional):

Specifies the config of disk options for a group of VM instances.

Config Schema:
numLocalSsds (Int, optional):

Optional. Number of attached SSDs, from 0 to 4 (default is 0). If SSDs are not attached, the boot disk is used to store runtime logs and HDFS (https://hadoop.apache.org/docs/r1.2.1/hdfs_user_guide.html) data. If one or more SSDs are attached, this runtime bulk data is spread across them, and the boot disk contains only basic config and installed binaries.

bootDiskSizeGb (Int, optional):

Optional. Size in GB of the boot disk (default is 500GB).

bootDiskType (String, optional):

Optional. Type of the boot disk (default is “pd-standard”). Valid values: “pd-ssd” (Persistent Disk Solid State Drive) or “pd-standard” (Persistent Disk Hard Disk Drive).

managedGroupConfig (strict dict, optional):

Specifies the resources used to actively manage an instance group.

isPreemptible (Bool, optional):

Optional. Specifies that this instance group contains preemptible instances.

imageUri (String, optional):

Optional. The Compute Engine image resource used for cluster instances. It can be specified or may be inferred from SoftwareConfig.image_version.

machineTypeUri (String, optional):

Optional. The Compute Engine machine type used for cluster instances.A full URL, partial URI, or short name are valid. Examples: https://www.googleapis.com/compute/v1/projects/[project_id]/zones/us-east1-a/machineTypes/n1-standard-2 projects/[project_id]/zones/us-east1-a/machineTypes/n1-standard-2 n1-standard-2Auto Zone Exception: If you are using the Cloud Dataproc Auto Zone Placement feature, you must use the short name of the machine type resource, for example, n1-standard-2.

encryptionConfig (strict dict, optional):

Encryption settings for the cluster.

Config Schema:
gcePdKmsKeyName (String, optional):

Optional. The Cloud KMS key name to use for PD disk encryption for all instances in the cluster.

securityConfig (strict dict, optional):

Security related configuration, including Kerberos.

Config Schema:
kerberosConfig (strict dict, optional):

Specifies Kerberos related configuration.

Config Schema:
truststorePasswordUri (String, optional):

Optional. The Cloud Storage URI of a KMS encrypted file containing the password to the user provided truststore. For the self-signed certificate, this password is generated by Dataproc.

enableKerberos (Bool, optional):

Optional. Flag to indicate whether to Kerberize the cluster.

truststoreUri (String, optional):

Optional. The Cloud Storage URI of the truststore file used for SSL encryption. If not provided, Dataproc will provide a self-signed certificate.

crossRealmTrustRealm (String, optional):

Optional. The remote realm the Dataproc on-cluster KDC will trust, should the user enable cross realm trust.

rootPrincipalPasswordUri (String, optional):

Required. The Cloud Storage URI of a KMS encrypted file containing the root principal password.

kmsKeyUri (String, optional):

Required. The uri of the KMS key used to encrypt various sensitive files.

crossRealmTrustKdc (String, optional):

Optional. The KDC (IP or hostname) for the remote trusted realm in a cross realm trust relationship.

crossRealmTrustSharedPasswordUri (String, optional):

Optional. The Cloud Storage URI of a KMS encrypted file containing the shared password between the on-cluster Kerberos realm and the remote trusted realm, in a cross realm trust relationship.

tgtLifetimeHours (Int, optional):

Optional. The lifetime of the ticket granting ticket, in hours. If not specified, or user specifies 0, then default value 10 will be used.

keystoreUri (String, optional):

Optional. The Cloud Storage URI of the keystore file used for SSL encryption. If not provided, Dataproc will provide a self-signed certificate.

keyPasswordUri (String, optional):

Optional. The Cloud Storage URI of a KMS encrypted file containing the password to the user provided key. For the self-signed certificate, this password is generated by Dataproc.

keystorePasswordUri (String, optional):

Optional. The Cloud Storage URI of a KMS encrypted file containing the password to the user provided keystore. For the self-signed certificate, this password is generated by Dataproc.

crossRealmTrustAdminServer (String, optional):

Optional. The admin server (IP or hostname) for the remote trusted realm in a cross realm trust relationship.

kdcDbKeyUri (String, optional):

Optional. The Cloud Storage URI of a KMS encrypted file containing the master key of the KDC database.

initializationActions (List[strict dict], optional):

Optional. Commands to execute on each node after config is completed. By default, executables are run on master and all worker nodes. You can test a node’s role metadata to run an executable on a master or worker node, as shown below using curl (you can also use wget): ROLE=$(curl -H Metadata-Flavor:Google http://metadata/computeMetadata/v1/instance/attributes/dataproc-role) if [[ “${ROLE}” == ‘Master’ ]]; then … master specific actions … else … worker specific actions … fi

configBucket (String, optional):

Optional. A Google Cloud Storage bucket used to stage job dependencies, config files, and job driver console output. If you do not specify a staging bucket, Cloud Dataproc will determine a Cloud Storage location (US, ASIA, or EU) for your cluster’s staging bucket according to the Google Compute Engine zone where your cluster is deployed, and then create and manage this project-level, per-location bucket (see Cloud Dataproc staging bucket).

workerConfig (strict dict, optional):

Optional. The config settings for Compute Engine resources in an instance group, such as a master or worker group.

Config Schema:
accelerators (List[strict dict], optional):

Optional. The Compute Engine accelerator configuration for these instances.Beta Feature: This feature is still under development. It may be changed before final release.

numInstances (Int, optional):

Optional. The number of VM instances in the instance group. For master instance groups, must be set to 1.

diskConfig (strict dict, optional):

Specifies the config of disk options for a group of VM instances.

Config Schema:
numLocalSsds (Int, optional):

Optional. Number of attached SSDs, from 0 to 4 (default is 0). If SSDs are not attached, the boot disk is used to store runtime logs and HDFS (https://hadoop.apache.org/docs/r1.2.1/hdfs_user_guide.html) data. If one or more SSDs are attached, this runtime bulk data is spread across them, and the boot disk contains only basic config and installed binaries.

bootDiskSizeGb (Int, optional):

Optional. Size in GB of the boot disk (default is 500GB).

bootDiskType (String, optional):

Optional. Type of the boot disk (default is “pd-standard”). Valid values: “pd-ssd” (Persistent Disk Solid State Drive) or “pd-standard” (Persistent Disk Hard Disk Drive).

managedGroupConfig (strict dict, optional):

Specifies the resources used to actively manage an instance group.

isPreemptible (Bool, optional):

Optional. Specifies that this instance group contains preemptible instances.

imageUri (String, optional):

Optional. The Compute Engine image resource used for cluster instances. It can be specified or may be inferred from SoftwareConfig.image_version.

machineTypeUri (String, optional):

Optional. The Compute Engine machine type used for cluster instances.A full URL, partial URI, or short name are valid. Examples: https://www.googleapis.com/compute/v1/projects/[project_id]/zones/us-east1-a/machineTypes/n1-standard-2 projects/[project_id]/zones/us-east1-a/machineTypes/n1-standard-2 n1-standard-2Auto Zone Exception: If you are using the Cloud Dataproc Auto Zone Placement feature, you must use the short name of the machine type resource, for example, n1-standard-2.

gceClusterConfig (strict dict, optional):

Common config settings for resources of Compute Engine cluster instances, applicable to all instances in the cluster.

Config Schema:
networkUri (String, optional):

Optional. The Compute Engine network to be used for machine communications. Cannot be specified with subnetwork_uri. If neither network_uri nor subnetwork_uri is specified, the “default” network of the project is used, if it exists. Cannot be a “Custom Subnet Network” (see Using Subnetworks for more information).A full URL, partial URI, or short name are valid. Examples: https://www.googleapis.com/compute/v1/projects/[project_id]/regions/global/default projects/[project_id]/regions/global/default default

zoneUri (String, optional):

Optional. The zone where the Compute Engine cluster will be located. On a create request, it is required in the “global” region. If omitted in a non-global Cloud Dataproc region, the service will pick a zone in the corresponding Compute Engine region. On a get request, zone will always be present.A full URL, partial URI, or short name are valid. Examples: https://www.googleapis.com/compute/v1/projects/[project_id]/zones/[zone] projects/[project_id]/zones/[zone] us-central1-f

metadata (permissive dict, optional):

The Compute Engine metadata entries to add to all instances (see Project and instance metadata (https://cloud.google.com/compute/docs/storing-retrieving-metadata#project_and_instance_metadata)).

internalIpOnly (Bool, optional):

Optional. If true, all instances in the cluster will only have internal IP addresses. By default, clusters are not restricted to internal IP addresses, and will have ephemeral external IP addresses assigned to each instance. This internal_ip_only restriction can only be enabled for subnetwork enabled networks, and all off-cluster dependencies must be configured to be accessible without external IP addresses.

serviceAccountScopes (List[String], optional):

Optional. The URIs of service account scopes to be included in Compute Engine instances. The following base set of scopes is always included: https://www.googleapis.com/auth/cloud.useraccounts.readonly https://www.googleapis.com/auth/devstorage.read_write https://www.googleapis.com/auth/logging.writeIf no scopes are specified, the following defaults are also provided: https://www.googleapis.com/auth/bigquery https://www.googleapis.com/auth/bigtable.admin.table https://www.googleapis.com/auth/bigtable.data https://www.googleapis.com/auth/devstorage.full_control

tags (List[String], optional):

The Compute Engine tags to add to all instances (see Tagging instances).

serviceAccount (String, optional):

Optional. The service account of the instances. Defaults to the default Compute Engine service account. Custom service accounts need permissions equivalent to the following IAM roles: roles/logging.logWriter roles/storage.objectAdmin(see https://cloud.google.com/compute/docs/access/service-accounts#custom_service_accounts for more information). Example: [account_id]@[project_id].iam.gserviceaccount.com

subnetworkUri (String, optional):

Optional. The Compute Engine subnetwork to be used for machine communications. Cannot be specified with network_uri.A full URL, partial URI, or short name are valid. Examples: https://www.googleapis.com/compute/v1/projects/[project_id]/regions/us-east1/subnetworks/sub0 projects/[project_id]/regions/us-east1/subnetworks/sub0 sub0

softwareConfig (strict dict, optional):

Specifies the selection and config of software inside the cluster.

Config Schema:
properties (permissive dict, optional):

Optional. The properties to set on daemon config files.Property keys are specified in prefix:property format, for example core:hadoop.tmp.dir. The following are supported prefixes and their mappings: capacity-scheduler: capacity-scheduler.xml core: core-site.xml distcp: distcp-default.xml hdfs: hdfs-site.xml hive: hive-site.xml mapred: mapred-site.xml pig: pig.properties spark: spark-defaults.conf yarn: yarn-site.xmlFor more information, see Cluster properties.

optionalComponents (List[Component], optional):

The set of optional components to activate on the cluster.

imageVersion (String, optional):

Optional. The version of software inside the cluster. It must be one of the supported Cloud Dataproc Versions, such as “1.2” (including a subminor version, such as “1.2.29”), or the “preview” version. If unspecified, it defaults to the latest Debian version.

labels (permissive dict, optional):

Optional. The labels to associate with this cluster. Label keys must contain 1 to 63 characters, and must conform to RFC 1035 (https://www.ietf.org/rfc/rfc1035.txt). Label values may be empty, but, if present, must contain 1 to 63 characters, and must conform to RFC 1035 (https://www.ietf.org/rfc/rfc1035.txt). No more than 32 labels can be associated with a cluster.