Added arguments to AzureMLAssetDataSet for better control of output path #75

Open · wants to merge 11 commits into base: develop
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,8 @@
## [Unreleased]

- Added `--on-job-scheduled` argument to `kedro azureml run` to plug-in custom behaviour after Azure ML job is scheduled [@Gabriel2409](https://github.com/Gabriel2409)
- Added two new arguments (`datastore` and `azureml_root_dir`) to `AzureMLAssetDataSet`, allowing users to specify where to save the data when the dataset is used as a node output (non-local runs) by [@Gabriel2409](https://github.com/Gabriel2409)
- Added support for using `AzureMLAssetDataSet` with `azureml_type: uri_file` as a node output by [@Gabriel2409](https://github.com/Gabriel2409)

## [0.6.0] - 2023-09-01

2 changes: 1 addition & 1 deletion kedro_azureml/cli.py
@@ -399,7 +399,7 @@ def compile(
@click.option(
"--az-output",
"azure_outputs",
- type=(str, click.Path(exists=True, file_okay=True, dir_okay=True)),
+ type=(str, click.Path(exists=False)),
multiple=True,
help="Name and path of Azure ML Pipeline output",
)
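For context, a minimal standalone sketch (not part of the PR) of why `exists=False` matters here: with `exists=True`, click validates at parse time that the given path already exists on disk, which can never hold for a pipeline output that has not been written yet.

    import click

    # Sketch of the changed --az-output option in isolation.
    @click.command()
    @click.option(
        "--az-output",
        "azure_outputs",
        type=(str, click.Path(exists=False)),
        multiple=True,
        help="Name and path of Azure ML Pipeline output",
    )
    def run(azure_outputs):
        # each --az-output contributes a (name, path) tuple
        for name, path in azure_outputs:
            click.echo(f"{name} -> {path}")

    if __name__ == "__main__":
        # e.g. `python sketch.py --az-output my_output data/07_model_output/`
        # now succeeds even though the output folder does not exist yet.
        run()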
80 changes: 72 additions & 8 deletions kedro_azureml/datasets/asset_dataset.py
@@ -1,4 +1,5 @@
import logging
import re
from functools import partial
from operator import attrgetter
from pathlib import Path
@@ -43,6 +44,13 @@ class AzureMLAssetDataSet(AzureMLPipelineDataSet, AbstractVersionedDataSet):
| - ``filepath_arg``: Filepath arg on the wrapped dataset, defaults to `filepath`
| - ``azureml_type``: Either `uri_folder` or `uri_file`
| - ``version``: Version of the AzureML dataset to be used in kedro format.
    | - ``datastore``: Datastore name, only used to resolve the path when using the
    data asset as an output (non-local runs).
    | - ``azureml_root_dir``: The folder where the data asset is saved, only used to
    resolve the path when using the data asset as an output (non-local runs). Must
    be a Unix-style path without any spaces. Leading and trailing slashes are
    stripped. The final output path will start with
    "azureml://datastores/<datastore>/paths/<azureml_root_dir>/<job_id>"

Example
-------
@@ -52,19 +60,17 @@ class AzureMLAssetDataSet(AzureMLPipelineDataSet, AbstractVersionedDataSet):
.. code-block:: yaml

my_folder_dataset:
  type: kedro_azureml.datasets.AzureMLAssetDataSet
  azureml_dataset: my_azureml_folder_dataset
  root_dir: data/01_raw/some_folder/
- versioned: True
  dataset:
    type: pandas.ParquetDataSet
    filepath: "."

my_file_dataset:
type: kedro_azureml.datasets.AzureMLAssetDataSet
azureml_dataset: my_azureml_file_dataset
root_dir: data/01_raw/some_other_folder/
- versioned: True
dataset:
type: pandas.ParquetDataSet
filepath: "companies.csv"
@@ -81,6 +87,8 @@ def __init__(
filepath_arg: str = "filepath",
azureml_type: AzureMLDataAssetType = "uri_folder",
version: Optional[Version] = None,
datastore: str = "${{default_datastore}}",
azureml_root_dir: str = "kedro_azureml",
):
"""
azureml_dataset: Name of the AzureML dataset.
@@ -90,9 +98,23 @@ def __init__(
filepath_arg: Filepath arg on the wrapped dataset, defaults to `filepath`
azureml_type: Either `uri_folder` or `uri_file`
version: Version of the AzureML dataset to be used in kedro format.
datastore: Datastore name, only used to resolve the path when using the
    data asset as an output (non-local runs). Defaults to the pipeline's
    default datastore (resolved server side, see
    https://learn.microsoft.com/en-us/azure/machine-learning/concept-expressions?view=azureml-api-2)
azureml_root_dir: The folder where the data asset is saved, only used to
    resolve the path when using the data asset as an output (non-local runs).
    Must be a Unix-style path without any spaces. Leading and trailing
    slashes are stripped.
"""
super().__init__(dataset=dataset, root_dir=root_dir, filepath_arg=filepath_arg)

self._validate_datastore_name(datastore)
# needed to ensure there is no extra / in the string passed to Output
azureml_root_dir = azureml_root_dir.strip("/")
self._validate_azureml_root_dir(azureml_root_dir)

self._azureml_root_dir = azureml_root_dir
self._datastore = datastore
self._azureml_dataset = azureml_dataset
self._version = version
# 1 entry for load version, 1 for save version
@@ -115,6 +137,48 @@ def __init__(
f"the dataset definition."
)

def _validate_datastore_name(self, datastore: str):
"""
Validates a datastore name: it must contain exclusively lowercase letters,
digits, and underscores, or match the special expression ${{default_datastore}}.

:param datastore: The name of the datastore to be validated.
:type datastore: str

:raises ValueError: If the provided `datastore` name is incorrect.

:rtype: None

Learn more about the special expression "${{default_datastore}}" at:
https://learn.microsoft.com/azure/machine-learning/concept-expressions
"""
if not re.match(r"(^[a-z0-9_]+$|^\${{default_datastore}}$)", datastore):
raise ValueError(
"The datastore name must exclusively contain "
"lowercase letters, numbers, and underscores. "
"You can also use ${[default_datastore]}: "
"https://learn.microsoft.com/azure/machine-learning/concept-expressions"
)
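# For illustration (not part of the PR): the pattern above accepts names such as
# "workspaceblobstore", "my_store_2" and the special "${{default_datastore}}",
# and rejects e.g. "MyStore" (uppercase), "my-store" (hyphen) and "my store" (space).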

def _validate_azureml_root_dir(self, azureml_root_dir: str):
"""
Validates the azureml_root_dir to check it is a valid Unix-style file path.

:param azureml_root_dir: The path to be validated.
:type azureml_root_dir: str

:raises ValueError: If the provided `azureml_root_dir` is incorrect.

:rtype: None
"""
if not re.match(r"^\/?([\w\-_]\/?)+$", azureml_root_dir):
raise ValueError(
"azureml_root_dir must only contain "
"lowercase letters, numbers, and underscores. "
"Folders should be separated by a '/'. "
"Spaces are not allowed."
)
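# For illustration (not part of the PR): the pattern above accepts e.g.
# "kedro_azureml", "runs/2023-09" and "a_b/c-d", and rejects e.g.
# "my dir" (space), "my//dir" (empty segment) and "my\dir" (backslash).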

@property
def azure_config(self) -> AzureMLConfig:
"""AzureML config to be used by the dataset."""
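Taken together, a short usage sketch of the new constructor arguments (illustrative values; `Version` comes from kedro, as in the tests below):

    from kedro.io.core import Version

    from kedro_azureml.datasets import AzureMLAssetDataSet

    # Leading/trailing slashes in azureml_root_dir are stripped at construction
    # time, and both new arguments are validated against the patterns above.
    ds = AzureMLAssetDataSet(
        dataset={
            "type": "pandas.ParquetDataSet",
            "filepath": "xyz.parq",
        },
        azureml_dataset="my_azureml_dataset",
        version=Version(None, None),
        datastore="my_datastore",
        azureml_root_dir="/kedro_azureml/outputs/",
    )
    assert ds._azureml_root_dir == "kedro_azureml/outputs"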
20 changes: 15 additions & 5 deletions kedro_azureml/generator.py
@@ -172,12 +172,22 @@ def _get_output(self, name):
if name in self.catalog.list() and isinstance(
ds := self.catalog._get_dataset(name), AzureMLAssetDataSet
):
+ output_path = (
+     f"azureml://datastores/{ds._datastore}/paths/{ds._azureml_root_dir}"
+ )

Contributor: We probably need to strip off trailing slashes from ds._azureml_root_dir, or use some URI parsing lib.

Contributor (author): I think a URI parsing lib may be overkill. Moreover, datastore can be equal to ${{default_datastore}} and I don't know if that will pose a problem with these libraries. However, what about ds._azureml_root_dir.strip("/") to remove both the leading and trailing slash? Another solution would be to add checks on the parameters at dataset creation and throw an error if they don't match the correct format. Actually, if we go this route, this is probably something we want to add to the datastore and root_dir parameters as well.

+
+ # add the job id to the path (actual value is injected when the job is run)
+ output_path = f"{output_path}/${{{{name}}}}"
+
- if ds._azureml_type == "uri_file":
-     raise ValueError(
-         "AzureMLAssetDataSets with azureml_type 'uri_file' cannot be used as outputs"
-     )
  # TODO: add versioning
- return Output(type=ds._azureml_type, name=ds._azureml_dataset)
+ output_path = f"{output_path}/{ds._dataset_config[ds._filepath_arg]}"
+ # note that this will always create a new version of the dataset, even if we
+ # have versioned set to false.
+ return Output(
+     type=ds._azureml_type,
+     name=ds._azureml_dataset,
+     path=output_path,
+ )
else:
return Output(type="uri_folder")
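To make the assembled path concrete, a small worked example with illustrative values (the literal `${{name}}` placeholder is substituted server side when the job runs):

    # Worked example of the path assembly in _get_output (illustrative values):
    datastore = "my_datastore"
    azureml_root_dir = "kedro_azureml"
    filepath = "xyz.parq"

    output_path = f"azureml://datastores/{datastore}/paths/{azureml_root_dir}"
    output_path = f"{output_path}/${{{{name}}}}"  # yields a literal ${{name}}
    output_path = f"{output_path}/{filepath}"

    print(output_path)
    # azureml://datastores/my_datastore/paths/kedro_azureml/${{name}}/xyz.parq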

4 changes: 4 additions & 0 deletions tests/conftest.py
@@ -256,6 +256,7 @@ def multi_catalog():
"filepath": "abc.csv",
},
azureml_dataset="test_dataset",
azureml_type="uri_file",
version=Version(None, None),
)
parq = AzureMLAssetDataSet(
@@ -264,6 +265,9 @@ def multi_catalog():
"filepath": "xyz.parq",
},
azureml_dataset="test_dataset_2",
azureml_type="uri_folder",
azureml_root_dir="azureml_root",
datastore="mydatastore",
version=Version(None, None),
)
return DataCatalog({"input_data": csv, "i2": parq})
67 changes: 67 additions & 0 deletions tests/test_datasets.py
@@ -333,12 +333,16 @@ def test_azureml_asset_dataset(
local_run,
download,
):
# ensure that AzureMLAssetDataSet type matches path in the mock_azureml_client
with mock_azureml_client() as mac:
azureml_type = mac.data.get().type
ds = AzureMLAssetDataSet(
dataset={
"type": dataset_type,
"filepath": path_in_aml,
},
azureml_dataset="test_dataset",
azureml_type=azureml_type,
version=Version(None, None),
)
ds._local_run = local_run
@@ -359,6 +363,29 @@ def test_azureml_asset_dataset(
assert (ds._load()["data"] == df["data"]).all()


@pytest.mark.parametrize(
"azureml_root_dir",
[
"azureml/root",
"/azureml/root",
"azureml/root/",
"/azureml/root/",
],
ids=("noslash", "leading", "trailing", "both"),
)
def test_azureml_root_dir_has_no_leading_and_trailing_slashes(azureml_root_dir):
aml_dataset = AzureMLAssetDataSet(
dataset={
"type": PickleDataSet,
"filepath": "some/random/path/test.pickle",
},
azureml_dataset="test_dataset",
version=Version(None, None),
azureml_root_dir=azureml_root_dir,
)
assert aml_dataset._azureml_root_dir == azureml_root_dir.strip("/")


def test_azureml_assetdataset_raises_DataSetError_azureml_type():
with pytest.raises(DataSetError, match="mltable"):
AzureMLAssetDataSet(
@@ -385,6 +412,46 @@ def test_azureml_assetdataset_raises_DataSetError_wrapped_dataset_versioned():
)


@pytest.mark.parametrize(
"datastore",
[
"my-datastore",
"my/datastore",
"MyDatastore",
],
ids=("hyphen", "slash", "uppercase"),
)
def test_azureml_assetdataset_raises_ValueError_bad_datastore_name(datastore):
with pytest.raises(ValueError, match=r".*datastore.*"):
AzureMLAssetDataSet(
dataset={
"type": PickleDataSet,
"filepath": "some/random/path/test.pickle",
},
azureml_dataset="test_dataset",
version=Version(None, None),
datastore=datastore,
)


@pytest.mark.parametrize(
"azureml_root_dir",
["my dir", "my//dir", "my\\dir", "my$dir"],
ids=("space", "doubleslash", "backslash", "special_char"),
)
def test_azureml_assetdataset_raises_ValueError_bad_azureml_root_dir(azureml_root_dir):
with pytest.raises(ValueError, match=r".*azureml_root_dir.*"):
AzureMLAssetDataSet(
dataset={
"type": PickleDataSet,
"filepath": "some/random/path/test.pickle",
},
azureml_dataset="test_dataset",
version=Version(None, None),
azureml_root_dir=azureml_root_dir,
)


def test_azureml_pipeline_dataset(tmp_path: Path):
ds = AzureMLPipelineDataSet(
{
17 changes: 17 additions & 0 deletions tests/test_generator.py
@@ -71,6 +71,23 @@ def test_can_generate_azure_pipeline(
for node in az_pipeline.jobs.values()
), "Invalid docker image set on commands"

# check the Output of the generated command for AzureMLAssetDataSets

i2_job = az_pipeline.jobs["node1"].outputs["i2"]._to_job_output()
i2_dataset = multi_catalog.datasets.i2
assert i2_dataset._azureml_type == i2_job.type, "Wrong Output type"
assert i2_dataset._azureml_dataset == i2_job.name, "Wrong Output name"
assert i2_dataset._datastore in i2_job.path, "datastore not passed to Output"
assert (
i2_dataset._azureml_root_dir in i2_job.path
), "azureml root dir not passed to Output"

# check the Output for non-AzureMLAssetDataSet outputs
i3_job = az_pipeline.jobs["node2"].outputs["i3"]._to_job_output()
assert i3_job.type == "uri_folder", "Wrong Output type"
assert i3_job.path is None, "Output path is not empty"
assert i3_job.name is None, "Output name is not empty"


def test_azure_pipeline_with_different_compute(
dummy_pipeline_compute_tag, dummy_plugin_config, multi_catalog