diff --git a/.github/workflows/dev.yml b/.github/workflows/dev.yml
index 79bbac0..14ae3a9 100644
--- a/.github/workflows/dev.yml
+++ b/.github/workflows/dev.yml
@@ -717,5 +717,4 @@ jobs:
            echo "Processing $wheel"
            auditwheel show "$wheel"
          done
-      shell: bash
-
+      shell: bash
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index b825a27..c376735 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1410,7 +1410,7 @@ dependencies = [
 
 [[package]]
 name = "pyfusion"
-version = "1.3.4"
+version = "1.4.0-dev0"
 dependencies = [
  "bincode",
  "chrono",
diff --git a/Cargo.toml b/Cargo.toml
index 89ef17e..37a6f14 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "pyfusion"
-version = "1.3.4"
+version = "1.4.0-dev0"
 edition = "2021"
 
@@ -60,4 +60,4 @@ harness = false
 
 [features]
 default = []
-experiments = []
+experiments = []
\ No newline at end of file
diff --git a/py_src/fusion/__init__.py b/py_src/fusion/__init__.py
index 63bd232..3901b80 100644
--- a/py_src/fusion/__init__.py
+++ b/py_src/fusion/__init__.py
@@ -2,7 +2,7 @@
 
 __author__ = """Fusion Devs"""
 __email__ = "fusion_developers@jpmorgan.com"
-__version__ = "1.3.4"
+__version__ = "1.4.0-dev0"
 
 from fusion._fusion import FusionCredentials
 from fusion.fs_sync import fsync
diff --git a/py_src/fusion/fusion.py b/py_src/fusion/fusion.py
index 057ab80..25a7ba6 100644
--- a/py_src/fusion/fusion.py
+++ b/py_src/fusion/fusion.py
@@ -149,13 +149,19 @@ def __repr__(self) -> str:
         [
             method_name
             for method_name in dir(Fusion)
-            if callable(getattr(Fusion, method_name)) and not method_name.startswith("_")
+            if callable(getattr(Fusion, method_name))
+            and not method_name.startswith("_")
         ]
-        + [p for p in dir(Fusion) if isinstance(getattr(Fusion, p), property)],
+        + [
+            p
+            for p in dir(Fusion)
+            if isinstance(getattr(Fusion, p), property)
+        ],
         [
             getattr(Fusion, method_name).__doc__.split("\n")[0]
             for method_name in dir(Fusion)
-            if callable(getattr(Fusion, method_name)) and not method_name.startswith("_")
+            if callable(getattr(Fusion, method_name))
+            and not method_name.startswith("_")
         ]
         + [
             getattr(Fusion, p).__doc__.split("\n")[0]
@@ -208,7 +214,9 @@ def get_fusion_filesystem(self) -> FusionHTTPFileSystem:
         Returns:
             Fusion Filesystem
         """
-        return FusionHTTPFileSystem(client_kwargs={"root_url": self.root_url, "credentials": self.credentials})
+        return FusionHTTPFileSystem(
+            client_kwargs={"root_url": self.root_url, "credentials": self.credentials}
+        )
 
     def list_catalogs(self, output: bool = False) -> pd.DataFrame:
         """Lists the catalogs available to the API account.
@@ -227,7 +235,9 @@ def list_catalogs(self, output: bool = False) -> pd.DataFrame:
 
         return cat_df
 
-    def catalog_resources(self, catalog: Optional[str] = None, output: bool = False) -> pd.DataFrame:
+    def catalog_resources(
+        self, catalog: Optional[str] = None, output: bool = False
+    ) -> pd.DataFrame:
         """List the resources contained within the catalog, for example products and datasets.
 
         Args:
@@ -283,7 +293,9 @@ def list_products(
         if isinstance(contains, list):
             contains = "|".join(f"{s}" for s in contains)
         if id_contains:
-            filtered_df = full_prod_df[full_prod_df["identifier"].str.contains(contains, case=False)]
+            filtered_df = full_prod_df[
+                full_prod_df["identifier"].str.contains(contains, case=False)
+            ]
         else:
             filtered_df = full_prod_df[
                 full_prod_df["identifier"].str.contains(contains, case=False)
@@ -372,7 +384,9 @@ def list_datasets(  # noqa: PLR0913
             if isinstance(product, str)
             else prd_df[prd_df["product"].isin(product)]
         )
-        ds_df = ds_df[ds_df["identifier"].str.lower().isin(prd_df["dataset"].str.lower())].reset_index(drop=True)
+        ds_df = ds_df[
+            ds_df["identifier"].str.lower().isin(prd_df["dataset"].str.lower())
+        ].reset_index(drop=True)
 
         if max_results > -1:
             ds_df = ds_df[0:max_results]
@@ -402,7 +416,9 @@ def list_datasets(  # noqa: PLR0913
 
         return ds_df
 
-    def dataset_resources(self, dataset: str, catalog: Optional[str] = None, output: bool = False) -> pd.DataFrame:
+    def dataset_resources(
+        self, dataset: str, catalog: Optional[str] = None, output: bool = False
+    ) -> pd.DataFrame:
         """List the resources available for a dataset, currently this will always be a datasetseries.
 
         Args:
@@ -445,7 +461,11 @@ def list_dataset_attributes(
         catalog = self._use_catalog(catalog)
         url = f"{self.root_url}catalogs/{catalog}/datasets/{dataset}/attributes"
-        ds_attr_df = Fusion._call_for_dataframe(url, self.session).sort_values(by="index").reset_index(drop=True)
+        ds_attr_df = (
+            Fusion._call_for_dataframe(url, self.session)
+            .sort_values(by="index")
+            .reset_index(drop=True)
+        )
 
         if not display_all_columns:
             ds_attr_df = ds_attr_df[
@@ -499,7 +519,11 @@ def list_datasetmembers(
 
         return ds_members_df
 
     def datasetmember_resources(
-        self, dataset: str, series: str, catalog: Optional[str] = None, output: bool = False
+        self,
+        dataset: str,
+        series: str,
+        catalog: Optional[str] = None,
+        output: bool = False,
     ) -> pd.DataFrame:
         """List the available resources for a datasetseries member.
 
@@ -524,7 +548,11 @@ def datasetmember_resources(
 
         return ds_mem_res_df
 
     def list_distributions(
-        self, dataset: str, series: str, catalog: Optional[str] = None, output: bool = False
+        self,
+        dataset: str,
+        series: str,
+        catalog: Optional[str] = None,
+        output: bool = False,
     ) -> pd.DataFrame:
         """List the available distributions (downloadable instances of the dataset with a format type).
 
@@ -575,7 +603,9 @@ def _resolve_distro_tuples(
         datasetseries_list = self.list_datasetmembers(dataset, catalog)
 
         if len(datasetseries_list) == 0:
-            raise AssertionError(f"There are no dataset members for dataset {dataset} in catalog {catalog}")
+            raise AssertionError(
+                f"There are no dataset members for dataset {dataset} in catalog {catalog}"
+            )
 
         if datasetseries_list.empty:
             raise APIResponseError(  # pragma: no cover
@@ -584,7 +614,9 @@
             )
 
         if dt_str == "latest":
-            dt_str = datasetseries_list.iloc[datasetseries_list["createdDate"].to_numpy().argmax()]["identifier"]
+            dt_str = datasetseries_list.iloc[
+                datasetseries_list["createdDate"].to_numpy().argmax()
+            ]["identifier"]
 
         parsed_dates = normalise_dt_param_str(dt_str)
         if len(parsed_dates) == 1:
@@ -592,13 +624,23 @@
 
         if parsed_dates[0]:
             datasetseries_list = datasetseries_list[
-                pd.Series([pd.to_datetime(i, errors="coerce") for i in datasetseries_list["identifier"]])
+                pd.Series(
+                    [
+                        pd.to_datetime(i, errors="coerce")
+                        for i in datasetseries_list["identifier"]
+                    ]
+                )
                 >= pd.to_datetime(parsed_dates[0])
             ].reset_index()
 
         if parsed_dates[1]:
             datasetseries_list = datasetseries_list[
-                pd.Series([pd.to_datetime(i, errors="coerce") for i in datasetseries_list["identifier"]])
+                pd.Series(
+                    [
+                        pd.to_datetime(i, errors="coerce")
+                        for i in datasetseries_list["identifier"]
+                    ]
+                )
                 <= pd.to_datetime(parsed_dates[1])
             ].reset_index()
 
@@ -609,7 +651,9 @@
             )
 
         required_series = list(datasetseries_list["@id"])
-        tups = [(catalog, dataset, series, dataset_format) for series in required_series]
+        tups = [
+            (catalog, dataset, series, dataset_format) for series in required_series
+        ]
 
         return tups
 
@@ -652,14 +696,20 @@ def download(  # noqa: PLR0912, PLR0913
         """
         catalog = self._use_catalog(catalog)
 
-        valid_date_range = re.compile(r"^(\d{4}\d{2}\d{2})$|^((\d{4}\d{2}\d{2})?([:])(\d{4}\d{2}\d{2})?)$")
+        valid_date_range = re.compile(
+            r"^(\d{4}\d{2}\d{2})$|^((\d{4}\d{2}\d{2})?([:])(\d{4}\d{2}\d{2})?)$"
+        )
 
         if valid_date_range.match(dt_str) or dt_str == "latest":
-            required_series = self._resolve_distro_tuples(dataset, dt_str, dataset_format, catalog)
+            required_series = self._resolve_distro_tuples(
+                dataset, dt_str, dataset_format, catalog
+            )
         else:
             # sample data is limited to csv
             if dt_str == "sample":
-                dataset_format = self.list_distributions(dataset, dt_str, catalog)["identifier"].iloc[0]
+                dataset_format = self.list_distributions(dataset, dt_str, catalog)[
+                    "identifier"
+                ].iloc[0]
             required_series = [(catalog, dataset, dt_str, dataset_format)]
 
         if dataset_format not in RECOGNIZED_FORMATS + ["raw"]:
@@ -714,17 +764,21 @@ def download(  # noqa: PLR0912, PLR0913
         if show_progress:
             with joblib_progress("Downloading", total=len(download_spec)):
                 res = Parallel(n_jobs=n_par)(
-                    delayed(self.get_fusion_filesystem().download)(**spec) for spec in download_spec
+                    delayed(self.get_fusion_filesystem().download)(**spec)
+                    for spec in download_spec
                 )
         else:
             res = Parallel(n_jobs=n_par)(
-                delayed(self.get_fusion_filesystem().download)(**spec) for spec in download_spec
+                delayed(self.get_fusion_filesystem().download)(**spec)
+                for spec in download_spec
             )
 
         if (len(res) > 0) and (not all(r[0] for r in res)):
             for r in res:
                 if not r[0]:
-                    warnings.warn(f"The download of {r[1]} was not successful", stacklevel=2)
+                    warnings.warn(
+                        f"The download of {r[1]} was not successful", stacklevel=2
+                    )
 
         return res if return_paths else None
 
     def to_df(  # noqa: PLR0913
@@ -793,7 +847,9 @@ def to_df(  # noqa: PLR0913
             )
 
         if not download_res:
-            raise ValueError("Must specify 'return_paths=True' in download call to use this function")
+            raise ValueError(
+                "Must specify 'return_paths=True' in download call to use this function"
+            )
 
         if not all(res[0] for res in download_res):
             failed_res = [res for res in download_res if not res[0]]
@@ -844,7 +900,9 @@ def to_df(  # noqa: PLR0913
         pd_reader = pd_read_fn_map.get(dataset_format)
         pd_read_kwargs = pd_read_default_kwargs.get(dataset_format, {})
         if not pd_reader:
-            raise Exception(f"No pandas function to read file in format {dataset_format}")
+            raise Exception(
+                f"No pandas function to read file in format {dataset_format}"
+            )
 
         pd_read_kwargs.update(kwargs)
 
@@ -963,7 +1021,9 @@ def to_table(  # noqa: PLR0913
             )
 
         if not download_res:
-            raise ValueError("Must specify 'return_paths=True' in download call to use this function")
+            raise ValueError(
+                "Must specify 'return_paths=True' in download call to use this function"
+            )
 
         if not all(res[0] for res in download_res):
             failed_res = [res for res in download_res if not res[0]]
@@ -1063,23 +1123,37 @@ def upload(  # noqa: PLR0913
         if self.fs.info(path)["type"] == "directory":
             file_path_lst = self.fs.find(path)
             local_file_validation = validate_file_names(file_path_lst, fs_fusion)
-            file_path_lst = [f for flag, f in zip(local_file_validation, file_path_lst) if flag]
+            file_path_lst = [
+                f for flag, f in zip(local_file_validation, file_path_lst) if flag
+            ]
             file_name = [f.split("/")[-1] for f in file_path_lst]
             is_raw_lst = is_dataset_raw(file_path_lst, fs_fusion)
-            local_url_eqiv = [path_to_url(i, r) for i, r in zip(file_path_lst, is_raw_lst)]
+            local_url_eqiv = [
+                path_to_url(i, r) for i, r in zip(file_path_lst, is_raw_lst)
+            ]
         else:
             file_path_lst = [path]
             if not catalog or not dataset:
                 local_file_validation = validate_file_names(file_path_lst, fs_fusion)
-                file_path_lst = [f for flag, f in zip(local_file_validation, file_path_lst) if flag]
+                file_path_lst = [
+                    f for flag, f in zip(local_file_validation, file_path_lst) if flag
+                ]
                 is_raw_lst = is_dataset_raw(file_path_lst, fs_fusion)
-                local_url_eqiv = [path_to_url(i, r) for i, r in zip(file_path_lst, is_raw_lst)]
+                local_url_eqiv = [
+                    path_to_url(i, r) for i, r in zip(file_path_lst, is_raw_lst)
+                ]
                 if preserve_original_name:
-                    raise ValueError("preserve_original_name can only be used when catalog and dataset are provided.")
+                    raise ValueError(
+                        "preserve_original_name can only be used when catalog and dataset are provided."
+                    )
             else:
                 date_identifier = re.compile(r"^(\d{4})(\d{2})(\d{2})$")
                 if date_identifier.match(dt_str):
-                    dt_str = dt_str if dt_str != "latest" else pd.Timestamp("today").date().strftime("%Y%m%d")
+                    dt_str = (
+                        dt_str
+                        if dt_str != "latest"
+                        else pd.Timestamp("today").date().strftime("%Y%m%d")
+                    )
                     dt_str = pd.Timestamp(dt_str).date().strftime("%Y%m%d")
 
                 if catalog not in fs_fusion.ls("") or dataset not in [
@@ -1093,10 +1167,16 @@
                     return [(False, path, msg)]
                 file_format = path.split(".")[-1]
                 file_name = [path.split("/")[-1]]
-                file_format = "raw" if file_format not in RECOGNIZED_FORMATS else file_format
+                file_format = (
+                    "raw" if file_format not in RECOGNIZED_FORMATS else file_format
+                )
 
                 local_url_eqiv = [
-                    "/".join(distribution_to_url("", dataset, dt_str, file_format, catalog, False).split("/")[1:])
+                    "/".join(
+                        distribution_to_url(
+                            "", dataset, dt_str, file_format, catalog, False
+                        ).split("/")[1:]
+                    )
                 ]
 
         if not preserve_original_name:
@@ -1133,7 +1213,7 @@ def upload(  # noqa: PLR0913
     def from_bytes(  # noqa: PLR0913
         self,
         data: BytesIO,
-        dataset: Optional[str] = None,
+        dataset: str,
         series_member: str = "latest",
         catalog: Optional[str] = None,
         distribution: str = "parquet",
@@ -1149,8 +1229,7 @@ def from_bytes(  # noqa: PLR0913
 
         Args:
             data (str): an object in memory to upload
-            dataset (str, optional): Dataset name to which the file will be uploaded (for single file only).
-                If not provided the dataset will be implied from file's name.
+            dataset (str): Dataset name to which the bytes will be uploaded.
             series_member (str, optional): A single date or label. Defaults to 'latest' which will return
                 the most recent.
             catalog (str, optional): A catalog identifier. Defaults to 'common'.
@@ -1164,7 +1243,7 @@ def from_bytes(  # noqa: PLR0913
             file_name (str, optional): file name to be used for the uploaded file. Defaults to Fusion standard naming.
 
         Returns:
-
+            Optional[list[tuple[bool, str, Optional[str]]]]: a list of tuples, one for each distribution
 
         """
         catalog = self._use_catalog(catalog)
@@ -1174,7 +1253,9 @@ def from_bytes(  # noqa: PLR0913
             raise ValueError(f"Dataset format {distribution} is not supported")
 
         is_raw = js.loads(fs_fusion.cat(f"{catalog}/datasets/{dataset}"))["isRawData"]
-        local_url_eqiv = path_to_url(f"{dataset}__{catalog}__{series_member}.{distribution}", is_raw)
+        local_url_eqiv = path_to_url(
+            f"{dataset}__{catalog}__{series_member}.{distribution}", is_raw
+        )
 
         data_map_df = pd.DataFrame(["", local_url_eqiv, file_name]).T
         data_map_df.columns = ["path", "url", "file_name"]  # type: ignore
@@ -1249,7 +1330,9 @@ async def async_events() -> None:
                     if self.events is None:
                         self.events = pd.DataFrame()
                     else:
-                        self.events = pd.concat([self.events, pd.DataFrame(event)], ignore_index=True)
+                        self.events = pd.concat(
+                            [self.events, pd.DataFrame(event)], ignore_index=True
+                        )
             except TimeoutError as ex:
                 raise ex from None
             except BaseException:
@@ -1257,7 +1340,9 @@
         _ = self.list_catalogs()  # refresh token
         if "headers" in kwargs:
-            kwargs["headers"].update({"authorization": f"bearer {self.credentials.bearer_token}"})
+            kwargs["headers"].update(
+                {"authorization": f"bearer {self.credentials.bearer_token}"}
+            )
         else:
             kwargs["headers"] = {
                 "authorization": f"bearer {self.credentials.bearer_token}",
@@ -1316,3 +1401,149 @@ def get_events(
             return None  # noqa: B012, SIM107
         else:
             return self.events
+
+    def list_dataset_lineage(
+        self,
+        dataset_id: str,
+        catalog: Optional[str] = None,
+        output: bool = False,
+        max_results: int = -1,
+    ) -> pd.DataFrame:
+        """List the upstream and downstream lineage of the dataset.
+
+        Args:
+            dataset_id (str): A dataset identifier
+            catalog (str, optional): A catalog identifier. Defaults to 'common'.
+            output (bool, optional): If True then print the dataframe. Defaults to False.
+            max_results (int, optional): Limit the number of rows returned in the dataframe.
+                Defaults to -1 which returns all results.
+
+        Returns:
+            class:`pandas.DataFrame`: A dataframe with a row for each resource
+
+        Raises:
+            HTTPError: If the dataset is not found in the catalog.
+
+        """
+        catalog = self._use_catalog(catalog)
+
+        url_dataset = f"{self.root_url}catalogs/{catalog}/datasets/{dataset_id}"
+        resp_dataset = self.session.get(url_dataset)
+        resp_dataset.raise_for_status()
+
+        url = f"{self.root_url}catalogs/{catalog}/datasets/{dataset_id}/lineage"
+        resp = self.session.get(url)
+        data = resp.json()
+        relations_data = data["relations"]
+
+        restricted_datasets = [
+            dataset_metadata["identifier"]
+            for dataset_metadata in data["datasets"]
+            if dataset_metadata.get("status", None) == "Restricted"
+        ]
+
+        data_dict = {}
+
+        for entry in relations_data:
+            source_dataset_id = entry["source"]["dataset"]
+            source_catalog = entry["source"]["catalog"]
+            destination_dataset_id = entry["destination"]["dataset"]
+            destination_catalog = entry["destination"]["catalog"]
+
+            if destination_dataset_id == dataset_id:
+                for dataset in data["datasets"]:
+                    if dataset["identifier"] == source_dataset_id and dataset.get("status", None) != "Restricted":
+                        source_dataset_title = dataset["title"]
+                    elif dataset["identifier"] == source_dataset_id and dataset.get("status", None) == "Restricted":
+                        source_dataset_title = "Access Restricted"
+                data_dict[source_dataset_id] = ("source", source_catalog, source_dataset_title)
+
+            if source_dataset_id == dataset_id:
+                for dataset in data["datasets"]:
+                    if dataset["identifier"] == destination_dataset_id and dataset.get("status", None) != "Restricted":
+                        destination_dataset_title = dataset["title"]
+                    elif dataset[
+                        "identifier"
+                    ] == destination_dataset_id and dataset.get("status", None) == "Restricted":
+                        destination_dataset_title = "Access Restricted"
+                data_dict[destination_dataset_id] = ("produced", destination_catalog, destination_dataset_title)
+
+        output_data = {
+            "type": [v[0] for v in data_dict.values()],
+            "dataset_identifier": list(data_dict.keys()),
+            "title": [v[2] for v in data_dict.values()],
+            "catalog": [v[1] for v in data_dict.values()],
+        }
+
+        lineage_df = pd.DataFrame(output_data)
+        lineage_df.loc[
+            lineage_df["dataset_identifier"].isin(restricted_datasets),
+            ["dataset_identifier", "catalog", "title"],
+        ] = "Access Restricted"
+
+        if max_results > -1:
+            lineage_df = lineage_df[0:max_results]
+
+        if output:
+            pass
+
+        return lineage_df
+
+    def create_dataset_lineage(
+        self,
+        base_dataset: str,
+        source_dataset_catalog_mapping: Union[pd.DataFrame, list[dict[str, str]]],
+        catalog: Optional[str] = None,
+        return_resp_obj: bool = False,
+    ) -> Optional[requests.Response]:
+        """Upload lineage to a dataset.
+
+        Args:
+            base_dataset (str): A dataset identifier to which you want to add lineage.
+            source_dataset_catalog_mapping (Union[pd.DataFrame, list[dict[str, str]]]): Mapping for the dataset
+                identifier(s) and catalog(s) from which to add lineage.
+            catalog (Optional[str], optional): Catalog identifier. Defaults to None.
+            return_resp_obj (bool, optional): If True then return the response object. Defaults to False.
+
+        Raises:
+            ValueError: If source_dataset_catalog_mapping is not a pandas DataFrame or a list of dictionaries
+            HTTPError: If the request is unsuccessful.
+
+        Examples:
+            Creating lineage from a pandas DataFrame.
+            >>> data = [{"dataset": "a", "catalog": "a"}, {"dataset": "b", "catalog": "b"}]
+            >>> df = pd.DataFrame(data)
+            >>> fusion = Fusion()
+            >>> fusion.create_dataset_lineage(base_dataset="c", source_dataset_catalog_mapping=df, catalog="c")
+
+            Creating lineage from a list of dictionaries.
+            >>> data = [{"dataset": "a", "catalog": "a"}, {"dataset": "b", "catalog": "b"}]
+            >>> fusion = Fusion()
+            >>> fusion.create_dataset_lineage(base_dataset="c", source_dataset_catalog_mapping=data, catalog="c")
+
+        """
+        catalog = self._use_catalog(catalog)
+
+        if isinstance(source_dataset_catalog_mapping, pd.DataFrame):
+            dataset_mapping_list = [
+                {
+                    "dataset": row["dataset"],
+                    "catalog": row["catalog"]
+                } for _, row in source_dataset_catalog_mapping.iterrows()
+            ]
+        elif isinstance(source_dataset_catalog_mapping, list):
+            dataset_mapping_list = source_dataset_catalog_mapping
+        else:
+            raise ValueError("source_dataset_catalog_mapping must be a pandas DataFrame or a list of dictionaries.")
+        data = {
+            "source": dataset_mapping_list
+        }
+
+        url = f"{self.root_url}catalogs/{catalog}/datasets/{base_dataset}/lineage"
+
+        resp = self.session.post(url, json=data)
+
+        resp.raise_for_status()
+
+        return resp if return_resp_obj else None
diff --git a/py_tests/conftest.py b/py_tests/conftest.py
index bd1b68d..65ec926 100644
--- a/py_tests/conftest.py
+++ b/py_tests/conftest.py
@@ -5,6 +5,7 @@
 from pathlib import Path
 from typing import Any, Union
 
+import polars as pl
 import pytest
 
 from fusion._fusion import FusionCredentials
@@ -161,3 +162,20 @@ def fusion_oauth_adapter(credentials: FusionCredentials) -> FusionOAuthAdapter:
 def fusion_obj(credentials: FusionCredentials) -> Fusion:
     fusion = Fusion(credentials=credentials)
     return fusion
+
+
+@pytest.fixture()
+def data_table() -> pl.DataFrame:
+    return pl.DataFrame(
+        {"col_1": range(10), "col_2": [str(x) for x in range(10)], "col_3": [x / 3.14159 for x in range(10)]}
+    )
+
+
+@pytest.fixture()
+def data_table_as_csv(data_table: pl.DataFrame) -> str:
+    return data_table.write_csv(None)
+
+
+@pytest.fixture()
+def data_table_as_json(data_table: pl.DataFrame) -> str:
+    return data_table.write_json(None)
diff --git a/py_tests/test_fusion.py b/py_tests/test_fusion.py
index 3a01e5b..750ff54 100644
--- a/py_tests/test_fusion.py
+++ b/py_tests/test_fusion.py
@@ -4,7 +4,6 @@
 from typing import Any
 
 import pandas as pd
-import polars as pl
 import pytest
 import requests
 import requests_mock
@@ -88,7 +87,6 @@ def test_res_plural(ref_int: int, pluraliser: str) -> None:
 
 def test_is_url() -> None:
     from fusion.authentication import _is_url
-
     assert _is_url("https://www.google.com")
     assert _is_url("http://www.google.com/some/path?qp1=1&qp2=2")
     assert not _is_url("www.google.com")
@@ -626,23 +624,6 @@ def test_download_main(mocker: MockerFixture, fusion_obj: Fusion) -> None:
     assert "sample" in res[0][1]
 
 
-@pytest.fixture()
-def data_table() -> pl.DataFrame:
-    return pl.DataFrame(
-        {"col_1": range(10), "col_2": [str(x) for x in range(10)], "col_3": [x / 3.14159 for x in range(10)]}
-    )
-
-
-@pytest.fixture()
-def data_table_as_csv(data_table: pl.DataFrame) -> str:
-    return data_table.write_csv(None)
-
-
-@pytest.fixture()
-def data_table_as_json(data_table: pl.DataFrame) -> str:
-    return data_table.write_json(None)
-
-
 def test_to_df(mocker: MockerFixture, tmp_path: Path, data_table_as_csv: str, fusion_obj: Fusion) -> None:
     catalog = "my_catalog"
     dataset = "my_dataset"
@@ -686,3 +667,215 @@ def test_to_table(mocker: MockerFixture, tmp_path: Path, data_table_as_csv: str,
     res = fusion_obj.to_table(dataset, f"{dates[0]}:{dates[-1]}", fmt, catalog=catalog)
 
     assert len(res) > 0
+
+
+def test_list_dataset_lineage(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
+    dataset = "dataset_id"
+    catalog = "catalog_id"
+    url_dataset = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset}"
+    requests_mock.get(url_dataset, status_code=200)
+    url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset}/lineage"
+    expected_data = {
+        "relations": [
+            {
+                "source": {"dataset": "source_dataset", "catalog": "source_catalog"},
+                "destination": {"dataset": dataset, "catalog": catalog},
+            },
+            {
+                "source": {"dataset": dataset, "catalog": catalog},
+                "destination": {"dataset": "destination_dataset", "catalog": "destination_catalog"},
+            },
+        ],
+        "datasets": [
+            {"identifier": "source_dataset", "title": "Source Dataset"},
+            {"identifier": "destination_dataset", "status": "Active", "title": "Destination Dataset"},
+        ],
+    }
+    requests_mock.get(url, json=expected_data)
+
+    # Call the list_dataset_lineage method
+    test_df = fusion_obj.list_dataset_lineage(dataset, catalog=catalog)
+
+    # Check if the dataframe is created correctly
+    expected_df = pd.DataFrame(
+        {
+            "type": ["source", "produced"],
+            "dataset_identifier": ["source_dataset", "destination_dataset"],
+            "title": ["Source Dataset", "Destination Dataset"],
+            "catalog": ["source_catalog", "destination_catalog"],
+        }
+    )
+    pd.testing.assert_frame_equal(test_df, expected_df)
+
+
+def test_list_dataset_lineage_max_results(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
+    dataset = "dataset_id"
+    catalog = "catalog_id"
+    url_dataset = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset}"
+    requests_mock.get(url_dataset, status_code=200)
+    url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset}/lineage"
+    expected_data = {
+        "relations": [
+            {
+                "source": {"dataset": "source_dataset", "catalog": "source_catalog"},
+                "destination": {"dataset": dataset, "catalog": catalog},
+            },
+            {
+                "source": {"dataset": dataset, "catalog": catalog},
+                "destination": {"dataset": "destination_dataset", "catalog": "destination_catalog"},
+            },
+        ],
+        "datasets": [
+            {"identifier": "source_dataset", "status": "Active", "title": "Source Dataset"},
+            {"identifier": "destination_dataset", "status": "Active", "title": "Destination Dataset"},
+        ],
+    }
+    requests_mock.get(url, json=expected_data)
+
+    # Call the list_dataset_lineage method
+    test_df = fusion_obj.list_dataset_lineage(dataset, catalog=catalog, max_results=1)
+
+    # Check if the dataframe is created correctly
+    assert len(test_df) == 1
+
+
+def test_list_dataset_lineage_restricted(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
+    dataset_id = "dataset_id"
+    catalog = "catalog_id"
+    url_dataset = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset_id}"
+    requests_mock.get(url_dataset, status_code=200)
+    url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset_id}/lineage"
+
+    expected_data = {
+        "relations": [
+            {
+                "source": {"dataset": "source_dataset", "catalog": "source_catalog"},
+                "destination": {"dataset": dataset_id, "catalog": catalog},
+            },
+            {
+                "source": {"dataset": dataset_id, "catalog": catalog},
+                "destination": {"dataset": "destination_dataset", "catalog": "destination_catalog"},
+            },
+        ],
+        "datasets": [
+            {"identifier": "source_dataset", "status": "Restricted"},
+            {"identifier": "destination_dataset", "status": "Active", "title": "Destination Dataset"},
+        ],
+    }
+    requests_mock.get(url, json=expected_data)
+
+    # Call the list_dataset_lineage method
+    test_df = fusion_obj.list_dataset_lineage(dataset_id, catalog=catalog)
+
+    # Check if the dataframe is created correctly
+    expected_df = pd.DataFrame(
+        {
+            "type": ["source", "produced"],
"dataset_identifier": ["Access Restricted", "destination_dataset"], + "title": ["Access Restricted", "Destination Dataset"], + "catalog": ["Access Restricted", "destination_catalog"], + } + ) + pd.testing.assert_frame_equal(test_df, expected_df) + + +def test_list_dataset_lineage_dataset_not_found(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None: + dataset_id = "dataset_id" + catalog = "catalog_id" + url_dataset = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset_id}" + requests_mock.get(url_dataset, status_code=404) + + with pytest.raises(requests.exceptions.HTTPError): + fusion_obj.list_dataset_lineage(dataset_id, catalog=catalog) + + +def test_create_dataset_lineage_from_df(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None: + base_dataset = "base_dataset" + source_dataset = "source_dataset" + source_dataset_catalog = "source_catalog" + catalog = "common" + status_code = 200 + url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{base_dataset}/lineage" + expected_data = {"source": [{"dataset": source_dataset, "catalog": source_dataset_catalog}]} + requests_mock.post(url, json=expected_data) + + data = [{"dataset": "source_dataset", "catalog": "source_catalog"}] + df_input = pd.DataFrame(data) + + # Call the create_dataset_lineage method + resp = fusion_obj.create_dataset_lineage( + base_dataset=base_dataset, + source_dataset_catalog_mapping=df_input, + catalog=catalog, + return_resp_obj=True + ) + + # Check if the response is correct + assert resp is not None + if resp is not None: + assert resp.status_code == status_code + + +def test_create_dataset_lineage_from_list(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None: + base_dataset = "base_dataset" + source_dataset = "source_dataset" + source_dataset_catalog = "source_catalog" + catalog = "common" + status_code = 200 + url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{base_dataset}/lineage" + expected_data = {"source": [{"dataset": source_dataset, "catalog": source_dataset_catalog}]} + requests_mock.post(url, json=expected_data) + + data = [{"dataset": "source_dataset", "catalog": "source_catalog"}] + + # Call the create_dataset_lineage method + resp = fusion_obj.create_dataset_lineage( + base_dataset=base_dataset, + source_dataset_catalog_mapping=data, + catalog=catalog, + return_resp_obj=True + ) + + # Check if the response is correct + assert resp is not None + if resp is not None: + assert resp.status_code == status_code + + +def test_create_dataset_lineage_valueerror(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None: + base_dataset = "base_dataset" + source_dataset = "source_dataset" + source_dataset_catalog = "source_catalog" + catalog = "common" + url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{base_dataset}/lineage" + expected_data = {"source": [{"dataset": source_dataset, "catalog": source_dataset_catalog}]} + requests_mock.post(url, json=expected_data) + + data = {"dataset": "source_dataset", "catalog": "source_catalog"} + + with pytest.raises( + ValueError, match="source_dataset_catalog_mapping must be a pandas DataFrame or a list of dictionaries." 
+    ):
+        fusion_obj.create_dataset_lineage(
+            base_dataset=base_dataset,
+            source_dataset_catalog_mapping=data,  # type: ignore
+            catalog=catalog
+        )
+
+
+def test_create_dataset_lineage_httperror(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
+    base_dataset = "base_dataset"
+    source_dataset = "source_dataset"
+    source_dataset_catalog = "source_catalog"
+    catalog = "common"
+    url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{base_dataset}/lineage"
+    expected_data = {"source": [{"dataset": source_dataset, "catalog": source_dataset_catalog}]}
+    data = [{"dataset": "source_dataset", "catalog": "source_catalog"}]
+    requests_mock.post(url, status_code=500, json=expected_data)
+
+    with pytest.raises(requests.exceptions.HTTPError):
+        fusion_obj.create_dataset_lineage(
+            base_dataset=base_dataset,
+            source_dataset_catalog_mapping=data,
+            catalog=catalog
+        )
diff --git a/pyproject.toml b/pyproject.toml
index daddc06..be930ad 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "pyfusion"
-version = "1.3.4"
+version = "1.4.0-dev0"
 homepage = "https://github.com/jpmorganchase/fusion"
 description = "JPMC Fusion Developer Tools"
 
@@ -224,7 +224,7 @@ omit = [
 ]
 
 [tool.bumpversion]
-current_version = "1.3.4-dev0"
+current_version = "1.4.0-dev0"
 parse = '(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?:-(?P<release>[a-z]+)(?P<candidate>\d+))?'
 serialize = [
     '{major}.{minor}.{patch}-{release}{candidate}',
diff --git a/src/auth.rs b/src/auth.rs
index d9758f6..5107659 100644
--- a/src/auth.rs
+++ b/src/auth.rs
@@ -285,7 +285,6 @@ impl Default for AuthToken {
 }
 
 fn build_client(proxies: &Option>) -> PyResult {
-
     client_builder_from_proxies(proxies.as_ref().unwrap_or(&HashMap::new()))
         .use_rustls_tls()
         .tls_built_in_native_certs(true)