Skip to content

Commit

Permalink
Adding
Browse files Browse the repository at this point in the history
  • Loading branch information
Sachin-Thakur committed Dec 1, 2023
1 parent ade7833 commit 25d922c
Show file tree
Hide file tree
Showing 3 changed files with 293 additions and 1 deletion.
33 changes: 32 additions & 1 deletion dbt/include/vertica/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@
{% endmacro %}







{% macro vertica__information_schema_name(database) -%}
{%- if database -%}
{{ adapter.quote_as_configured(database, 'database') }}.v_catalog
Expand Down Expand Up @@ -96,4 +101,30 @@
where table_schema ilike '{{ schema_relation.schema }}'
{% endcall %}
{{ return(load_result('list_relations_without_caching').table) }}
{% endmacro %}
{% endmacro %}



{% macro vertica__get_relation_last_modified(information_schema, relations) -%}
{#-
    Metadata-based source freshness: return one row per requested relation
    with its last-modified timestamp, consumed by `dbt source freshness`.

    Args:
        information_schema: accepted for interface parity with other adapters;
            not referenced here — the query reads v_catalog.tables directly.
        relations: relations (schema + identifier) to look up; matching is
            case-insensitive via upper() on both sides of each comparison.

    Returns the fetched result of the 'last_modified' statement with columns
    (schema, identifier, last_modified, snapshotted_at).

    NOTE(review): last_modified is taken from create_time — presumably
    v_catalog exposes no per-table modification timestamp; confirm this is
    the intended freshness signal for Vertica.
-#}

{%- call statement('last_modified', fetch_result=True) -%}
select table_schema as schema,
table_name as identifier,
create_time as last_modified,
{{ current_timestamp() }} as snapshotted_at
from v_catalog.tables
where (
{%- for relation in relations -%}
(upper(table_schema) = upper('{{ relation.schema }}') and
upper(table_name) = upper('{{ relation.identifier }}')){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
{%- endcall -%}

{{ return(load_result('last_modified')) }}

{% endmacro %}




61 changes: 61 additions & 0 deletions tests/functional/adapter/test_get_last_relation_modified.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
import pytest

from dbt.cli.main import dbtRunner


# dbt schema.yml used by the freshness test: one source table with freshness
# thresholds (warn after 10 hours, error after 1 day). The schema name is
# resolved at parse time from an env var set by the test fixture.
# Fix: restore the YAML nesting — freshness/schema/tables must be indented
# under the source entry, and table entries under `tables:`, or dbt fails to
# parse the source definition.
freshness_via_metadata_schema_yml = """version: 2
sources:
  - name: test_source
    freshness:
      warn_after: {count: 10, period: hour}
      error_after: {count: 1, period: day}
    schema: "{{ env_var('DBT_GET_LAST_RELATION_TEST_SCHEMA') }}"
    tables:
      - name: test_table
"""


class TestGetLastRelationModified:
    """Functional test: `dbt source freshness` computed from relation metadata.

    Exercises the adapter's get_relation_last_modified path: a freshly
    created source table must fall inside the warn/error thresholds, so the
    freshness command should emit no warnings or errors.

    Fix: removed a leftover debug print and commented-out dead code before
    the final assertion.
    """

    @pytest.fixture(scope="class", autouse=True)
    def set_env_vars(self, project):
        # Expose the test schema to schema.yml, which reads it via env_var().
        os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"] = project.test_schema
        yield
        del os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]

    @pytest.fixture(scope="class")
    def models(self):
        return {"schema.yml": freshness_via_metadata_schema_yml}

    @pytest.fixture(scope="class")
    def custom_schema(self, project, set_env_vars):
        # Create a clean schema for the source table (drop first in case a
        # previous run left it behind); drop it again after the test.
        with project.adapter.connection_named("__test"):
            relation = project.adapter.Relation.create(
                database=project.database, schema=os.environ["DBT_GET_LAST_RELATION_TEST_SCHEMA"]
            )
            project.adapter.drop_schema(relation)
            project.adapter.create_schema(relation)

        yield relation.schema

        with project.adapter.connection_named("__test"):
            project.adapter.drop_schema(relation)

    def test_get_last_relation_modified(self, project, set_env_vars, custom_schema):
        project.run_sql(
            f"create table {custom_schema}.test_table (id integer , name varchar(100) not null);"
        )

        warning_or_error = False

        def probe(e):
            # Record whether any warning- or error-level event was emitted.
            nonlocal warning_or_error
            if e.info.level in ["warning", "error"]:
                warning_or_error = True

        runner = dbtRunner(callbacks=[probe])
        runner.invoke(["source", "freshness"])

        # A just-created table is well within warn_after/error_after, so the
        # freshness check must succeed without warnings or errors.
        assert not warning_or_error
200 changes: 200 additions & 0 deletions tests/functional/adapter/test_list_relations_without_caching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import pytest

import json
from dbt.tests.util import run_dbt, run_dbt_and_capture

# Testing rationale:
# - vertica SHOW TERSE OBJECTS command returns at max 10K objects in a single call
# - when dbt attempts to write into a schema with more than 10K objects, compilation will fail
# unless we paginate the result
# - however, testing this process is difficult at a full scale of 10K actual objects populated
# into a fresh testing schema
# - accordingly, we create a smaller set of views and test the looping iteration logic in
# smaller chunks

# How many replicated view models are created per test schema.
NUM_VIEWS = 100
# The base table plus all of the views.
NUM_EXPECTED_RELATIONS = 1 + NUM_VIEWS

# Model SQL for the single base table every view selects from.
TABLE_BASE_SQL = """
{{ config(materialized='table') }}
select 1 as id
""".lstrip()

# Model SQL replicated NUM_VIEWS times to populate the schema.
VIEW_X_SQL = """
select id from {{ ref('my_model_base') }}
""".lstrip()

# Validation macro: list relations with small page sizes (max_iter=11,
# max_results_per_iter=10) and log the count so the test can assert on it.
MACROS__VALIDATE__VERTICA__LIST_RELATIONS_WITHOUT_CACHING = """
{% macro validate_list_relations_without_caching(schema_relation) %}
{% set relation_list_result = vertica__list_relations_without_caching(schema_relation, max_iter=11, max_results_per_iter=10) %}
{% set n_relations = relation_list_result | length %}
{{ log("n_relations: " ~ n_relations) }}
{% endmacro %}
"""

# Validation macro sized so that 33 * 3 = 99 < NUM_EXPECTED_RELATIONS,
# forcing the pagination limit to be exceeded and an error raised.
MACROS__VALIDATE__VERTICA__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR = """
{% macro validate_list_relations_without_caching_raise_error(schema_relation) %}
{{ vertica__list_relations_without_caching(schema_relation, max_iter=33, max_results_per_iter=3) }}
{% endmacro %}
"""


def parse_json_logs(json_log_output):
    """Parse dbt's newline-delimited JSON log output into a list of dicts.

    Lines that are not valid JSON (blank lines, plain-text noise) are
    silently skipped.
    """
    records = []
    for raw_line in json_log_output.split("\n"):
        try:
            records.append(json.loads(raw_line))
        except ValueError:
            continue
    return records


def find_result_in_parsed_logs(parsed_logs, result_name):
    """Return the first log message containing *result_name*, else False.

    NOTE(review): the `.get("msg", "msg")` default mirrors the original —
    an entry without a "msg" key matches whenever result_name is a
    substring of the literal "msg".
    """
    for entry in parsed_logs:
        if result_name in entry["data"].get("msg", "msg"):
            return entry["data"]["msg"]
    return False


def find_exc_info_in_parsed_logs(parsed_logs, exc_info_name):
    """Return the first exc_info payload containing *exc_info_name*, else False.

    NOTE(review): the `.get("exc_info", "exc_info")` default mirrors the
    original — an entry without an "exc_info" key matches whenever
    exc_info_name is a substring of the literal "exc_info".
    """
    for entry in parsed_logs:
        if exc_info_name in entry["data"].get("exc_info", "exc_info"):
            return entry["data"]["exc_info"]
    return False


class TestListRelationsWithoutCachingSingle:
    """Single-relation case: listing must terminate without pagination.

    Fix: removed three leftover debug print() calls from the test body.
    """

    @pytest.fixture(scope="class")
    def models(self):
        # One base table plus NUM_VIEWS views; only the base table is built
        # in this test (see the -s selector below).
        my_models = {"my_model_base.sql": TABLE_BASE_SQL}
        for view in range(0, NUM_VIEWS):
            my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})

        return my_models

    @pytest.fixture(scope="class")
    def macros(self):
        return {
            "validate_list_relations_without_caching.sql": MACROS__VALIDATE__VERTICA__LIST_RELATIONS_WITHOUT_CACHING,
        }

    def test__vertica__list_relations_without_caching_termination(self, project):
        """
        validates that we do NOT trigger pagination logic vertica__list_relations_without_caching
        macro when there are fewer than max_results_per_iter relations in the target schema
        """
        # Build only the base table so the schema holds a single relation.
        run_dbt(["run", "-s", "my_model_base"])

        database = project.database
        schemas = project.created_schemas

        for schema in schemas:
            schema_relation = f"{database}.{schema}"
            kwargs = {"schema_relation": schema_relation}
            _, log_output = run_dbt_and_capture(
                [
                    "--debug",
                    "--log-format=json",
                    "run-operation",
                    "validate_list_relations_without_caching",
                    "--args",
                    str(kwargs),
                ]
            )

            parsed_logs = parse_json_logs(log_output)
            n_relations = find_result_in_parsed_logs(parsed_logs, "n_relations")
            # Only the base table exists, so exactly one relation is reported.
            assert n_relations == "n_relations: 1"


class TestListRelationsWithoutCachingFull:
    """Full-scale case: NUM_EXPECTED_RELATIONS relations force the
    list-relations macro through multiple paginated iterations.

    NOTE(review): the docstrings below mention SHOW TERSE OBJECTS, which is
    Snowflake terminology — these tests appear adapted from the Snowflake
    adapter; confirm the wording against the Vertica macro's actual query.
    """

    @pytest.fixture(scope="class")
    def models(self):
        # One base table plus NUM_VIEWS identical views referencing it.
        my_models = {"my_model_base.sql": TABLE_BASE_SQL}
        for view in range(0, NUM_VIEWS):
            my_models.update({f"my_model_{view}.sql": VIEW_X_SQL})

        return my_models

    @pytest.fixture(scope="class")
    def macros(self):
        # Both validation macros: one sized to succeed across pages, one
        # sized to exceed the iteration limit and raise.
        return {
            "validate_list_relations_without_caching.sql": MACROS__VALIDATE__VERTICA__LIST_RELATIONS_WITHOUT_CACHING,
            "validate_list_relations_without_caching_raise_error.sql": MACROS__VALIDATE__VERTICA__LIST_RELATIONS_WITHOUT_CACHING_RAISE_ERROR,
        }

    def test__vertica__list_relations_without_caching(self, project):
        """
        validates pagination logic in vertica__list_relations_without_caching macro counts
        the correct number of objects in the target schema when having to make multiple looped
        calls of SHOW TERSE OBJECTS.
        """
        # purpose of the first run is to create the replicated views in the target schema
        run_dbt(["run"])

        database = project.database
        schemas = project.created_schemas

        for schema in schemas:
            schema_relation = f"{database}.{schema}"
            kwargs = {"schema_relation": schema_relation}
            _, log_output = run_dbt_and_capture(
                [
                    "--debug",
                    "--log-format=json",
                    "run-operation",
                    "validate_list_relations_without_caching",
                    "--args",
                    str(kwargs),
                ]
            )

            parsed_logs = parse_json_logs(log_output)
            n_relations = find_result_in_parsed_logs(parsed_logs, "n_relations")

            # All NUM_EXPECTED_RELATIONS objects must be counted across pages.
            assert n_relations == f"n_relations: {NUM_EXPECTED_RELATIONS}"

    def test__vertica__list_relations_without_caching_raise_error(self, project):
        """
        validates pagination logic terminates and raises a compilation error
        when exceeding the limit of how many results to return.
        """
        run_dbt(["run"])

        database = project.database
        schemas = project.created_schemas

        for schema in schemas:
            schema_relation = f"{database}.{schema}"
            kwargs = {"schema_relation": schema_relation}
            _, log_output = run_dbt_and_capture(
                [
                    "--debug",
                    "--log-format=json",
                    "run-operation",
                    "validate_list_relations_without_caching_raise_error",
                    "--args",
                    str(kwargs),
                ],
                expect_pass=False,
            )

            parsed_logs = parse_json_logs(log_output)
            traceback = find_exc_info_in_parsed_logs(parsed_logs, "Traceback")
            # max_iter=33 * max_results_per_iter=3 → documented limit of 99.
            assert "dbt will list a maximum of 99 objects in schema " in traceback

0 comments on commit 25d922c

Please sign in to comment.