From 7df60e8adca9d30994c2fb711c03f3e556c2986b Mon Sep 17 00:00:00 2001
From: AlexandreOuellet
Date: Wed, 16 Oct 2024 11:54:52 -0400
Subject: [PATCH 01/12] add explicit credentials to AzureMLAssetDataset

---
 kedro_azureml/datasets/asset_dataset.py          |  3 ++-
 kedro_azureml/hooks.py                           | 14 +++++++-------
 tests/conf/e2e/catalog.yml                       |  1 +
 tests/conf/e2e_pipeline_data_passing/catalog.yml |  1 +
 4 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/kedro_azureml/datasets/asset_dataset.py b/kedro_azureml/datasets/asset_dataset.py
index 73e2874..994796f 100644
--- a/kedro_azureml/datasets/asset_dataset.py
+++ b/kedro_azureml/datasets/asset_dataset.py
@@ -75,6 +75,7 @@ class AzureMLAssetDataset(AzureMLPipelineDataset, AbstractVersionedDataset):

     def __init__(
         self,
+        credentials,
         azureml_dataset: str,
         dataset: Union[str, Type[AbstractDataset], Dict[str, Any]],
         root_dir: str = "data",
@@ -107,7 +108,7 @@ def __init__(
         self._version_cache = Cache(maxsize=2)  # type: Cache
         self._download = True
         self._local_run = True
-        self._azureml_config = None
+        self._azureml_config = AzureMLConfig(**credentials)
         self._azureml_type = azureml_type
         if self._azureml_type not in get_args(AzureMLDataAssetType):
             raise DatasetError(
diff --git a/kedro_azureml/hooks.py b/kedro_azureml/hooks.py
index aa26c99..c6d6c06 100644
--- a/kedro_azureml/hooks.py
+++ b/kedro_azureml/hooks.py
@@ -14,14 +14,14 @@ def after_context_created(self, context) -> None:
         context.config_loader.config_patterns.update(
             {"azureml": ["azureml*", "azureml*/**", "**/azureml*"]}
         )
-        self.azure_config = AzureMLConfig(**context.config_loader["azureml"]["azure"])

-    @hook_impl
-    def after_catalog_created(self, catalog):
-        for dataset_name, dataset in catalog._data_sets.items():
-            if isinstance(dataset, AzureMLAssetDataset):
-                dataset.azure_config = self.azure_config
-                catalog.add(dataset_name, dataset, replace=True)
+        azure_creds = {"azureml": AzureMLConfig(**context.config_loader["azureml"]["azure"]).__dict__}
+
+        context.config_loader["credentials"] = {
+            **context.config_loader["credentials"],
+            **azure_creds,
+        }
+
     @hook_impl
     def before_pipeline_run(self, run_params, pipeline, catalog):
diff --git a/tests/conf/e2e/catalog.yml b/tests/conf/e2e/catalog.yml
index 88c9594..dd4af2f 100644
--- a/tests/conf/e2e/catalog.yml
+++ b/tests/conf/e2e/catalog.yml
@@ -17,6 +17,7 @@ model_input_table:
   type: kedro_azureml.datasets.AzureMLAssetDataset
   azureml_dataset: e2e_tests_no_pdp
   root_dir: data/02_intermediate
+  credentials: azureml
   dataset:
     type: pandas.CSVDataset
     filepath: model_input_table.csv
diff --git a/tests/conf/e2e_pipeline_data_passing/catalog.yml b/tests/conf/e2e_pipeline_data_passing/catalog.yml
index 2e3d3f8..eb27e45 100644
--- a/tests/conf/e2e_pipeline_data_passing/catalog.yml
+++ b/tests/conf/e2e_pipeline_data_passing/catalog.yml
@@ -23,6 +23,7 @@ model_input_table:
   type: kedro_azureml.datasets.AzureMLAssetDataset
   azureml_dataset: e2e_tests_pdp
   root_dir: data/02_intermediate
+  credentials: azureml
   dataset:
     type: pandas.CSVDataset
     filepath: model_input_table.csv
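
The patch above threads credentials end to end: the hook publishes the resolved Azure ML settings under an "azureml" credentials key, and the new `credentials: azureml` entry in catalog.yml makes Kedro pass that dict to the dataset constructor, which rebuilds an AzureMLConfig from it. A minimal sketch of that chain; the field names (subscription_id, resource_group, workspace_name) are illustrative assumptions, not taken from the patch:

    from dataclasses import dataclass

    @dataclass
    class AzureMLConfig:  # hypothetical stand-in for the plugin's AzureMLConfig
        subscription_id: str
        resource_group: str
        workspace_name: str

    # What the hook now publishes under the "azureml" credentials key:
    credentials = {"azureml": AzureMLConfig("<sub-id>", "<rg>", "<ws>").__dict__}

    # What `credentials: azureml` in catalog.yml hands to the dataset:
    config = AzureMLConfig(**credentials["azureml"])
    assert config.workspace_name == "<ws>"
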
From 84774afd688b82a44c85cf9dfba532a1163ea35d Mon Sep 17 00:00:00 2001
From: AlexandreOuellet
Date: Wed, 16 Oct 2024 12:27:52 -0400
Subject: [PATCH 02/12] keep it compatible with the previous explicit
 after_catalog_created credentials

---
 kedro_azureml/datasets/asset_dataset.py |  4 ++--
 kedro_azureml/hooks.py                  | 10 +++++++++-
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/kedro_azureml/datasets/asset_dataset.py b/kedro_azureml/datasets/asset_dataset.py
index 994796f..d7431b3 100644
--- a/kedro_azureml/datasets/asset_dataset.py
+++ b/kedro_azureml/datasets/asset_dataset.py
@@ -75,9 +75,9 @@ class AzureMLAssetDataset(AzureMLPipelineDataset, AbstractVersionedDataset):

     def __init__(
         self,
-        credentials,
         azureml_dataset: str,
         dataset: Union[str, Type[AbstractDataset], Dict[str, Any]],
+        credentials = None,
         root_dir: str = "data",
         filepath_arg: str = "filepath",
         azureml_type: AzureMLDataAssetType = "uri_folder",
@@ -108,7 +108,7 @@ def __init__(
         self._version_cache = Cache(maxsize=2)  # type: Cache
         self._download = True
         self._local_run = True
-        self._azureml_config = AzureMLConfig(**credentials)
+        self._azureml_config = AzureMLConfig(**credentials) if credentials else None
         self._azureml_type = azureml_type
         if self._azureml_type not in get_args(AzureMLDataAssetType):
             raise DatasetError(
diff --git a/kedro_azureml/hooks.py b/kedro_azureml/hooks.py
index c6d6c06..2cc109c 100644
--- a/kedro_azureml/hooks.py
+++ b/kedro_azureml/hooks.py
@@ -15,13 +15,21 @@ def after_context_created(self, context) -> None:
             {"azureml": ["azureml*", "azureml*/**", "**/azureml*"]}
         )
-        azure_creds = {"azureml": AzureMLConfig(**context.config_loader["azureml"]["azure"]).__dict__}
+        self.azure_config = AzureMLConfig(**context.config_loader["azureml"]["azure"])
+
+        azure_creds = {"azureml": self.azure_config.__dict__}

         context.config_loader["credentials"] = {
             **context.config_loader["credentials"],
             **azure_creds,
         }

+    @hook_impl
+    def after_catalog_created(self, catalog):
+        for dataset_name, dataset in catalog._data_sets.items():
+            if isinstance(dataset, AzureMLAssetDataset):
+                dataset.azure_config = self.azure_config
+                catalog.add(dataset_name, dataset, replace=True)

     @hook_impl
     def before_pipeline_run(self, run_params, pipeline, catalog):
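
The compatibility change above reduces to an optional constructor argument: explicit credentials are used when given, and otherwise the config stays None so the restored after_catalog_created hook can inject it later. A condensed sketch under that assumption (the class name is hypothetical, not the real implementation):

    from typing import Any, Dict, Optional

    class AssetDatasetSketch:  # hypothetical stand-in for AzureMLAssetDataset
        def __init__(self, azureml_dataset: str, credentials: Optional[Dict[str, Any]] = None):
            self.azureml_dataset = azureml_dataset
            # Explicit credentials win immediately; None defers to the hook.
            self._azureml_config = dict(credentials) if credentials else None

    implicit = AssetDatasetSketch("e2e_tests_no_pdp")
    assert implicit._azureml_config is None  # the hook fills this in later
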
From 922848b1ebaf270f8901302fc5ee18a0dbdd79fc Mon Sep 17 00:00:00 2001
From: AlexandreOuellet
Date: Wed, 16 Oct 2024 16:06:30 -0400
Subject: [PATCH 03/12] remove checks for catalog list, and use
 catalog.__contains__() instead

The catalog already has a __contains__ method, which checks that the
dataset name (or the input/output name, in some cases) matches either a
known dataset or a dataset pattern; this makes the check work with
dataset factories.
---
 kedro_azureml/generator.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/kedro_azureml/generator.py b/kedro_azureml/generator.py
index 9db7954..17e1e5c 100644
--- a/kedro_azureml/generator.py
+++ b/kedro_azureml/generator.py
@@ -157,7 +157,7 @@ def _get_versioned_azureml_dataset_name(
     def _get_input(self, dataset_name: str, pipeline: Pipeline) -> Input:
         if self._is_param_or_root_non_azureml_asset_dataset(dataset_name, pipeline):
             return Input(type="string")
-        elif dataset_name in self.catalog.list() and isinstance(
+        elif dataset_name in self.catalog and isinstance(
             ds := self.catalog._get_dataset(dataset_name), AzureMLAssetDataset
         ):
             if ds._azureml_type == "uri_file" and dataset_name not in pipeline.inputs():
@@ -169,7 +169,7 @@ def _get_input(self, dataset_name: str, pipeline: Pipeline) -> Input:
         return Input(type="uri_folder")

     def _get_output(self, name):
-        if name in self.catalog.list() and isinstance(
+        if name in self.catalog and isinstance(
             ds := self.catalog._get_dataset(name), AzureMLAssetDataset
         ):
             if ds._azureml_type == "uri_file":
@@ -208,7 +208,7 @@ def _is_param_or_root_non_azureml_asset_dataset(
     ) -> bool:
         return dataset_name.startswith(PARAMS_PREFIX) or (
             dataset_name in pipeline.inputs()
-            and dataset_name in self.catalog.list()
+            and dataset_name in self.catalog
            and not isinstance(
                 self.catalog._get_dataset(dataset_name), AzureMLAssetDataset
             )
@@ -334,7 +334,7 @@ def _connect_commands(self, pipeline: Pipeline, commands: Dict[str, Command]):
                 azure_output = parent_outputs[sanitized_input_name]
                 azure_inputs[sanitized_input_name] = azure_output
             # 2. try to find AzureMLAssetDataset in catalog
-            elif node_input in self.catalog.list() and isinstance(
+            elif node_input in self.catalog and isinstance(
                 ds := self.catalog._get_dataset(node_input), AzureMLAssetDataset
             ):
                 azure_inputs[sanitized_input_name] = Input(
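
The distinction this commit relies on can be reproduced with a toy catalog that models the same membership contract; the pattern matching below is deliberately simplified and is not the real DataCatalog implementation:

    import re

    class ToyCatalog:
        def __init__(self, datasets, patterns):
            self._datasets = set(datasets)  # explicitly registered entries
            self._patterns = patterns       # dataset-factory patterns, e.g. "{name}_csv"

        def list(self):
            # Only explicitly registered datasets are listed.
            return sorted(self._datasets)

        def __contains__(self, name):
            # Membership also consults dataset-factory patterns.
            return name in self._datasets or any(
                re.fullmatch(re.sub(r"\{.*?\}", ".+", p), name) for p in self._patterns
            )

    catalog = ToyCatalog(["model_input_table"], ["{name}_csv"])
    assert "companies_csv" in catalog             # matched via the factory pattern
    assert "companies_csv" not in catalog.list()  # but never returned by list()
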
From 6e0fbaab841b6ec8e0dc0162f80c764db79af44e Mon Sep 17 00:00:00 2001
From: Alexandre Ouellet
Date: Thu, 17 Oct 2024 16:20:48 +0000
Subject: [PATCH 04/12] ensure AzurePipelinesRunner handles dataset factories

---
 kedro_azureml/runner.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/kedro_azureml/runner.py b/kedro_azureml/runner.py
index 66fca90..3666e59 100644
--- a/kedro_azureml/runner.py
+++ b/kedro_azureml/runner.py
@@ -47,11 +47,10 @@ def run(
         session_id: str = None,
     ) -> Dict[str, Any]:
         catalog = catalog.shallow_copy()
-        catalog_set = set(catalog.list())

         # Loop over datasets in arguments to set their paths
         for ds_name, azure_dataset_path in self.data_paths.items():
-            if ds_name in catalog_set:
+            if ds_name in catalog:
                 ds = catalog._get_dataset(ds_name)
                 if isinstance(ds, AzureMLPipelineDataset):
                     if (
@@ -66,7 +65,7 @@ def run(
                 catalog.add(ds_name, self.create_default_data_set(ds_name))

         # Loop over remaining input datasets to add them to the catalog
-        unsatisfied = pipeline.inputs() - set(catalog.list())
+        unsatisfied = [input for input in pipeline.inputs() if input not in catalog]
         for ds_name in unsatisfied:
             catalog.add(ds_name, self.create_default_data_set(ds_name))
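
The rewritten `unsatisfied` computation matters for the same reason as the generator change: set arithmetic against catalog.list() treats factory-matched inputs as missing and shadows them with default datasets. Continuing the ToyCatalog sketch above (toy model, not the plugin's code):

    pipeline_inputs = {"companies_csv", "free_input"}

    # Old behaviour: factory-matched names look unsatisfied.
    assert pipeline_inputs - set(catalog.list()) == {"companies_csv", "free_input"}

    # New behaviour: __contains__ consults the patterns, so only truly
    # unknown inputs fall back to a default dataset.
    assert [i for i in sorted(pipeline_inputs) if i not in catalog] == ["free_input"]
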
From 647b3a9732ae80bc94f1b01fc52168e9e42ad1db Mon Sep 17 00:00:00 2001
From: Alexandre Ouellet
Date: Thu, 17 Oct 2024 18:12:26 +0000
Subject: [PATCH 05/12] add error logs to help debug an issue with the
 AzureMLAssetDataset path not being updated

---
 kedro_azureml/runner.py | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/kedro_azureml/runner.py b/kedro_azureml/runner.py
index 3666e59..4cb6801 100644
--- a/kedro_azureml/runner.py
+++ b/kedro_azureml/runner.py
@@ -47,27 +47,39 @@ def run(
         session_id: str = None,
     ) -> Dict[str, Any]:
         catalog = catalog.shallow_copy()
+        logger.error(f"0 catalog: {catalog}")

         # Loop over datasets in arguments to set their paths
+        logger.error(f"self.data_paths.items(): {self.data_paths.items()}")
         for ds_name, azure_dataset_path in self.data_paths.items():
             if ds_name in catalog:
                 ds = catalog._get_dataset(ds_name)
+                logger.error(f"0 ds: {ds}")
                 if isinstance(ds, AzureMLPipelineDataset):
                     if (
                         isinstance(ds, AzureMLAssetDataset)
                         and ds._azureml_type == "uri_file"
                     ):
                         ds.root_dir = str(Path(azure_dataset_path).parent)
+                        logger.error(f"1 ds: {ds}")
                     else:
                         ds.root_dir = azure_dataset_path
+                        logger.error(f"2 ds: {ds}")
                 catalog.add(ds_name, ds, replace=True)
+                logger.error(f"1 catalog: {catalog}")
             else:
                 catalog.add(ds_name, self.create_default_data_set(ds_name))
+                logger.error(f"2 catalog: {catalog}")

         # Loop over remaining input datasets to add them to the catalog
         unsatisfied = [input for input in pipeline.inputs() if input not in catalog]
+        logger.error(f"3 catalog: {unsatisfied}")
         for ds_name in unsatisfied:
-            catalog.add(ds_name, self.create_default_data_set(ds_name))
+            default_ds = self.create_default_data_set(ds_name)
+            logger.error(f"0 default_ds: {default_ds}")
+            catalog.add(ds_name, default_ds)
+
+        logger.error(f"4 catalog: {unsatisfied}")

         return super().run(pipeline, catalog, hook_manager, session_id)
From 0c4db8fb3d68ba6a480dded007839b3c46394621 Mon Sep 17 00:00:00 2001
From: Alexandre Ouellet
Date: Thu, 17 Oct 2024 19:53:10 +0000
Subject: [PATCH 06/12] add more error logs for debugging asset_dataset.py

---
 kedro_azureml/datasets/asset_dataset.py | 87 +++++++++++++++----------
 1 file changed, 52 insertions(+), 35 deletions(-)

diff --git a/kedro_azureml/datasets/asset_dataset.py b/kedro_azureml/datasets/asset_dataset.py
index d7431b3..df41e03 100644
--- a/kedro_azureml/datasets/asset_dataset.py
+++ b/kedro_azureml/datasets/asset_dataset.py
@@ -159,8 +159,12 @@ def download_path(self) -> str:

     def _construct_dataset(self) -> AbstractDataset:
         dataset_config = self._dataset_config.copy()
+        logger.error(f"8: dataset_config {dataset_config}")
+
         dataset_config[self._filepath_arg] = str(self.path)
-        return self._dataset_type(**dataset_config)
+        x = self._dataset_type(**dataset_config)
+        logger.error(f"9: x {x}")
+        return x

     def _get_latest_version(self) -> str:
         try:
@@ -184,40 +188,53 @@ def _get_azureml_dataset(self):
         )

     def _load(self) -> Any:
-        if self._download:
-            try:
-                azureml_ds = self._get_azureml_dataset()
-            except ResourceNotFoundError:
-                raise VersionNotFoundError(
-                    f"Did not find version {self.resolve_load_version()} for {self}"
-                )
-            fs = AzureMachineLearningFileSystem(azureml_ds.path)
-            if azureml_ds.type == "uri_file":
-                # relative (to storage account root) path of the file dataset on azure
-                # Note that path is converted to str for compatibility reasons with
-                # fsspec AbstractFileSystem expand_path function
-                path_on_azure = str(fs._infer_storage_options(azureml_ds.path)[1])
-            elif azureml_ds.type == "uri_folder":
-                # relative (to storage account root) path of the folder dataset on azure
-                dataset_root_on_azure = fs._infer_storage_options(azureml_ds.path)[1]
-                # relative (to storage account root) path of the dataset in the folder on azure
-                path_on_azure = str(
-                    Path(dataset_root_on_azure)
-                    / self._dataset_config[self._filepath_arg]
-                )
-            else:
-                raise ValueError("Unsupported AzureMLDataset type")
-            if fs.isfile(path_on_azure):
-                # using APPEND will keep the local file if it already exists
-                # as versions are unique this will prevent unnecessary file download
-                fs.download(path_on_azure, self.download_path, overwrite="APPEND")
-            else:
-                # we take the relative within the Azure dataset to avoid downloading
-                # all files in a folder dataset.
-                for fpath in fs.ls(path_on_azure):
-                    logger.info(f"Downloading {fpath} for local execution")
-                    fs.download(fpath, self.download_path, overwrite="APPEND")
-        return self._construct_dataset().load()
+        try:
+            logger.error(f"0: self.path: {self.path}")
+            if self._download:
+                try:
+                    azureml_ds = self._get_azureml_dataset()
+                    logger.error(f"1: azureml_ds: {azureml_ds}")
+                except ResourceNotFoundError:
+                    raise VersionNotFoundError(
+                        f"Did not find version {self.resolve_load_version()} for {self}"
+                    )
+                fs = AzureMachineLearningFileSystem(azureml_ds.path)
+                logger.error(f"2: fs: {fs}")
+                if azureml_ds.type == "uri_file":
+                    # relative (to storage account root) path of the file dataset on azure
+                    # Note that path is converted to str for compatibility reasons with
+                    # fsspec AbstractFileSystem expand_path function
+                    path_on_azure = str(fs._infer_storage_options(azureml_ds.path)[1])
+                    logger.error(f"3: path_on_azure: {path_on_azure}")
+                elif azureml_ds.type == "uri_folder":
+                    # relative (to storage account root) path of the folder dataset on azure
+                    dataset_root_on_azure = fs._infer_storage_options(azureml_ds.path)[1]
+                    # relative (to storage account root) path of the dataset in the folder on azure
+                    path_on_azure = str(
+                        Path(dataset_root_on_azure)
+                        / self._dataset_config[self._filepath_arg]
+                    )
+                    logger.error(f"4: path_on_azure: {path_on_azure}")
+
+                else:
+                    raise ValueError("Unsupported AzureMLDataset type")
+                if fs.isfile(path_on_azure):
+                    # using APPEND will keep the local file if it already exists
+                    # as versions are unique this will prevent unnecessary file download
+                    fs.download(path_on_azure, self.download_path, overwrite="APPEND")
+                    logger.error(f"5: path_on_azure: {self.download_path}")
+                    logger.error(f"6: path_on_azure: {self.download_path}")
+                else:
+                    # we take the relative within the Azure dataset to avoid downloading
+                    # all files in a folder dataset.
+                    for fpath in fs.ls(path_on_azure):
+                        logger.error(f"Downloading {fpath} for local execution")
+                        fs.download(fpath, self.download_path, overwrite="APPEND")
+            return self._construct_dataset().load()
+        except Exception as e:
+            logger.error(f"10: {e}")
+            logger.error(e)
+            raise

     def _save(self, data: Any) -> None:
         self._construct_dataset().save(data)
From 397b83f3f629178fc19a14fe5c47deb2c3b03b42 Mon Sep 17 00:00:00 2001
From: AlexandreOuellet
Date: Mon, 21 Oct 2024 10:31:06 -0400
Subject: [PATCH 07/12] revert generator implementation

---
 kedro_azureml/generator.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/kedro_azureml/generator.py b/kedro_azureml/generator.py
index 17e1e5c..9db7954 100644
--- a/kedro_azureml/generator.py
+++ b/kedro_azureml/generator.py
@@ -157,7 +157,7 @@ def _get_versioned_azureml_dataset_name(
     def _get_input(self, dataset_name: str, pipeline: Pipeline) -> Input:
         if self._is_param_or_root_non_azureml_asset_dataset(dataset_name, pipeline):
             return Input(type="string")
-        elif dataset_name in self.catalog and isinstance(
+        elif dataset_name in self.catalog.list() and isinstance(
             ds := self.catalog._get_dataset(dataset_name), AzureMLAssetDataset
         ):
             if ds._azureml_type == "uri_file" and dataset_name not in pipeline.inputs():
@@ -169,7 +169,7 @@ def _get_input(self, dataset_name: str, pipeline: Pipeline) -> Input:
         return Input(type="uri_folder")

     def _get_output(self, name):
-        if name in self.catalog and isinstance(
+        if name in self.catalog.list() and isinstance(
             ds := self.catalog._get_dataset(name), AzureMLAssetDataset
         ):
             if ds._azureml_type == "uri_file":
@@ -208,7 +208,7 @@ def _is_param_or_root_non_azureml_asset_dataset(
     ) -> bool:
         return dataset_name.startswith(PARAMS_PREFIX) or (
             dataset_name in pipeline.inputs()
-            and dataset_name in self.catalog
+            and dataset_name in self.catalog.list()
             and not isinstance(
                 self.catalog._get_dataset(dataset_name), AzureMLAssetDataset
             )
@@ -334,7 +334,7 @@ def _connect_commands(self, pipeline: Pipeline, commands: Dict[str, Command]):
                 azure_output = parent_outputs[sanitized_input_name]
                 azure_inputs[sanitized_input_name] = azure_output
             # 2. try to find AzureMLAssetDataset in catalog
-            elif node_input in self.catalog and isinstance(
+            elif node_input in self.catalog.list() and isinstance(
                 ds := self.catalog._get_dataset(node_input), AzureMLAssetDataset
             ):
                 azure_inputs[sanitized_input_name] = Input(
From bb334ee260600ea78122a1a05f2cb7bd78e2407c Mon Sep 17 00:00:00 2001
From: AlexandreOuellet
Date: Mon, 21 Oct 2024 10:31:52 -0400
Subject: [PATCH 08/12] revert error logs added for debugging

---
 kedro_azureml/datasets/asset_dataset.py | 87 ++++++++++---------------
 1 file changed, 35 insertions(+), 52 deletions(-)

diff --git a/kedro_azureml/datasets/asset_dataset.py b/kedro_azureml/datasets/asset_dataset.py
index df41e03..d7431b3 100644
--- a/kedro_azureml/datasets/asset_dataset.py
+++ b/kedro_azureml/datasets/asset_dataset.py
@@ -159,12 +159,8 @@ def download_path(self) -> str:

     def _construct_dataset(self) -> AbstractDataset:
         dataset_config = self._dataset_config.copy()
-        logger.error(f"8: dataset_config {dataset_config}")
-
         dataset_config[self._filepath_arg] = str(self.path)
-        x = self._dataset_type(**dataset_config)
-        logger.error(f"9: x {x}")
-        return x
+        return self._dataset_type(**dataset_config)

     def _get_latest_version(self) -> str:
         try:
@@ -188,53 +184,40 @@ def _get_azureml_dataset(self):
         )

     def _load(self) -> Any:
-        try:
-            logger.error(f"0: self.path: {self.path}")
-            if self._download:
-                try:
-                    azureml_ds = self._get_azureml_dataset()
-                    logger.error(f"1: azureml_ds: {azureml_ds}")
-                except ResourceNotFoundError:
-                    raise VersionNotFoundError(
-                        f"Did not find version {self.resolve_load_version()} for {self}"
-                    )
-                fs = AzureMachineLearningFileSystem(azureml_ds.path)
-                logger.error(f"2: fs: {fs}")
-                if azureml_ds.type == "uri_file":
-                    # relative (to storage account root) path of the file dataset on azure
-                    # Note that path is converted to str for compatibility reasons with
-                    # fsspec AbstractFileSystem expand_path function
-                    path_on_azure = str(fs._infer_storage_options(azureml_ds.path)[1])
-                    logger.error(f"3: path_on_azure: {path_on_azure}")
-                elif azureml_ds.type == "uri_folder":
-                    # relative (to storage account root) path of the folder dataset on azure
-                    dataset_root_on_azure = fs._infer_storage_options(azureml_ds.path)[1]
-                    # relative (to storage account root) path of the dataset in the folder on azure
-                    path_on_azure = str(
-                        Path(dataset_root_on_azure)
-                        / self._dataset_config[self._filepath_arg]
-                    )
-                    logger.error(f"4: path_on_azure: {path_on_azure}")
-
-                else:
-                    raise ValueError("Unsupported AzureMLDataset type")
-                if fs.isfile(path_on_azure):
-                    # using APPEND will keep the local file if it already exists
-                    # as versions are unique this will prevent unnecessary file download
-                    fs.download(path_on_azure, self.download_path, overwrite="APPEND")
-                    logger.error(f"5: path_on_azure: {self.download_path}")
-                    logger.error(f"6: path_on_azure: {self.download_path}")
-                else:
-                    # we take the relative within the Azure dataset to avoid downloading
-                    # all files in a folder dataset.
-                    for fpath in fs.ls(path_on_azure):
-                        logger.error(f"Downloading {fpath} for local execution")
-                        fs.download(fpath, self.download_path, overwrite="APPEND")
-            return self._construct_dataset().load()
-        except Exception as e:
-            logger.error(f"10: {e}")
-            logger.error(e)
-            raise
+        if self._download:
+            try:
+                azureml_ds = self._get_azureml_dataset()
+            except ResourceNotFoundError:
+                raise VersionNotFoundError(
+                    f"Did not find version {self.resolve_load_version()} for {self}"
+                )
+            fs = AzureMachineLearningFileSystem(azureml_ds.path)
+            if azureml_ds.type == "uri_file":
+                # relative (to storage account root) path of the file dataset on azure
+                # Note that path is converted to str for compatibility reasons with
+                # fsspec AbstractFileSystem expand_path function
+                path_on_azure = str(fs._infer_storage_options(azureml_ds.path)[1])
+            elif azureml_ds.type == "uri_folder":
+                # relative (to storage account root) path of the folder dataset on azure
+                dataset_root_on_azure = fs._infer_storage_options(azureml_ds.path)[1]
+                # relative (to storage account root) path of the dataset in the folder on azure
+                path_on_azure = str(
+                    Path(dataset_root_on_azure)
+                    / self._dataset_config[self._filepath_arg]
+                )
+            else:
+                raise ValueError("Unsupported AzureMLDataset type")
+            if fs.isfile(path_on_azure):
+                # using APPEND will keep the local file if it already exists
+                # as versions are unique this will prevent unnecessary file download
+                fs.download(path_on_azure, self.download_path, overwrite="APPEND")
+            else:
+                # we take the relative within the Azure dataset to avoid downloading
+                # all files in a folder dataset.
+                for fpath in fs.ls(path_on_azure):
+                    logger.info(f"Downloading {fpath} for local execution")
+                    fs.download(fpath, self.download_path, overwrite="APPEND")
+        return self._construct_dataset().load()

     def _save(self, data: Any) -> None:
         self._construct_dataset().save(data)
From 91f25a3ef0f06564bdf6bd52d9436fa6256a0aa2 Mon Sep 17 00:00:00 2001
From: AlexandreOuellet
Date: Mon, 21 Oct 2024 10:34:21 -0400
Subject: [PATCH 09/12] add handling of all inputs, not only those registered
 in the data catalog (for dataset factories)

---
 asf                    | 14 ++++++++++++++
 kedro_azureml/hooks.py | 39 +++++++++++++++++----------------------
 2 files changed, 31 insertions(+), 22 deletions(-)
 create mode 100644 asf

diff --git a/asf b/asf
new file mode 100644
index 0000000..e67105f
--- /dev/null
+++ b/asf
@@ -0,0 +1,14 @@
+AzureMLDataset(dataset_config={'filepa
+th': shuttles.xlsx, 'load_args':
+{'engine': openpyxl}},
+dataset_type=ExcelDataset,
+filepath_arg=filepath,
+root_dir=/mnt/azureml/cr/j/5f0796d2547
+44ea387fb7b4e5e27d202/cap/data-capabil
+ity/wd/INPUT_shuttles)
+
+
+PickleDataset(backend=cloudpickle,
+filepath=/mnt/azureml/cr/j/19c6488f0e4644ce9a65e8a6932c8984/cap/data-capability/
+wd/INPUT_companies_csv/companies_csv.pickle, load_args={}, protocol=file,
+save_args={})
\ No newline at end of file
diff --git a/kedro_azureml/hooks.py b/kedro_azureml/hooks.py
index 2cc109c..4b0075c 100644
--- a/kedro_azureml/hooks.py
+++ b/kedro_azureml/hooks.py
@@ -24,12 +24,6 @@ def after_context_created(self, context) -> None:
             **azure_creds,
         }

-    @hook_impl
-    def after_catalog_created(self, catalog):
-        for dataset_name, dataset in catalog._data_sets.items():
-            if isinstance(dataset, AzureMLAssetDataset):
-                dataset.azure_config = self.azure_config
-                catalog.add(dataset_name, dataset, replace=True)

     @hook_impl
     def before_pipeline_run(self, run_params, pipeline, catalog):
@@ -39,21 +33,22 @@ def before_pipeline_run(self, run_params, pipeline, catalog):
             pipeline: The ``Pipeline`` object representing the pipeline to be run.
             catalog: The ``DataCatalog`` from which to fetch data.
         """
-        for dataset_name, dataset in catalog._data_sets.items():
-            if isinstance(dataset, AzureMLAssetDataset):
-                if AzurePipelinesRunner.__name__ not in run_params["runner"]:
-                    # when running locally using an AzureMLAssetDataset
-                    # as an intermediate dataset we don't want download
-                    # but still set to run local with a local version.
-                    if dataset_name not in pipeline.inputs():
-                        dataset.as_local_intermediate()
-                # when running remotely we still want to provide information
-                # from the azureml config for getting the dataset version during
-                # remote runs
-                else:
-                    dataset.as_remote()
-
-                catalog.add(dataset_name, dataset, replace=True)
+        for input in pipeline.all_inputs():
+            if input in catalog:
+                dataset = catalog._get_dataset(input)
+                if isinstance(dataset, AzureMLAssetDataset):
+                    if AzurePipelinesRunner.__name__ not in run_params["runner"]:
+                        # when running locally using an AzureMLAssetDataset
+                        # as an intermediate dataset we don't want download
+                        # but still set to run local with a local version.
+                        if input not in pipeline.inputs():
+                            dataset.as_local_intermediate()
+                    # when running remotely we still want to provide information
+                    # from the azureml config for getting the dataset version during
+                    # remote runs
+                    else:
+                        dataset.as_remote()
+
+                    catalog.add(input, dataset, replace=True)

 azureml_local_run_hook = AzureMLLocalRunHook()
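
The rewritten hook walks pipeline.all_inputs() (which includes intermediate datasets) instead of only the registered catalog entries, then branches on the runner. A toy decision table mirroring that branching; the function name and return values are illustrative only, not part of the plugin:

    def select_mode(name: str, root_inputs: set, runner: str):
        # Mirrors before_pipeline_run for an AzureMLAssetDataset entry:
        if "AzurePipelinesRunner" not in runner:
            # local run: intermediates get a local version and skip download;
            # root inputs are left untouched (None = no-op).
            return "as_local_intermediate" if name not in root_inputs else None
        # remote run: keep the azureml config so dataset versions resolve.
        return "as_remote"

    assert select_mode("model_input_table", {"companies"}, "SequentialRunner") == "as_local_intermediate"
    assert select_mode("companies", {"companies"}, "AzurePipelinesRunner") == "as_remote"
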
From 1ad46a1448f07008dc6ff10575ac4fbbce1f44df Mon Sep 17 00:00:00 2001
From: AlexandreOuellet
Date: Mon, 21 Oct 2024 10:35:22 -0400
Subject: [PATCH 10/12] revert changes from runner.py

---
 kedro_azureml/runner.py | 19 ++++---------------
 1 file changed, 4 insertions(+), 15 deletions(-)

diff --git a/kedro_azureml/runner.py b/kedro_azureml/runner.py
index 4cb6801..66fca90 100644
--- a/kedro_azureml/runner.py
+++ b/kedro_azureml/runner.py
@@ -47,39 +47,28 @@ def run(
         session_id: str = None,
     ) -> Dict[str, Any]:
         catalog = catalog.shallow_copy()
-        logger.error(f"0 catalog: {catalog}")
+        catalog_set = set(catalog.list())

         # Loop over datasets in arguments to set their paths
-        logger.error(f"self.data_paths.items(): {self.data_paths.items()}")
         for ds_name, azure_dataset_path in self.data_paths.items():
-            if ds_name in catalog:
+            if ds_name in catalog_set:
                 ds = catalog._get_dataset(ds_name)
-                logger.error(f"0 ds: {ds}")
                 if isinstance(ds, AzureMLPipelineDataset):
                     if (
                         isinstance(ds, AzureMLAssetDataset)
                         and ds._azureml_type == "uri_file"
                     ):
                         ds.root_dir = str(Path(azure_dataset_path).parent)
-                        logger.error(f"1 ds: {ds}")
                     else:
                         ds.root_dir = azure_dataset_path
-                        logger.error(f"2 ds: {ds}")
                 catalog.add(ds_name, ds, replace=True)
-                logger.error(f"1 catalog: {catalog}")
             else:
                 catalog.add(ds_name, self.create_default_data_set(ds_name))
-                logger.error(f"2 catalog: {catalog}")

         # Loop over remaining input datasets to add them to the catalog
-        unsatisfied = [input for input in pipeline.inputs() if input not in catalog]
-        logger.error(f"3 catalog: {unsatisfied}")
+        unsatisfied = pipeline.inputs() - set(catalog.list())
        for ds_name in unsatisfied:
-            default_ds = self.create_default_data_set(ds_name)
-            logger.error(f"0 default_ds: {default_ds}")
-            catalog.add(ds_name, default_ds)
-
-        logger.error(f"4 catalog: {unsatisfied}")
+            catalog.add(ds_name, self.create_default_data_set(ds_name))

         return super().run(pipeline, catalog, hook_manager, session_id)

From fe9d343fc040ac3be8d01123f0e4845e90d0cbe4 Mon Sep 17 00:00:00 2001
From: AlexandreOuellet
Date: Mon, 21 Oct 2024 10:36:40 -0400
Subject: [PATCH 11/12] Delete asf

---
 asf | 14 --------------
 1 file changed, 14 deletions(-)
 delete mode 100644 asf

diff --git a/asf b/asf
deleted file mode 100644
index e67105f..0000000
--- a/asf
+++ /dev/null
@@ -1,14 +0,0 @@
-AzureMLDataset(dataset_config={'filepa
-th': shuttles.xlsx, 'load_args':
-{'engine': openpyxl}},
-dataset_type=ExcelDataset,
-filepath_arg=filepath,
-root_dir=/mnt/azureml/cr/j/5f0796d2547
-44ea387fb7b4e5e27d202/cap/data-capabil
-ity/wd/INPUT_shuttles)
-
-
-PickleDataset(backend=cloudpickle,
-filepath=/mnt/azureml/cr/j/19c6488f0e4644ce9a65e8a6932c8984/cap/data-capability/
-wd/INPUT_companies_csv/companies_csv.pickle, load_args={}, protocol=file,
-save_args={})
\ No newline at end of file

From 6a985b2e45881ad729ae5ce4532f1f19e40ac005 Mon Sep 17 00:00:00 2001
From: AlexandreOuellet
Date: Mon, 21 Oct 2024 10:38:40 -0400
Subject: [PATCH 12/12] remove azure_config from hook's self, as it is no
 longer needed

---
 kedro_azureml/hooks.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/kedro_azureml/hooks.py b/kedro_azureml/hooks.py
index 4b0075c..95a95ab 100644
--- a/kedro_azureml/hooks.py
+++ b/kedro_azureml/hooks.py
@@ -15,9 +15,9 @@ def after_context_created(self, context) -> None:
             {"azureml": ["azureml*", "azureml*/**", "**/azureml*"]}
         )
-        self.azure_config = AzureMLConfig(**context.config_loader["azureml"]["azure"])
+        azure_config = AzureMLConfig(**context.config_loader["azureml"]["azure"])

-        azure_creds = {"azureml": self.azure_config.__dict__}
+        azure_creds = {"azureml": azure_config.__dict__}

         context.config_loader["credentials"] = {
             **context.config_loader["credentials"],