Feature/dataset lineage (#75)
Dataset lineage creation and exploration
georgiannajames authored Oct 11, 2024
1 parent fc7cccf commit 02e7733
Showing 9 changed files with 508 additions and 68 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/dev.yml
@@ -717,5 +717,4 @@ jobs:
echo "Processing $wheel"
auditwheel show "$wheel"
done
shell: bash

shell: bash
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pyfusion"
version = "1.3.4"
version = "1.4.0-dev0"
edition = "2021"


@@ -60,4 +60,4 @@ harness = false

[features]
default = []
experiments = []
experiments = []
2 changes: 1 addition & 1 deletion py_src/fusion/__init__.py
@@ -2,7 +2,7 @@

__author__ = """Fusion Devs"""
__email__ = "[email protected]"
__version__ = "1.3.4"
__version__ = "1.4.0-dev0"

from fusion._fusion import FusionCredentials
from fusion.fs_sync import fsync
311 changes: 271 additions & 40 deletions py_src/fusion/fusion.py

Large diffs are not rendered by default.
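
Since the fusion.py diff is not rendered here, the following is a minimal usage sketch of the two methods this commit adds, reconstructed from the calls exercised in the new tests in py_tests/test_fusion.py further down. The import path, credential setup, and the dataset/catalog names are illustrative assumptions, not part of the diff.

# Sketch only: method names, keyword arguments, and return types mirror the new tests below;
# the identifiers used here ("my_dataset", "common") are placeholders.
from fusion import Fusion

fusion = Fusion(credentials=credentials)  # assumes a FusionCredentials object is already available

# Explore lineage: the tests expect a pandas DataFrame with the columns
# "type", "dataset_identifier", "title", and "catalog".
lineage_df = fusion.list_dataset_lineage("my_dataset", catalog="common", max_results=10)

# Register lineage: the source mapping may be a pandas DataFrame or a
# list of dictionaries with "dataset" and "catalog" keys.
sources = [{"dataset": "source_dataset", "catalog": "source_catalog"}]
resp = fusion.create_dataset_lineage(
    base_dataset="my_dataset",
    source_dataset_catalog_mapping=sources,
    catalog="common",
    return_resp_obj=True,
)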

18 changes: 18 additions & 0 deletions py_tests/conftest.py
@@ -5,6 +5,7 @@
from pathlib import Path
from typing import Any, Union

import polars as pl
import pytest

from fusion._fusion import FusionCredentials
@@ -161,3 +162,20 @@ def fusion_oauth_adapter(credentials: FusionCredentials) -> FusionOAuthAdapter:
def fusion_obj(credentials: FusionCredentials) -> Fusion:
    fusion = Fusion(credentials=credentials)
    return fusion


@pytest.fixture()
def data_table() -> pl.DataFrame:
    return pl.DataFrame(
        {"col_1": range(10), "col_2": [str(x) for x in range(10)], "col_3": [x / 3.14159 for x in range(10)]}
    )


@pytest.fixture()
def data_table_as_csv(data_table: pl.DataFrame) -> str:
    return data_table.write_csv(None)


@pytest.fixture()
def data_table_as_json(data_table: pl.DataFrame) -> str:
    return data_table.write_json(None)
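
These three fixtures appear to have been moved here from py_tests/test_fusion.py (the same definitions are deleted from that file below), which makes them available to every test module by name. A hypothetical round-trip test written against them, not part of the commit, could look like:

# Hypothetical example: any module under py_tests/ can now request the shared fixtures directly.
import io

import polars as pl


def test_csv_round_trip(data_table: pl.DataFrame, data_table_as_csv: str) -> None:
    parsed = pl.read_csv(io.StringIO(data_table_as_csv))  # parse the serialised CSV back into a DataFrame
    assert parsed.shape == data_table.shape
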
231 changes: 212 additions & 19 deletions py_tests/test_fusion.py
@@ -4,7 +4,6 @@
from typing import Any

import pandas as pd
import polars as pl
import pytest
import requests
import requests_mock
@@ -88,7 +87,6 @@ def test_res_plural(ref_int: int, pluraliser: str) -> None:

def test_is_url() -> None:
    from fusion.authentication import _is_url

    assert _is_url("https://www.google.com")
    assert _is_url("http://www.google.com/some/path?qp1=1&qp2=2")
    assert not _is_url("www.google.com")
@@ -626,23 +624,6 @@ def test_download_main(mocker: MockerFixture, fusion_obj: Fusion) -> None:
assert "sample" in res[0][1]


@pytest.fixture()
def data_table() -> pl.DataFrame:
    return pl.DataFrame(
        {"col_1": range(10), "col_2": [str(x) for x in range(10)], "col_3": [x / 3.14159 for x in range(10)]}
    )


@pytest.fixture()
def data_table_as_csv(data_table: pl.DataFrame) -> str:
    return data_table.write_csv(None)


@pytest.fixture()
def data_table_as_json(data_table: pl.DataFrame) -> str:
    return data_table.write_json(None)


def test_to_df(mocker: MockerFixture, tmp_path: Path, data_table_as_csv: str, fusion_obj: Fusion) -> None:
catalog = "my_catalog"
dataset = "my_dataset"
@@ -686,3 +667,215 @@ def test_to_table(mocker: MockerFixture, tmp_path: Path, data_table_as_csv: str,

    res = fusion_obj.to_table(dataset, f"{dates[0]}:{dates[-1]}", fmt, catalog=catalog)
    assert len(res) > 0


def test_list_dataset_lineage(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
    dataset = "dataset_id"
    catalog = "catalog_id"
    url_dataset = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset}"
    requests_mock.get(url_dataset, status_code=200)
    url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset}/lineage"
    expected_data = {
        "relations": [
            {
                "source": {"dataset": "source_dataset", "catalog": "source_catalog"},
                "destination": {"dataset": dataset, "catalog": catalog},
            },
            {
                "source": {"dataset": dataset, "catalog": catalog},
                "destination": {"dataset": "destination_dataset", "catalog": "destination_catalog"},
            },
        ],
        "datasets": [
            {"identifier": "source_dataset", "title": "Source Dataset"},
            {"identifier": "destination_dataset", "status": "Active", "title": "Destination Dataset"},
        ],
    }
    requests_mock.get(url, json=expected_data)

    # Call the list_dataset_lineage method
    test_df = fusion_obj.list_dataset_lineage(dataset, catalog=catalog)

    # Check if the dataframe is created correctly
    expected_df = pd.DataFrame(
        {
            "type": ["source", "produced"],
            "dataset_identifier": ["source_dataset", "destination_dataset"],
            "title": ["Source Dataset", "Destination Dataset"],
            "catalog": ["source_catalog", "destination_catalog"],
        }
    )
    pd.testing.assert_frame_equal(test_df, expected_df)


def test_list_dataset_lineage_max_results(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
    dataset = "dataset_id"
    catalog = "catalog_id"
    url_dataset = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset}"
    requests_mock.get(url_dataset, status_code=200)
    url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset}/lineage"
    expected_data = {
        "relations": [
            {
                "source": {"dataset": "source_dataset", "catalog": "source_catalog"},
                "destination": {"dataset": dataset, "catalog": catalog},
            },
            {
                "source": {"dataset": dataset, "catalog": catalog},
                "destination": {"dataset": "destination_dataset", "catalog": "destination_catalog"},
            },
        ],
        "datasets": [
            {"identifier": "source_dataset", "status": "Active", "title": "Source Dataset"},
            {"identifier": "destination_dataset", "status": "Active", "title": "Destination Dataset"},
        ],
    }
    requests_mock.get(url, json=expected_data)

    # Call the list_dataset_lineage method
    test_df = fusion_obj.list_dataset_lineage(dataset, catalog=catalog, max_results=1)

    # Check if the dataframe is created correctly
    assert len(test_df) == 1


def test_list_dataset_lineage_restricted(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
    dataset_id = "dataset_id"
    catalog = "catalog_id"
    url_dataset = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset_id}"
    requests_mock.get(url_dataset, status_code=200)
    url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset_id}/lineage"

    expected_data = {
        "relations": [
            {
                "source": {"dataset": "source_dataset", "catalog": "source_catalog"},
                "destination": {"dataset": dataset_id, "catalog": catalog},
            },
            {
                "source": {"dataset": dataset_id, "catalog": catalog},
                "destination": {"dataset": "destination_dataset", "catalog": "destination_catalog"},
            },
        ],
        "datasets": [
            {"identifier": "source_dataset", "status": "Restricted"},
            {"identifier": "destination_dataset", "status": "Active", "title": "Destination Dataset"},
        ],
    }
    requests_mock.get(url, json=expected_data)

    # Call the list_dataset_lineage method
    test_df = fusion_obj.list_dataset_lineage(dataset_id, catalog=catalog)

    # Check if the dataframe is created correctly
    expected_df = pd.DataFrame(
        {
            "type": ["source", "produced"],
            "dataset_identifier": ["Access Restricted", "destination_dataset"],
            "title": ["Access Restricted", "Destination Dataset"],
            "catalog": ["Access Restricted", "destination_catalog"],
        }
    )
    pd.testing.assert_frame_equal(test_df, expected_df)


def test_list_dataset_lineage_dataset_not_found(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
    dataset_id = "dataset_id"
    catalog = "catalog_id"
    url_dataset = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{dataset_id}"
    requests_mock.get(url_dataset, status_code=404)

    with pytest.raises(requests.exceptions.HTTPError):
        fusion_obj.list_dataset_lineage(dataset_id, catalog=catalog)


def test_create_dataset_lineage_from_df(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
    base_dataset = "base_dataset"
    source_dataset = "source_dataset"
    source_dataset_catalog = "source_catalog"
    catalog = "common"
    status_code = 200
    url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{base_dataset}/lineage"
    expected_data = {"source": [{"dataset": source_dataset, "catalog": source_dataset_catalog}]}
    requests_mock.post(url, json=expected_data)

    data = [{"dataset": "source_dataset", "catalog": "source_catalog"}]
    df_input = pd.DataFrame(data)

    # Call the create_dataset_lineage method
    resp = fusion_obj.create_dataset_lineage(
        base_dataset=base_dataset,
        source_dataset_catalog_mapping=df_input,
        catalog=catalog,
        return_resp_obj=True
    )

    # Check if the response is correct
    assert resp is not None
    if resp is not None:
        assert resp.status_code == status_code


def test_create_dataset_lineage_from_list(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
    base_dataset = "base_dataset"
    source_dataset = "source_dataset"
    source_dataset_catalog = "source_catalog"
    catalog = "common"
    status_code = 200
    url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{base_dataset}/lineage"
    expected_data = {"source": [{"dataset": source_dataset, "catalog": source_dataset_catalog}]}
    requests_mock.post(url, json=expected_data)

    data = [{"dataset": "source_dataset", "catalog": "source_catalog"}]

    # Call the create_dataset_lineage method
    resp = fusion_obj.create_dataset_lineage(
        base_dataset=base_dataset,
        source_dataset_catalog_mapping=data,
        catalog=catalog,
        return_resp_obj=True
    )

    # Check if the response is correct
    assert resp is not None
    if resp is not None:
        assert resp.status_code == status_code


def test_create_dataset_lineage_valueerror(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
    base_dataset = "base_dataset"
    source_dataset = "source_dataset"
    source_dataset_catalog = "source_catalog"
    catalog = "common"
    url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{base_dataset}/lineage"
    expected_data = {"source": [{"dataset": source_dataset, "catalog": source_dataset_catalog}]}
    requests_mock.post(url, json=expected_data)

    data = {"dataset": "source_dataset", "catalog": "source_catalog"}

    with pytest.raises(
        ValueError, match="source_dataset_catalog_mapping must be a pandas DataFrame or a list of dictionaries."
    ):
        fusion_obj.create_dataset_lineage(
            base_dataset=base_dataset,
            source_dataset_catalog_mapping=data,  # type: ignore
            catalog=catalog
        )


def test_create_dataset_lineage_httperror(requests_mock: requests_mock.Mocker, fusion_obj: Fusion) -> None:
    base_dataset = "base_dataset"
    source_dataset = "source_dataset"
    source_dataset_catalog = "source_catalog"
    catalog = "common"
    url = f"{fusion_obj.root_url}catalogs/{catalog}/datasets/{base_dataset}/lineage"
    expected_data = {"source": [{"dataset": source_dataset, "catalog": source_dataset_catalog}]}
    data = [{"dataset": "source_dataset", "catalog": "source_catalog"}]
    requests_mock.post(url, status_code=500, json=expected_data)

    with pytest.raises(requests.exceptions.HTTPError):
        fusion_obj.create_dataset_lineage(
            base_dataset=base_dataset,
            source_dataset_catalog_mapping=data,
            catalog=catalog
        )
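
Taken together, the tests above pin down the error contract: list_dataset_lineage raises requests.exceptions.HTTPError when the dataset cannot be retrieved, and create_dataset_lineage raises ValueError unless the mapping is a pandas DataFrame or a list of dictionaries. A hedged sketch of client code written against that contract, with a made-up helper name, might be:

# Hypothetical helper, not part of the commit: wraps the error behaviour asserted above.
from typing import Optional

import pandas as pd
import requests

from fusion import Fusion  # import path is an assumption


def lineage_or_none(fusion: Fusion, dataset: str, catalog: str) -> Optional[pd.DataFrame]:
    try:
        # Raises HTTPError if the dataset does not exist in the given catalog.
        return fusion.list_dataset_lineage(dataset, catalog=catalog)
    except requests.exceptions.HTTPError:
        return None
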
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "pyfusion"
version = "1.3.4"
version = "1.4.0-dev0"

homepage = "https://github.com/jpmorganchase/fusion"
description = "JPMC Fusion Developer Tools"
@@ -224,7 +224,7 @@ omit = [


[tool.bumpversion]
current_version = "1.3.4-dev0"
current_version = "1.4.0-dev0"
parse = '(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?:-(?P<release>[a-z]+)(?P<candidate>\d+))?'
serialize = [
'{major}.{minor}.{patch}-{release}{candidate}',
1 change: 0 additions & 1 deletion src/auth.rs
@@ -285,7 +285,6 @@ impl Default for AuthToken {
}

fn build_client(proxies: &Option<HashMap<String, String>>) -> PyResult<reqwest::Client> {

    client_builder_from_proxies(proxies.as_ref().unwrap_or(&HashMap::new()))
        .use_rustls_tls()
        .tls_built_in_native_certs(true)