Added arguments to AzureMLAssetDataSet for better control of output path #75

Open · wants to merge 11 commits into base: develop
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,8 @@
## [Unreleased]

- Added `--on-job-scheduled` argument to `kedro azureml run` to plug-in custom behaviour after Azure ML job is scheduled [@Gabriel2409](https://github.com/Gabriel2409)
- Added two new arguments (`datastore` and `azureml_root_dir`) to `AzureMLAssetDataSet`, allowing users to specify where to save the data when an `AzureMLAssetDataSet` is used as a node output (non-local runs) by [@Gabriel2409](https://github.com/Gabriel2409)
- Added support for using an `AzureMLAssetDataSet` of type `uri_file` as a node output by [@Gabriel2409](https://github.com/Gabriel2409)

## [0.6.0] - 2023-09-01

2 changes: 1 addition & 1 deletion kedro_azureml/cli.py
@@ -399,7 +399,7 @@ def compile(
@click.option(
"--az-output",
"azure_outputs",
type=(str, click.Path(exists=False)),
multiple=True,
help="Name and path of Azure ML Pipeline output",
)
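The path check here was loosened from `click.Path(exists=True, file_okay=True, dir_okay=True)` to `click.Path(exists=False)`, presumably because an Azure ML pipeline output path need not exist locally when the command is invoked. Since this is a click tuple option with `multiple=True`, each occurrence of the flag takes a name and a path pair, e.g. `--az-output my_output data/07_model_output/` (illustrative values, not from the diff).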
30 changes: 22 additions & 8 deletions kedro_azureml/datasets/asset_dataset.py
@@ -43,6 +43,12 @@ class AzureMLAssetDataSet(AzureMLPipelineDataSet, AbstractVersionedDataSet):
| - ``filepath_arg``: Filepath arg on the wrapped dataset, defaults to `filepath`
| - ``azureml_type``: Either `uri_folder` or `uri_file`
| - ``version``: Version of the AzureML dataset to be used in kedro format.
| - ``datastore``: Datastore name, only used to resolve the path when using the
data asset as an output (non-local runs).
| - ``azureml_root_dir``: The folder where the data asset is saved, only used to
resolve the path when using the data asset as an output (non-local runs).
The final output path will start with
"azureml://datastores/<datastore>/paths/<azureml_root_dir>/<job_id>"

Example
-------
@@ -52,19 +58,17 @@
.. code-block:: yaml

my_folder_dataset:
  type: kedro_azureml.datasets.AzureMLAssetDataSet
  azureml_dataset: my_azureml_folder_dataset
  root_dir: data/01_raw/some_folder/
  dataset:
    type: pandas.ParquetDataSet
    filepath: "."

my_file_dataset:
  type: kedro_azureml.datasets.AzureMLAssetDataSet
  azureml_dataset: my_azureml_file_dataset
  root_dir: data/01_raw/some_other_folder/
  dataset:
    type: pandas.ParquetDataSet
    filepath: "companies.csv"
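
For illustration, a catalog entry that also sets the two new arguments might look as follows (the datastore name and paths are hypothetical, not taken from this diff):

.. code-block:: yaml

    my_output_dataset:
      type: kedro_azureml.datasets.AzureMLAssetDataSet
      azureml_dataset: my_azureml_output_dataset
      root_dir: data/07_model_output/
      datastore: my_datastore
      azureml_root_dir: kedro_azureml
      dataset:
        type: pandas.ParquetDataSet
        filepath: "output.parquet"

With these settings, per the docstring above, a non-local run would write the output under azureml://datastores/my_datastore/paths/kedro_azureml/<job_id>.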
@@ -81,6 +85,8 @@ def __init__(
filepath_arg: str = "filepath",
azureml_type: AzureMLDataAssetType = "uri_folder",
version: Optional[Version] = None,
datastore: str = "${{default_datastore}}",
azureml_root_dir: str = "kedro_azureml", # maybe combine with root_dir?
Contributor: Comment can be removed here

):
"""
azureml_dataset: Name of the AzureML dataset.
@@ -90,9 +96,17 @@
filepath_arg: Filepath arg on the wrapped dataset, defaults to `filepath`
azureml_type: Either `uri_folder` or `uri_file`
version: Version of the AzureML dataset to be used in kedro format.
datastore: Datastore name, only used to resolve the path when using the
data asset as an output (non-local runs). Defaults to the pipeline's
default datastore (resolved server-side; see
https://learn.microsoft.com/en-us/azure/machine-learning/concept-expressions?view=azureml-api-2)
azureml_root_dir: The folder where the data asset is saved, only used to
resolve the path when using the data asset as an output (non-local runs).
"""
super().__init__(dataset=dataset, root_dir=root_dir, filepath_arg=filepath_arg)

self._azureml_root_dir = azureml_root_dir
self._datastore = datastore
self._azureml_dataset = azureml_dataset
self._version = version
# 1 entry for load version, 1 for save version
20 changes: 15 additions & 5 deletions kedro_azureml/generator.py
@@ -172,12 +172,22 @@ def _get_output(self, name):
if name in self.catalog.list() and isinstance(
ds := self.catalog._get_dataset(name), AzureMLAssetDataSet
):
output_path = (
    f"azureml://datastores/{ds._datastore}/paths/{ds._azureml_root_dir}"
)

# add the job id to the path (actual value is injected when the job is run)
output_path = f"{output_path}/${{{{name}}}}"

Contributor: We probably need to strip off trailing slashes from ds._azureml_root_dir, or use some URI parsing lib.

Contributor (author): I think a URI parsing lib may be overkill. Moreover, datastore can be equal to ${{default_datastore}} and I don't know if that will pose a problem with these libraries. However, what about ds._azureml_root_dir.strip("/") to remove both the leading and trailing slashes? Another solution would be to add checks on the parameters at dataset creation and throw an error if they don't match the correct format. Actually, if we go this route, this is probably something we want to add to the datastore and root_dir parameters as well.

if ds._azureml_type == "uri_file":
    output_path = f"{output_path}/{ds._dataset_config[ds._filepath_arg]}"
# note that this will always create a new version of the dataset, even if we
# have versioned set to false
return Output(
    type=ds._azureml_type,
    name=ds._azureml_dataset,
    path=output_path,
)
else:
return Output(type="uri_folder")
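
For illustration only, here is how the three output_path assignments above compose for a hypothetical uri_file dataset (the values are made up; ${{name}} is left literal for Azure ML to resolve server-side):

datastore = "mydatastore"
azureml_root_dir = "kedro_azureml"
filepath = "companies.csv"

output_path = f"azureml://datastores/{datastore}/paths/{azureml_root_dir}"
output_path = f"{output_path}/${{{{name}}}}"  # doubled braces escape to a literal ${{name}}
output_path = f"{output_path}/{filepath}"  # appended only for uri_file outputs

print(output_path)
# azureml://datastores/mydatastore/paths/kedro_azureml/${{name}}/companies.csv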

4 changes: 4 additions & 0 deletions tests/conftest.py
@@ -256,6 +256,7 @@ def multi_catalog():
"filepath": "abc.csv",
},
azureml_dataset="test_dataset",
azureml_type="uri_file",
version=Version(None, None),
)
parq = AzureMLAssetDataSet(
@@ -264,6 +265,9 @@
"filepath": "xyz.parq",
},
azureml_dataset="test_dataset_2",
azureml_type="uri_folder",
azureml_root_dir="azureml_root",
datastore="mydatastore",
version=Version(None, None),
)
return DataCatalog({"input_data": csv, "i2": parq})
4 changes: 4 additions & 0 deletions tests/test_datasets.py
@@ -333,12 +333,16 @@ def test_azureml_asset_dataset(
local_run,
download,
):
# ensure that AzureMLAssetDataSet type matches path in the mock_azureml_client
with mock_azureml_client() as mac:
azureml_type = mac.data.get().type
ds = AzureMLAssetDataSet(
dataset={
"type": dataset_type,
"filepath": path_in_aml,
},
azureml_dataset="test_dataset",
azureml_type=azureml_type,
version=Version(None, None),
)
ds._local_run = local_run
17 changes: 17 additions & 0 deletions tests/test_generator.py
@@ -71,6 +71,23 @@ def test_can_generate_azure_pipeline(
for node in az_pipeline.jobs.values()
), "Invalid docker image set on commands"

# check Output of generated command for AzureMLAssetDataSets

i2_job = az_pipeline.jobs["node1"].outputs["i2"]._to_job_output()
i2_dataset = multi_catalog.datasets.i2
assert i2_dataset._azureml_type == i2_job.type, "Wrong Output type"
assert i2_dataset._azureml_dataset == i2_job.name, "Wrong Output name"
assert i2_dataset._datastore in i2_job.path, "datastore not passed to Output"
assert (
i2_dataset._azureml_root_dir in i2_job.path
), "azureml root dir not passed to Output"

# check Output for non-AzureMLAssetDataSets
i3_job = az_pipeline.jobs["node2"].outputs["i3"]._to_job_output()
assert i3_job.type == "uri_folder", "Wrong Output type"
assert i3_job.path is None, "Output path is not empty"
assert i3_job.name is None, "Output name is not empty"


def test_azure_pipeline_with_different_compute(
dummy_pipeline_compute_tag, dummy_plugin_config, multi_catalog