Pythonic resources#

This feature is considered experimental.

This guide acts as an introduction to Dagster resources utilizing the new Pythonic resources API layer, which makes defining and using Dagster resources easier.

Resources are objects that are shared across the implementations of multiple software-defined assets and ops and that can be plugged in after defining those ops and assets.

Resources typically model external components that assets and ops interact with. For example, a resource might be a connection to a data warehouse like Snowflake or a service like Slack.

So, why use resources?

  • Plug in different implementations in different environments - If you have a heavy external dependency that you want to use in production, but avoid using in testing, you can accomplish this by providing different resources in each environment. Check out Separating Business Logic from Environments for more info about this capability.
  • Surface configuration in the UI - Resources and their configuration are surfaced in the Dagster UI, making it easy to see where your resources are used and how they are configured.
  • Share configuration across multiple ops or assets - Resources are configurable and shared, so you can supply configuration in one place instead of configuring the ops and assets individually.
  • Share implementations across multiple ops or assets - When multiple ops access the same external services, resources provide a standard way to structure your code to share the implementations.

Using Pythonic resources#

Getting started#

Typically, resources are defined by subclassing ConfigurableResource. Attributes on the class are used to define the resource's configuration schema. The configuration system has a few advantages over plain Python parameter passing; configured values are displayed in the Dagster UI and can be set dynamically using environment variables.

Assets and ops specify resource dependencies by annotating the resource as a parameter to the asset or op function.

To provide resource values to your assets and ops, attach them to your Definitions call. These resources are automatically passed to the function at runtime.

Using software-defined assets#

Here, we define a subclass of ConfigurableResource representing a connection to an external service. We can configure the resource by constructing it in the Definitions call.

We can define methods on the resource class which depend on config values. These methods can be used by assets and ops.

from dagster import asset, Definitions
from dagster._config.structured_config import ConfigurableResource
import requests
from requests import Response

class MyConnectionResource(ConfigurableResource):
    username: str

    def request(self, endpoint: str) -> Response:
        return requests.get(
            f"https://my-api.com/{endpoint}",
            headers={"user-agent": "dagster"},
        )

@asset
def data_from_service(my_conn: MyConnectionResource) -> Dict[str, Any]:
    return my_conn.request("/fetch_data").json()

defs = Definitions(
    assets=[data_from_service],
    resources={
        "my_conn": MyConnectionResource(username="my_user"),
    },
)

Using environment variables with resources#

Resources can be configured using environment variables, which is useful for secrets or other environment-specific configuration. If you're using Dagster Cloud, environment variables can be configured directly in the UI.

To use environment variables, pass an EnvVar when constructing your resource.

from dagster._config.field_utils import EnvVar
from dagster import Definitions
from dagster._config.structured_config import ConfigurableResource

class CredentialsResource(ConfigurableResource):
    username: str
    password: str

defs = Definitions(
    assets=...,
    resources={
        "credentials": CredentialsResource(
            username=EnvVar("MY_USERNAME"),
            password=EnvVar("MY_PASSWORD"),
        )
    },
)

Configuring resources at runtime#

In some cases, you may want to specify configuration for a resource at runtime, in the launchpad or in a RunRequest for a schedule or sensor. For example, you may want a sensor-triggered run to specify a different target table in a database resource for each run.

You can use the configure_at_launch() method to defer the construction of a configurable resource until runtime.

from dagster._config.structured_config import ConfigurableResource
from dagster import Definitions, asset

class DatabaseResource(ConfigurableResource):
    table: str

    def read(self):
        ...

@asset
def data_from_database(db_conn: DatabaseResource):
    return db_conn.read()

defs = Definitions(
    assets=[data_from_database],
    resources={"db_conn": DatabaseResource.configure_at_launch()},
)

Configuring a resource at runtime in Python code#

Then, configuration for the resource can be provided at runtime in the launchpad or in Python code using the config parameter of the RunRequest:

from dagster import sensor, define_asset_job, RunRequest
from dagster._core.definitions.run_config import RunConfig

update_data_job = define_asset_job(
    name="update_data_job", selection=[data_from_database]
)

@sensor(job=update_data_job)
def table_update_sensor():
    tables = ...
    for table_name in tables:
        yield RunRequest(
            run_config=RunConfig(
                resources={
                    "db_conn": DatabaseResource(table=table_name),
                },
            ),
        )

Resources which depend on other resources#

In some situations, you may want to define a resource which depends on other resources. This is useful for common configuration. For example, separate resources for a database and for a filestore may both depend on credentials for a particular cloud provider. Defining these credentials as a separate, nested resource allows you to specify configuration in a single place. It also makes it easier to test your resources, since you can mock the nested resource.

In this case, you can list that nested resource as an attribute of your resource class.

from dagster import Definitions
from dagster._config.structured_config import ConfigurableResource

class CredentialsResource(ConfigurableResource):
    username: str
    password: str

class FileStoreBucket(ConfigurableResource):
    credentials: CredentialsResource
    region: str

    def write(self, data: str):
        get_filestore_client(
            username=self.credentials.username,
            password=self.credentials.password,
            region=self.region,
        ).write(data)

credentials = CredentialsResource(username="my_user", password="my_password")
defs = Definitions(
    assets=...,
    resources={
        "bucket": FileStoreBucket(
            credentials=credentials,
            region="us-east-1",
        ),
    },
)

If we instead would like the configuration for our credentials to be provided at runtime, we can use the configure_at_launch() method to defer the construction of the CredentialsResource until runtime.

Because credentials requires runtime configuration through the launchpad, it must also be passed to the Definitions object, so that configuration can be provided at runtime. Nested resources only need to be passed to the Definitions object if they require runtime configuration.

credentials = CredentialsResource.configure_at_launch()

defs = Definitions(
    assets=...,
    resources={
        "credentials": credentials,
        "bucket": FileStoreBucket(
            credentials=credentials,
            region="us-east-1",
        ),
    },
)

Testing configurable resources#

You can test the initialization of a ConfigurableResource by constructing it manually. In most cases, you can construct your resource directly:

from dagster._config.structured_config import ConfigurableResource

class MyResource(ConfigurableResource):
    value: str

    def get_value(self) -> str:
        return self.value

def test_my_resource():
    assert MyResource(value="foo").get_value() == "foo"

If your resource requires other resources, then you can pass them as constructor arguments.

from dagster._config.structured_config import ConfigurableResource

class StringHolderResource(ConfigurableResource):
    value: str

class MyResourceRequiresAnother(ConfigurableResource):
    foo: StringHolderResource
    bar: str

def test_my_resource_with_context():
    string_holder = StringHolderResource(value="foo")
    resource = MyResourceRequiresAnother(foo=string_holder, bar="bar")
    assert resource.foo.value == "foo"
    assert resource.bar == "bar"

Defining Pythonic I/O managers#

Pythonic I/O managers are defined as subclasses of ConfigurableIOManager, and similarly to Pythonic resources specify any configuration fields as attributes. Each subclass must implement a handle_output and load_input method, which are called Dagster at runtime to handle the storing and loading of data.

from dagster import Definitions, AssetKey, OutputContext, InputContext
from dagster._config.structured_config import ConfigurableIOManager

class MyIOManager(ConfigurableIOManager):
    root_path: str

    def _get_path(self, asset_key: AssetKey) -> str:
        return self.root_path + "/".join(asset_key.path)

    def handle_output(self, context: OutputContext, obj):
        write_csv(self._get_path(context.asset_key), obj)

    def load_input(self, context: InputContext):
        return read_csv(self._get_path(context.asset_key))

defs = Definitions(
    assets=...,
    resources={"io_manager": MyIOManager(root_path="/tmp/")},
)

#


Advanced resource patterns#

The following sections describe more advanced patterns for using Pythonic resources.

Adapting function-style resources#

In codebases that utilize function-style resources with the @resource decorator, you can use the ConfigurableResourceAdapter class to adapt these resources to the new Pythonic resource pattern.

The adapter subclass should list all config fields as attributes and implement a wrapped_resource property which returns the underlying resource definition to adapt.

from dagster import resource, Definitions, ResourceDefinition, asset
from dagster._config.structured_config import ConfigurableLegacyResourceAdapter

# Old code, interface cannot be changed for back-compat purposes
class Writer:
    def __init__(self, prefix: str):
        self._prefix = prefix

    def output(self, text: str) -> None:
        print(self._prefix + text)

@resource(config_schema={"prefix": str})
def writer_resource(context):
    prefix = context.resource_config["prefix"]
    return Writer(prefix)

# New adapter layer
class WriterResource(ConfigurableLegacyResourceAdapter):
    prefix: str

    @property
    def wrapped_resource(self) -> ResourceDefinition:
        return writer_resource

@asset
def my_asset(writer: Writer):
    writer.output("hello, world!")

defs = Definitions(
    assets=[my_asset], resources={"writer": WriterResource(prefix="greeting: ")}
)

Adapting function-style I/O managers#

Similar to with resources, in codebases that utilize function-style I/O managers with the @io_manager decorator, you can use the ConfigurableIOManagerAdapter class to adapt them to the new Pythonic I/O manager pattern.

The adapter subclass should list all config fields as attributes and implement a wrapped_io_manager property which returns the underlying I/O manager definition to adapt.

from dagster import (
    Definitions,
    IOManagerDefinition,
    io_manager,
    IOManager,
    InputContext,
    OutputContext,
)
from dagster._config.structured_config import ConfigurableLegacyIOManagerAdapter
import os

# Old code, interface cannot be changed for back-compat purposes
class OldFileIOManager(IOManager):
    def __init__(self, base_path: str):
        self.base_path = base_path

    def handle_output(self, context: OutputContext, obj):
        with open(
            os.path.join(self.base_path, context.step_key, context.name), "w"
        ) as fd:
            fd.write(obj)

    def load_input(self, context: InputContext):
        with open(
            os.path.join(
                self.base_path,
                context.upstream_output.step_key,
                context.upstream_output.name,
            ),
            "r",
        ) as fd:
            return fd.read()

@io_manager(config_schema={"base_path": str})
def old_file_io_manager(context):
    base_path = context.resource_config["base_path"]
    return OldFileIOManager(base_path)

# New adapter layer
class MyIOManager(ConfigurableLegacyIOManagerAdapter):
    base_path: str

    @property
    def wrapped_io_manager(self) -> IOManagerDefinition:
        return old_file_io_manager

defs = Definitions(
    assets=...,
    resources={
        "io_manager": MyIOManager(base_path="/tmp/"),
    },
)

Using bare Python objects as resources#

In some cases, you may want to use a bare Python object as a resource which is not a subclass of ConfigurableResource.

For example, you may want to directly pass a third-party API client into your assets or ops. This follows a similar pattern to using a ConfigurableResource subclass, however assets and ops which use these resources must annotate them with Resource.

from dagster import Definitions, asset
from dagster._core.definitions.resource_output import Resource

# `Resource[GitHub]` is treated exactly like `GitHub` for type checking purposes,
# and the runtime type of the github parameter is `GitHub`. The purpose of the
# `Resource` wrapper is to let Dagster know that `github` is a resource and not an
# upstream asset.

@asset
def public_github_repos(github: Resource[GitHub]):
    return github.organization("dagster-io").repositories()

defs = Definitions(
    assets=[public_github_repos],
    resources={"github": GitHub(...)},
)

If you would like a Pythonic resource to depend on a bare Python object, you can use the ResourceDependency annotation to annotate the attribute in question. This indicates to Dagster that the attribute should be treated as a resource dependency.

from dagster._config.structured_config import (
    ConfigurableResource,
    ResourceDependency,
)
from dagster import Definitions

class DBResource(ConfigurableResource):
    engine: ResourceDependency[Engine]

    def query(self, query: str):
        with self.engine.connect() as conn:
            return conn.execute(query)

engine = create_engine(...)
defs = Definitions(
    assets=...,
    resources={"db": DBResource(engine=engine)},
)

Implementation details#

Resource input resolution#

Resources defined by subclassing ConfigurableResource can take various unresolved inputs, including EnvVars instead of string values or unconfigured resources returned by configure_at_launch() in place of configured resources.

Between construction time and runtime, the ConfigurableResource class resolves all of its attributes to their final values, meaning that when an attribute of a resource is accessed in the body of an asset or op, it will always be a fully resolved value.

In the following example, the FileStoreBucket constructor will accept the still-unconfigured unconfigured_credentials_resource, which will be resolved once the resource is configured at runtime.

from dagster._config.structured_config import ConfigurableResource

class CredentialsResource(ConfigurableResource):
    username: str
    password: str

class FileStoreBucket(ConfigurableResource):
    credentials: CredentialsResource
    region: str

    def write(self, data: str):
        # In this context, `self.credentials` is ensured to
        # be a CredentialsResource with valid values for
        # `username` and `password`

        get_filestore_client(
            username=self.credentials.username,
            password=self.credentials.password,
            region=self.region,
        ).write(data)

# unconfigured_credentials_resource is typed as PartialResource[CredentialsResource]
unconfigured_credentials_resource = CredentialsResource.configure_at_launch()

# FileStoreBucket constructor accepts either a CredentialsResource or a
# PartialResource[CredentialsResource] for the `credentials` argument
bucket = FileStoreBucket(
    credentials=unconfigured_credentials_resource,
    region="us-east-1",
)