Skip to content
This repository has been archived by the owner on Apr 8, 2024. It is now read-only.

Commit

Permalink
feat(dbt-fal): make fal-scripts-path value available as a path to imp…
Browse files Browse the repository at this point in the history
…ort for models (#645)

* add local import to integration tests

* feat(dbt-fal): make fal-scripts-path value available as a path to import for models

* Change default fal-scripts-path value

* use project_root
  • Loading branch information
chamini2 authored Nov 10, 2022
1 parent 905ab44 commit 0c500b9
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def get_bool() -> bool:
print("getting the bool")
return False
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ def model(dbt, fal):
dbt.config(materialized="table")
dbt.config(fal_environment="funny")
import pyjokes
from utils.get_bool import get_bool

joke = pyjokes.get_joke()
df = dbt.ref("model_a")

df["my_joke"] = joke
df["my_bool"] = get_bool()
return df
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ target-path: "{{ env_var('temp_dir') }}/target"

models:
+schema: custom

vars:
fal-scripts-path: "utils"
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
def model(dbt, fal):
from get_bool import get_bool

dbt.config(materialized="table")
df = dbt.ref("model_b")

df["my_bool"] = True
df["my_bool"] = get_bool()
return df
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def get_bool() -> bool:
print("getting the bool")
return False
46 changes: 38 additions & 8 deletions adapter/src/dbt/adapters/fal_experimental/adapter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from contextlib import contextmanager
from functools import partial
from typing import Any

Expand All @@ -22,21 +23,25 @@

from .utils import retrieve_symbol

def run_with_adapter(code: str, adapter: BaseAdapter) -> Any:
FAL_SCRIPTS_PATH_VAR_NAME = 'fal-scripts-path'

def run_with_adapter(code: str, adapter: BaseAdapter, config: RuntimeConfig) -> Any:
# main symbol is defined during dbt-fal's compilation
# and acts as an entrypoint for us to run the model.
main = retrieve_symbol(code, "main")
return main(
read_df=prepare_for_adapter(adapter, read_relation_as_df),
write_df=prepare_for_adapter(adapter, write_df_to_relation),
)
fal_scripts_path = str(_get_fal_scripts_path(config))
with extra_path(fal_scripts_path):
main = retrieve_symbol(code, "main")
return main(
read_df=prepare_for_adapter(adapter, read_relation_as_df),
write_df=prepare_for_adapter(adapter, write_df_to_relation),
)

def _isolated_runner(code: str, config: RuntimeConfig, manifest: Manifest, macro_manifest: MacroManifest) -> Any:
# This function can be run in an entirely separate
# process or an environment, so we need to reconstruct
# the DB adapter solely from the config.
adapter = reconstruct_adapter(config, manifest, macro_manifest)
return run_with_adapter(code, adapter)
return run_with_adapter(code, adapter, config)

def run_in_environment_with_adapter(
environment: BaseEnvironment,
Expand Down Expand Up @@ -71,8 +76,33 @@ def run_in_environment_with_adapter(
else:
deps = get_default_pip_dependencies()
stage = VirtualPythonEnvironment(deps)
fal_scripts_path = _get_fal_scripts_path(config)

with PythonIPC(environment, environment.create(), extra_inheritance_paths=[stage.create()]) as connection:
with PythonIPC(environment, environment.create(), extra_inheritance_paths=[fal_scripts_path, stage.create()]) as connection:
execute_model = partial(_isolated_runner, code, config, manifest, macro_manifest)
result = connection.run(execute_model)
return result

@contextmanager
def extra_path(path: str):
import sys
sys.path.append(path)
try:
yield
finally:
sys.path.remove(path)

def _get_fal_scripts_path(config: RuntimeConfig):
import pathlib
project_path = pathlib.Path(config.project_root)

# Default value
fal_scripts_path = 'fal_scripts'

if hasattr(config, 'vars'):
fal_scripts_path: str = config.vars.to_dict().get(FAL_SCRIPTS_PATH_VAR_NAME, fal_scripts_path) # type: ignore

if hasattr(config, 'cli_vars'):
fal_scripts_path = config.cli_vars.get(FAL_SCRIPTS_PATH_VAR_NAME, fal_scripts_path)

return project_path / fal_scripts_path
2 changes: 1 addition & 1 deletion adapter/src/dbt/adapters/fal_experimental/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def submit_python_job(

else:
if is_local:
return run_with_adapter(compiled_code, self._db_adapter)
return run_with_adapter(compiled_code, self._db_adapter, self.config)

with self._invalidate_db_cache():
return run_in_environment_with_adapter(
Expand Down

0 comments on commit 0c500b9

Please sign in to comment.