This library provides a Dagster integration with Sling.
For more information on getting started, see the Sling & Dagster documentation.
Create a definition for how to materialize a set of Sling replication streams as Dagster assets, as described by a Sling replication config. This creates one asset for every Sling target stream.
A Sling Replication config is a configuration that maps sources to destinations. For the full spec and descriptions, see Sling’s Documentation.
replication_config (Union[Mapping[str, Any], str, Path]) – A path to a Sling replication config, or a dictionary of a replication config.
dagster_sling_translator (DagsterSlingTranslator) – Allows customization of how to map a Sling stream to a Dagster AssetKey.
name (Optional[str]) – The name of the op.
partitions_def (Optional[PartitionsDefinition]) – The partitions definition for this asset.
backfill_policy (Optional[BackfillPolicy]) – The backfill policy for this asset.
op_tags (Optional[Mapping[str, Any]]) – The tags for the underlying op.
Examples
Running a sync by providing a path to a Sling Replication config:
from dagster import EnvVar
from dagster_sling import SlingConnectionResource, SlingResource, sling_assets

# Connections are referenced by name from the replication config.
sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_URL")
        ),
        SlingConnectionResource(
            name="MY_DUCKDB",
            type="duckdb",
            connection_string="duckdb:///var/tmp/duckdb.db",
        ),
    ]
)

config_path = "/path/to/replication.yaml"


@sling_assets(replication_config=config_path)
def my_assets(context, sling: SlingResource):
    # Run the replication and stream the resulting events back to Dagster.
    yield from sling.replicate(context=context)
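Because replication_config also accepts a dictionary, a replication can be described inline instead of through a YAML path. The following is a minimal sketch, not a complete config: the connection names reuse MY_POSTGRES and MY_DUCKDB from the example above, while the stream names, the mode value, and the object targets are illustrative assumptions. See Sling's documentation for the full spec.

```python
# A replication config expressed as a plain dictionary instead of a YAML path.
# Stream names, mode, and object targets are illustrative assumptions.
replication_config = {
    "source": "MY_POSTGRES",  # should match a SlingConnectionResource name
    "target": "MY_DUCKDB",
    "defaults": {"mode": "full-refresh"},
    "streams": {
        "public.accounts": {"object": "main.accounts"},
        "public.users": {"object": "main.users"},
    },
}

# Each entry under "streams" is expected to become one Dagster asset.
stream_names = sorted(replication_config["streams"])
print(stream_names)
```

A dictionary like this could then be passed as replication_config in place of config_path.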
A function that takes a stream definition from a Sling replication config and returns a Dagster AssetKey.
The stream definition is a dictionary key/value pair where the key is the stream name and the value is a dictionary representing the Sling Replication Stream Config.
For example:
stream_definition = {
    "public.users": {
        "sql": 'select all_user_id, name from public."all_Users"',
        "object": "public.all_users",
    }
}
By default, this returns the class’s target_prefix parameter concatenated with the stream name. A stream named “public.accounts” will create an AssetKey named “target_public_accounts”.
Override this function to customize how to map a Sling stream to a Dagster AssetKey.
Alternatively, you can provide metadata in your Sling replication config to specify the Dagster AssetKey for a stream as follows:
public.users:
  meta:
    dagster:
      asset_key: "mydb_users"
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition
The Dagster AssetKey for the replication stream.
Examples
Using a custom mapping for streams:
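from dagster import AssetKey
from dagster_sling import DagsterSlingTranslator

class CustomSlingTranslator(DagsterSlingTranslator):
    def get_asset_key_for_target(self, stream_definition) -> AssetKey:
        stream_name = stream_definition["name"]  # assumes the name is stored under "name"
        asset_names = {"stream1": "asset1", "stream2": "asset2"}
        return AssetKey(asset_names[stream_name])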
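The default naming and the metadata override described above can be illustrated with a standalone sketch. It is not the library's implementation: the function name is hypothetical, the "target" prefix is inferred from the “target_public_accounts” example, and the assumption that an explicit meta.dagster.asset_key takes precedence over the derived name is ours.

```python
import re
from typing import Any, Mapping


def sketch_asset_key_name(
    stream_name: str,
    stream_config: Mapping[str, Any],
    target_prefix: str = "target",  # assumed default, inferred from the docs example
) -> str:
    """Illustrative only: mimics the documented default, not the library code."""
    meta = stream_config.get("meta") or {}
    override = (meta.get("dagster") or {}).get("asset_key")
    if override:
        # Assumption: an explicit meta.dagster.asset_key wins over the derived name.
        return override
    # Strip double quotes, then turn remaining non-alphanumerics into underscores.
    sanitized = re.sub(r"[^A-Za-z0-9]", "_", stream_name.replace('"', ""))
    return f"{target_prefix}_{sanitized}"


default_name = sketch_asset_key_name("public.accounts", {})
overridden_name = sketch_asset_key_name(
    "public.users", {"meta": {"dagster": {"asset_key": "mydb_users"}}}
)
print(default_name)     # target_public_accounts
print(overridden_name)  # mydb_users
```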
Defines the auto-materialize policy for a given stream definition.
This method checks the provided stream definition for a specific configuration indicating an auto-materialize policy. If the configuration is found, it returns an eager auto-materialize policy. Otherwise, it returns None.
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.
An eager auto-materialize policy if the configuration is found, otherwise None.
Optional[AutoMaterializePolicy]
A function that takes a stream name from a Sling replication config and returns a Dagster AssetKey for the dependencies of the replication stream.
By default, this returns the stream name. For example, a stream named “public.accounts” will create an AssetKey named “target_public_accounts” and a dependency named “public_accounts”.
Override this function to customize how to map a Sling stream to a Dagster dependency. Alternatively, you can provide metadata in your Sling replication config to specify the dependency's Dagster AssetKey for a stream as follows:
public.users:
  meta:
    dagster:
      deps: "sourcedb_users"
stream_name (str) – The name of the stream.
The Dagster AssetKey dependency for the replication stream.
Examples
Using a custom mapping for streams:
from dagster import AssetKey
from dagster_sling import DagsterSlingTranslator

class CustomSlingTranslator(DagsterSlingTranslator):
    def get_deps_asset_key(self, stream_name: str) -> AssetKey:
        asset_names = {"stream1": "asset1", "stream2": "asset2"}
        return AssetKey(asset_names[stream_name])
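For comparison with the custom mapping, the default dependency naming can be sketched without Dagster installed. This is a hedged illustration only: the function name is hypothetical, a single string value for meta.dagster.deps is assumed, and the override is assumed to take precedence over the derived name.

```python
import re
from typing import Any, Mapping


def sketch_deps_key_name(stream_name: str, stream_config: Mapping[str, Any]) -> str:
    """Illustrative only: derives a dependency name the way the docs describe."""
    meta = stream_config.get("meta") or {}
    override = (meta.get("dagster") or {}).get("deps")
    if override:
        # Assumption: meta.dagster.deps replaces the derived dependency name.
        return override
    # No target prefix here: the dependency is just the sanitized stream name.
    return re.sub(r"[^A-Za-z0-9]", "_", stream_name.replace('"', ""))


default_dep = sketch_deps_key_name("public.accounts", {})
overridden_dep = sketch_deps_key_name(
    "public.users", {"meta": {"dagster": {"deps": "sourcedb_users"}}}
)
print(default_dep)     # public_accounts
print(overridden_dep)  # sourcedb_users
```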
Retrieves the description for a given stream definition.
This method checks the provided stream definition for a description. It first looks for an “sql” key in the configuration and returns its value if found. If not, it looks for a description in the metadata under the “dagster” key.
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.
The description of the stream if found, otherwise None.
Optional[str]
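The lookup order described for the description can be sketched as follows. This is an illustration under stated assumptions, not the library's code: the function takes the per-stream configuration dictionary directly, and the "description" key name under meta.dagster is assumed.

```python
from typing import Any, Mapping, Optional


def sketch_description(stream_config: Mapping[str, Any]) -> Optional[str]:
    """Illustrative only: prefer the SQL text, then a meta.dagster description."""
    # 1. A custom SQL statement, if present, doubles as the description.
    if "sql" in stream_config:
        return stream_config["sql"]
    # 2. Otherwise fall back to a description stored under meta.dagster
    #    (the "description" key name is an assumption).
    meta = stream_config.get("meta") or {}
    return (meta.get("dagster") or {}).get("description")


from_sql = sketch_description({"sql": "select id from public.users"})
from_meta = sketch_description({"meta": {"dagster": {"description": "All users"}}})
missing = sketch_description({"object": "public.all_users"})
print(from_sql, from_meta, missing)
```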
Retrieves the freshness policy for a given stream definition.
This method checks the provided stream definition for a specific configuration indicating a freshness policy. If the configuration is found, it constructs and returns a FreshnessPolicy object based on the provided parameters. Otherwise, it returns None.
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.
A FreshnessPolicy object if the configuration is found, otherwise None.
Optional[FreshnessPolicy]
Retrieves the group name for a given stream definition.
This method checks the provided stream definition for a group name in the metadata under the “dagster” key.
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.
The group name if found, otherwise None.
Optional[str]
Retrieves the kinds for a given stream definition.
This method returns “sling” by default. This method can be overridden to provide custom kinds.
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.
A set containing kinds for the stream’s assets.
Set[str]
Retrieves the metadata for a given stream definition.
This method extracts the configuration from the provided stream definition and returns it as a JSON metadata value.
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.
A dictionary containing the stream configuration as JSON metadata.
Mapping[str, Any]
Retrieves the tags for a given stream definition.
This method returns an empty dictionary, indicating that no tags are associated with the stream definition by default. This method can be overridden to provide custom tags.
stream_definition (Mapping[str, Any]) – A dictionary representing the stream definition, which includes configuration details.
An empty dictionary.
Mapping[str, Any]
A function that takes a stream name from a Sling replication config and returns a sanitized name for the stream.
By default, this strips any double quotes from the stream name and replaces the remaining non-alphanumeric characters with underscores.
stream_name (str) – The name of the stream.
Examples
Using a custom stream name sanitizer:
from dagster_sling import DagsterSlingTranslator

class CustomSlingTranslator(DagsterSlingTranslator):
    def sanitize_stream_name(self, stream_name: str) -> str:
        # Drop dots entirely instead of replacing them.
        return stream_name.replace(".", "")
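To see what the custom sanitizer changes, it helps to compare it with the default behavior described above. The sketch below is a standalone approximation based only on that description; the function name is hypothetical, and details such as case handling in the real implementation are not covered here.

```python
import re


def sketch_sanitize_stream_name(stream_name: str) -> str:
    """Illustrative only: approximates the documented default sanitizer."""
    # Remove double quotes first so quoted identifiers collapse cleanly.
    without_quotes = stream_name.replace('"', "")
    # Replace every remaining non-alphanumeric character with an underscore.
    return re.sub(r"[^A-Za-z0-9]", "_", without_quotes)


default_result = sketch_sanitize_stream_name('public."all_Users"')
custom_result = 'public.accounts'.replace(".", "")  # the custom sanitizer's rule
print(default_result)  # public_all_Users
print(custom_result)   # publicaccounts
```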
Resource for interacting with the Sling package. This resource can be used to run Sling replications.
connections (List[SlingConnectionResource]) – A list of connections to use for the replication.
Examples
from dagster import EnvVar
from dagster_sling import SlingConnectionResource, SlingResource

sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MY_POSTGRES",
            type="postgres",
            connection_string=EnvVar("POSTGRES_CONNECTION_STRING"),
        ),
        SlingConnectionResource(
            name="MY_SNOWFLAKE",
            type="snowflake",
            host=EnvVar("SNOWFLAKE_HOST"),
            user=EnvVar("SNOWFLAKE_USER"),
            database=EnvVar("SNOWFLAKE_DATABASE"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            role=EnvVar("SNOWFLAKE_ROLE"),
        ),
    ]
)
A representation of a connection to a database or file to be used by Sling. This resource can be used as a source or a target for a Sling sync.
Reference the Sling docs for more information on possible connection types and parameters: https://docs.slingdata.io/connections
The name of the connection is passed to Sling and must match the name of the connection provided in the replication configuration: https://docs.slingdata.io/sling-cli/run/configuration/replication
You may provide either a connection string or keyword arguments for the connection.
Examples
Creating a Sling Connection for a file, such as CSV or JSON:
source = SlingConnectionResource(name="MY_FILE", type="file")
Create a Sling Connection for a Postgres or MySQL database, using a connection string:
postgres_conn = SlingConnectionResource(name="MY_POSTGRES", type="postgres", connection_string=EnvVar("POSTGRES_CONNECTION_STRING"))
mysql_conn = SlingConnectionResource(name="MY_MYSQL", type="mysql", connection_string="mysql://user:password@host:port/schema")
Create a Sling Connection for a Postgres or Snowflake database, using keyword arguments: