```python
from dagstermill import define_dagstermill_asset

from dagster import file_relative_path

iris_kmeans_notebook = define_dagstermill_asset(
    name="iris_kmeans",
    notebook_path=file_relative_path(__file__, "../notebooks/iris-kmeans.ipynb"),
)
```
In this code block, we use define_dagstermill_asset to create a Dagster asset. We set the asset's name with the name parameter and the path to our .ipynb file with the notebook_path parameter. When the asset is materialized, it executes our notebook and stores the executed .ipynb file in a persistent location.
Dagstermill also supports running Jupyter notebooks as ops. We can use define_dagstermill_op to turn a notebook into an op:
```python
from dagstermill import define_dagstermill_op, local_output_notebook_io_manager
from dagster import file_relative_path, job

k_means_iris = define_dagstermill_op(
    name="k_means_iris",
    notebook_path=file_relative_path(__file__, "./notebooks/iris-kmeans.ipynb"),
    output_notebook_name="iris_kmeans_output",
)

@job(
    resource_defs={"output_notebook_io_manager": local_output_notebook_io_manager}
)
def iris_classify():
    k_means_iris()
```
In this code block, we use define_dagstermill_op to create an op that executes the Jupyter notebook. We name the op k_means_iris and provide the path to the notebook file. We also specify output_notebook_name="iris_kmeans_output". This means the executed notebook is returned as a buffered file object in one of the op's outputs, and that output is named iris_kmeans_output. We then include the k_means_iris op in the iris_classify job and set local_output_notebook_io_manager as the output_notebook_io_manager resource, which stores the executed notebook file.
If you look at one of the notebooks executed by Dagster, you'll notice that the injected-parameters cell in your output notebooks defines a variable called context. This context object mirrors the execution context object that's available in the body of any other asset or op's compute function.
As with the parameters that dagstermill injects, you can also construct a context object for interactive exploration and development by using the dagstermill.get_context API in the tagged parameters cell of your input notebook. When Dagster executes your notebook, this development context will be replaced with the injected runtime context.
You can use the development context to access asset and op config and resources, to log messages, and to yield results and other Dagster events just as you would in production. When the runtime context is injected by Dagster, none of your other code needs to change.
For instance, suppose we want to make the number of clusters (the k in k-means) configurable. We'll change our asset definition to include a config field:
```python
from dagstermill import define_dagstermill_asset

from dagster import AssetIn, Field, Int, file_relative_path

iris_kmeans_jupyter_notebook = define_dagstermill_asset(
    name="iris_kmeans_jupyter",
    notebook_path=file_relative_path(__file__, "./notebooks/iris-kmeans.ipynb"),
    group_name="template_tutorial",
    ins={"iris": AssetIn("iris_dataset")},
    config_schema=Field(
        Int,
        default_value=3,
        is_required=False,
        description="The number of clusters to find",
    ),
)
```
You can also provide config_schema to define_dagstermill_op in the same way demonstrated in this code snippet.
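In our notebook, we'll stub the context in the parameters cell by calling dagstermill.get_context with a development value for the config. Later cells can then read the number of clusters from context.op_config.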
The functionality described in this section only works for notebooks run with define_dagstermill_op. If you'd like support for this feature in define_dagstermill_asset to be prioritized, give this GitHub issue a thumbs up.
If you are using define_dagstermill_op and you'd like to yield a result to be consumed downstream of a notebook, you can call yield_result with the value of the result and its name. In interactive execution, this is a no-op, so you don't need to change anything when moving from interactive exploration and development to production.
You can also yield Dagster events from your notebook using yield_event.
For example, to yield a custom AssetMaterialization object (say, to tell Dagit where you've saved a plot), you can do the following:
```python
import dagstermill
from dagster import AssetMaterialization

dagstermill.yield_event(AssetMaterialization(asset_key="marketing_data_plotted"))
```