Merge pull request #750 from dyvenia/improve_transform_and_catalog_flow
✨ Modify `TransformAndCatalog` flow
Rafalz13 authored Oct 12, 2023
2 parents c7d917d + aab3323 commit 39cfc03
Showing 5 changed files with 48 additions and 34 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,10 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

### Changed
- Changed the flow name from `TransformAndCatalog` to `TransformAndCatalogToLuma`.
- Modified `add_viadot_metadata_columns` so that a `source_name` parameter can be passed to the decorator applied to `to_df` (or any other function where the DataFrame is generated); see the sketch after this list.
- Changed the `SharepointToDF` task to apply `add_viadot_metadata_columns` with `source_name="Sharepoint"`.
- Changed `Mindful` so that credentials are passed via the `auth` parameter instead of the `header`.
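
A minimal sketch of the decorator pattern described in the first bullet above, assuming a `source_name` keyword and a metadata column named `_viadot_source` (the actual column names and extra metadata in viadot may differ):

```python
import functools

import pandas as pd


def add_viadot_metadata_columns(source_name: str = None):
    """Decorator factory: wrap a `to_df`-style method and append metadata columns.

    The `_viadot_source` column name is an assumption made for illustration.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> pd.DataFrame:
            df = func(*args, **kwargs)
            df["_viadot_source"] = source_name  # e.g. "Sharepoint"
            return df

        return wrapper

    return decorator


class SharepointToDFSketch:
    @add_viadot_metadata_columns(source_name="Sharepoint")
    def to_df(self) -> pd.DataFrame:
        # Stand-in for the real data download.
        return pd.DataFrame({"value": [1, 2, 3]})


print(SharepointToDFSketch().to_df())
```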


## [0.4.19] - 2023-08-31
### Added
- Added `add_viadot_metadata_columns` function that will be used as a decorator for `to_df` class methods.
4 changes: 2 additions & 2 deletions requirements.txt
@@ -41,5 +41,5 @@ avro-python3==1.10.2
pygit2>=1.10.1, <1.11.0
dbt-core==1.3.2
dbt-sqlserver==1.3.1
lumaCLI==0.0.18
Office365-REST-Python-Client==2.4.4
lumaCLI==0.0.19
Office365-REST-Python-Client==2.4.4
2 changes: 1 addition & 1 deletion viadot/flows/__init__.py
@@ -45,5 +45,5 @@
from .sql_server_to_duckdb import SQLServerToDuckDB
from .sql_server_to_parquet import SQLServerToParquet
from .sql_server_transform import SQLServerTransform
from .transform_and_catalog import TransformAndCatalog
from .transform_and_catalog import TransformAndCatalogToLuma
from .vid_club_to_adls import VidClubToADLS
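
Downstream code that imported the old flow name needs the updated import; a minimal before/after sketch (assuming viadot is installed at this commit):

```python
# Before this commit:
# from viadot.flows import TransformAndCatalog

# From this commit onward:
from viadot.flows import TransformAndCatalogToLuma
```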
68 changes: 42 additions & 26 deletions viadot/flows/transform_and_catalog.py
@@ -5,11 +5,12 @@

from prefect import Flow, task
from prefect.tasks.shell import ShellTask
from prefect.triggers import any_successful

from viadot.tasks import CloneRepo, AzureKeyVaultSecret, LumaIngest


@task
@task(trigger=any_successful)
def _cleanup_repo(dbt_repo_dir_name: str) -> None:
"""
Remove a repo folder.
@@ -20,13 +21,32 @@ def _cleanup_repo(dbt_repo_dir_name: str) -> None:
shutil.rmtree(dbt_repo_dir_name, ignore_errors=True) # Delete folder on run


class TransformAndCatalog(Flow):
@task(trigger=any_successful)
def custom_shell_task(name: str, command: str, helper_script: str = None) -> None:
"""
Run a ShellTask in a way that allows a Prefect `trigger` to be applied, which is not possible with a regular ShellTask.
Args:
name (str): The name of the task.
command (str): Shell command to run.
helper_script (str, optional): Helper script to run before the command (e.g. `cd` into the local repo path). Defaults to None.
"""
ShellTask(
name=name,
command=command,
helper_script=helper_script,
return_all=True,
stream_output=True,
).run()
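
For context, a minimal usage sketch of this wrapper (the flow name, command, and directory below are hypothetical): it is called like any other Prefect 1.x task, and because it is decorated with `@task(trigger=any_successful)`, it still runs when at least one upstream task succeeded.

```python
from prefect import Flow

# Hypothetical example flow; in this module the wrapper is bound with `.bind(flow=self)`
# inside TransformAndCatalogToLuma.gen_flow() instead.
with Flow("custom-shell-task-example") as example_flow:
    generate_docs = custom_shell_task(
        name="dbt_task_docs_generate",             # task display name
        command="dbt docs generate --no-compile",  # hypothetical dbt command
        helper_script="cd tmp_dbt_repo_dir",       # cd into the cloned dbt repo first
    )

# example_flow.run()  # would execute the command via ShellTask.run()
```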


class TransformAndCatalogToLuma(Flow):
"""Build specified dbt model(s) and upload the generated metadata to Luma catalog."""

def __init__(
self,
name: str,
dbt_project_path: str,
dbt_project_path: str = "tmp_dbt_repo_dir",
dbt_repo_url: str = None,
dbt_repo_url_secret: str = None,
dbt_repo_branch: str = None,
@@ -35,7 +55,6 @@ def __init__(
local_dbt_repo_path: str = None,
dbt_selects: Dict[str, str] = None,
dbt_target: str = None,
stateful: bool = False,
metadata_dir_path: Union[str, Path] = None,
luma_url: str = "http://localhost",
luma_url_secret: str = None,
@@ -48,8 +67,8 @@
Args:
name (str): The name of the Flow.
dbt_project_path (str): The path to the dbt project (the directory containing
the `dbt_project.yml` file).
dbt_project_path (str, optional): The path to the dbt project (the directory containing
the `dbt_project.yml` file). Defaults to 'tmp_dbt_repo_dir'.
dbt_repo_url (str, optional): The URL for cloning the dbt repo with relevant dbt project. Defaults to None.
dbt_repo_url_secret (str, optional): Alternatively to above, the secret containing `dbt_repo_url`.
Defaults to None.
@@ -62,8 +81,6 @@ def __init__(
from run's, as long as run select is provided. Defaults to None.
dbt_target (str): The dbt target to use. If not specified, the default dbt target (as specified in `profiles.yaml`)
will be used. Defaults to None.
stateful (bool, optional): Whether models should be rebuilt only if modified.
See [dbt docs](https://docs.getdbt.com/guides/legacy/understanding-state). Defaults to False.
metadata_dir_path (Union[str, Path]): The path to the directory containing metadata files.
In the case of dbt, it's dbt project's `target` directory, which contains dbt artifacts
(`sources.json`, `catalog.json`, `manifest.json`, and `run_results.json`). Defaults to None.
@@ -78,19 +95,20 @@ def __init__(
# Build a single model
```python
import os
from viadot.flows import TransformAndCatalog
from viadot.flows import TransformAndCatalogToLuma
my_dbt_project_path = os.path.expanduser("~/dbt/my_dbt_project")
flow = TransformAndCatalog(
name="Transform and Catalog",
flow = TransformAndCatalogToLuma(
name="Transform and Catalog to Luma",
dbt_project_path=my_dbt_project_path,
dbt_repo_url=my_dbt_repo_url,
token=my_token,
dbt_selects={"run": "my_model",
"source_freshness": "source:schema.table",
"test": "my_model"},
metadata_dir_path="target",
dbt_selects={
"run": "my_model",
"source_freshness": "source:schema.table",
"test": "my_model"},
metadata_dir_path=f"{my_dbt_project_path}/target",
luma_url="http://localhost"
)
flow.run()
@@ -106,8 +124,6 @@ def __init__(
self.dbt_target = dbt_target
self.dbt_selects = dbt_selects

self.stateful = stateful

# CloneRepo
self.dbt_repo_url = dbt_repo_url
self.dbt_repo_url_secret = dbt_repo_url_secret
@@ -137,7 +153,7 @@ def gen_flow(self) -> Flow:
local_dbt_repo_path = (
os.path.expandvars(self.local_dbt_repo_path)
if self.local_dbt_repo_path is not None
else "tmp_dbt_repo_dir"
else f"{self.dbt_project_path}"
)

clone_repo = CloneRepo(url=dbt_repo_url)
@@ -159,15 +175,15 @@
dbt_clean_up = ShellTask(
name="dbt_task_clean",
command=f"dbt clean",
helper_script=f"cd {self.dbt_project_path}",
helper_script=f"cd {local_dbt_repo_path}",
return_all=True,
stream_output=True,
).bind(flow=self)

pull_dbt_deps = ShellTask(
name="dbt_task_deps",
command=f"dbt deps",
helper_script=f"cd {self.dbt_project_path}",
helper_script=f"cd {local_dbt_repo_path}",
return_all=True,
stream_output=True,
).bind(flow=self)
@@ -178,7 +194,7 @@
run = ShellTask(
name="dbt_task_run",
command=f"dbt run {run_select_safe} {dbt_target_option}",
helper_script=f"cd {self.dbt_project_path}",
helper_script=f"cd {local_dbt_repo_path}",
return_all=True,
stream_output=True,
).bind(flow=self)
@@ -189,20 +205,20 @@
test = ShellTask(
name="dbt_task_test",
command=f"dbt test {test_select_safe} {dbt_target_option}",
helper_script=f"cd {self.dbt_project_path}",
helper_script=f"cd {local_dbt_repo_path}",
return_all=True,
stream_output=True,
).bind(flow=self)

# Generate docs
# Produces `catalog.json`, `run-results.json`, and `manifest.json`
generate_catalog_json = ShellTask(

generate_catalog_json = custom_shell_task.bind(
name="dbt_task_docs_generate",
command=f"dbt docs generate {dbt_target_option} --no-compile",
helper_script=f"cd {self.dbt_project_path}",
return_all=True,
stream_output=True,
).bind(flow=self)
flow=self,
)

# Upload build metadata to Luma
path_expanded = os.path.expandvars(self.metadata_dir_path)
6 changes: 1 addition & 5 deletions viadot/tasks/luma.py
@@ -1,7 +1,5 @@
import json

from prefect.tasks.shell import ShellTask

from .azure_key_vault import AzureKeyVaultSecret


@@ -41,9 +39,7 @@ def __init__(
self.helper_script = dbt_project_path
self.url = url
self.metadata_dir_path = metadata_dir_path
self.command = (
f"luma dbt ingest --luma-url {url} --metadata-dir {metadata_dir_path} "
)
self.command = f"luma dbt send-test-results --luma-url {url} --metadata-dir {metadata_dir_path}"
self.return_all = True
self.stream_output = True
self.log_stderr = True
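
For concreteness, a tiny illustration of the command string the task now builds (the URL and metadata directory below are hypothetical values):

```python
# Hypothetical values, used only to show the rendered lumaCLI call.
url = "http://localhost"
metadata_dir_path = "tmp_dbt_repo_dir/target"

command = f"luma dbt send-test-results --luma-url {url} --metadata-dir {metadata_dir_path}"
print(command)
# luma dbt send-test-results --luma-url http://localhost --metadata-dir tmp_dbt_repo_dir/target
```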
