diff --git a/CHANGELOG.md b/CHANGELOG.md index cbdc71e..90d140b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,4 +37,4 @@ Support `--select` for the `DbtSnapshotOperator`. # v0.4.0 -Add the `DbtDocsGenerateOperator` and `DbtDepsOperator`. +Add the `DbtDocsGenerateOperator` and `DbtDepsOperator`. \ No newline at end of file diff --git a/README.md b/README.md index 3ae4f05..0d23e6a 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,8 @@ There are five operators currently implemented: Each of the above operators accept the following arguments: +* `env` + * If set as a kwarg dict, passed the given environment variables as the arguments to the dbt task * `profiles_dir` * If set, passed as the `--profiles-dir` argument to the `dbt` command * `target` @@ -164,6 +166,41 @@ dbt_run = DbtRunOperator( ) ``` +## Templating and parsing environments variables + +If you would like to run DBT using custom profile definition template with environment-specific variables, like for example profiles.yml using jinja: +```yaml +: + outputs: + : + database: "{{ env_var('DBT_ENV_SECRET_DATABASE') }}" + password: "{{ env_var('DBT_ENV_SECRET_PASSWORD') }}" + schema: "{{ env_var('DBT_ENV_SECRET_SCHEMA') }}" + threads: "{{ env_var('DBT_THREADS') }}" + type: + user: "{{ env_var('USER_NAME') }}_{{ env_var('ENV_NAME') }}" + target: +``` + +You can pass the environment variables via the `env` kwarg parameter: + +```python +import os +... + +dbt_run = DbtRunOperator( + task_id='dbt_run', + env={ + 'DBT_ENV_SECRET_DATABASE': '', + 'DBT_ENV_SECRET_PASSWORD': '', + 'DBT_ENV_SECRET_SCHEMA': '', + 'USER_NAME': '', + 'DBT_THREADS': os.getenv(''), + 'ENV_NAME': os.getenv('ENV_NAME') + } +) +``` + ## License & Contributing * This is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT). diff --git a/airflow_dbt/hooks/dbt_hook.py b/airflow_dbt/hooks/dbt_hook.py index 875d20f..a16e53f 100644 --- a/airflow_dbt/hooks/dbt_hook.py +++ b/airflow_dbt/hooks/dbt_hook.py @@ -11,6 +11,8 @@ class DbtCliHook(BaseHook): """ Simple wrapper around the dbt CLI. + :param env: If set, passes the env variables to the subprocess handler + :type env: dict :param profiles_dir: If set, passed as the `--profiles-dir` argument to the `dbt` command :type profiles_dir: str :param target: If set, passed as the `--target` argument to the `dbt` command @@ -40,6 +42,7 @@ class DbtCliHook(BaseHook): """ def __init__(self, + env=None, profiles_dir=None, target=None, dir='.', @@ -55,6 +58,7 @@ def __init__(self, output_encoding='utf-8', verbose=True, warn_error=False): + self.env = env or {} self.profiles_dir = profiles_dir self.dir = dir self.target = target @@ -125,6 +129,7 @@ def run_cli(self, *command): sp = subprocess.Popen( dbt_cmd, + env=self.env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=self.dir, diff --git a/airflow_dbt/operators/dbt_operator.py b/airflow_dbt/operators/dbt_operator.py index 69ba953..5f8d632 100644 --- a/airflow_dbt/operators/dbt_operator.py +++ b/airflow_dbt/operators/dbt_operator.py @@ -8,6 +8,8 @@ class DbtBaseOperator(BaseOperator): Base dbt operator All other dbt operators are derived from this operator. + :param env: If set, passes the env variables to the subprocess handler + :type env: dict :param profiles_dir: If set, passed as the `--profiles-dir` argument to the `dbt` command :type profiles_dir: str :param target: If set, passed as the `--target` argument to the `dbt` command @@ -36,10 +38,11 @@ class DbtBaseOperator(BaseOperator): ui_color = '#d6522a' - template_fields = ['vars'] + template_fields = ['env', 'vars'] @apply_defaults def __init__(self, + env=None, profiles_dir=None, target=None, dir='.', @@ -58,6 +61,7 @@ def __init__(self, **kwargs): super(DbtBaseOperator, self).__init__(*args, **kwargs) + self.env = env or {} self.profiles_dir = profiles_dir self.target = target self.dir = dir @@ -76,6 +80,7 @@ def __init__(self, def create_hook(self): self.hook = DbtCliHook( + env=self.env, profiles_dir=self.profiles_dir, target=self.target, dir=self.dir, diff --git a/tests/__init__.py b/tests/__init__.py old mode 100644 new mode 100755 diff --git a/tests/hooks/test_dbt_hook.py b/tests/hooks/test_dbt_hook.py index 4dd39ed..383a953 100644 --- a/tests/hooks/test_dbt_hook.py +++ b/tests/hooks/test_dbt_hook.py @@ -23,6 +23,7 @@ def test_sub_commands(self, mock_subproc_popen): 'docs', 'generate' ], + env={}, close_fds=True, cwd='.', stdout=subprocess.PIPE, @@ -47,8 +48,34 @@ def test_vars(self, mock_subproc_popen): '--vars', '{"foo": "bar", "baz": "true"}' ], + env={}, close_fds=True, cwd='.', stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) + + @mock.patch('subprocess.Popen') + def test_envs(self, mock_subproc_popen): + mock_subproc_popen.return_value \ + .communicate.return_value = ('output', 'error') + mock_subproc_popen.return_value.returncode = 0 + mock_subproc_popen.return_value \ + .stdout.readline.side_effect = [b"placeholder"] + + hook = DbtCliHook(vars={"foo": "bar", "baz": "true"}, env={"foo": "bar", "baz": "true"}) + hook.run_cli('run') + + mock_subproc_popen.assert_called_once_with( + [ + 'dbt', + 'run', + '--vars', + '{"foo": "bar", "baz": "true"}' + ], + env={"foo": "bar", "baz": "true"}, + close_fds=True, + cwd='.', + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT + )