Skip to content

Commit

Permalink
adapter function for freshness via custom sql
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenyuLInx committed Dec 17, 2024
1 parent e3964d7 commit 32dc0ea
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 36 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241216-172047.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add function to run custom sql for getting freshness info
time: 2024-12-16T17:20:47.065611-08:00
custom:
Author: ChenyuLInx
Issue: "8797"
77 changes: 41 additions & 36 deletions dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
GET_CATALOG_MACRO_NAME = "get_catalog"
GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations"
FRESHNESS_MACRO_NAME = "collect_freshness"
CUSTOME_SQL_FRESHNESS_MACRO_NAME = "collect_freshness_custom_sql"
GET_RELATION_LAST_MODIFIED_MACRO_NAME = "get_relation_last_modified"
DEFAULT_BASE_BEHAVIOR_FLAGS = [
{
Expand Down Expand Up @@ -1327,6 +1328,31 @@ def cancel_open_connections(self):
"""Cancel all open connections."""
return self.connections.cancel_open()

def _process_freshness_execution(
self,
macro_name: str,
kwargs: Dict[str, Any],
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
"""Execute and process a freshness macro to generate a FreshnessResponse"""
import agate

result = self.execute_macro(macro_name, kwargs=kwargs, macro_resolver=macro_resolver)

if isinstance(result, agate.Table):
warn_or_error(CollectFreshnessReturnSignature())
table = result
adapter_response = None
else:
adapter_response, table = result.response, result.table

# Process the results table
if len(table) != 1 or len(table[0]) != 2:
raise MacroResultError(macro_name, table)

freshness_response = self._create_freshness_response(table[0][0], table[0][1])
return adapter_response, freshness_response

def calculate_freshness(
self,
source: BaseRelation,
Expand All @@ -1335,49 +1361,28 @@ def calculate_freshness(
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
"""Calculate the freshness of sources in dbt, and return it"""
import agate

kwargs: Dict[str, Any] = {
kwargs = {
"source": source,
"loaded_at_field": loaded_at_field,
"filter": filter,
}

# run the macro
# in older versions of dbt-core, the 'collect_freshness' macro returned the table of results directly
# starting in v1.5, by default, we return both the table and the adapter response (metadata about the query)
result: Union[
AttrDict, # current: contains AdapterResponse + "agate.Table"
"agate.Table", # previous: just table
]
result = self.execute_macro(
FRESHNESS_MACRO_NAME, kwargs=kwargs, macro_resolver=macro_resolver
return self._process_freshness_execution(
FRESHNESS_MACRO_NAME, kwargs, macro_resolver
)
if isinstance(result, agate.Table):
warn_or_error(CollectFreshnessReturnSignature())
adapter_response = None
table = result
else:
adapter_response, table = result.response, result.table # type: ignore[attr-defined]
# now we have a 1-row table of the maximum `loaded_at_field` value and
# the current time according to the db.
if len(table) != 1 or len(table[0]) != 2:
raise MacroResultError(FRESHNESS_MACRO_NAME, table)
if table[0][0] is None:
# no records in the table, so really the max_loaded_at was
# infinitely long ago. Just call it 0:00 January 1 year UTC
max_loaded_at = datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.UTC)
else:
max_loaded_at = _utc(table[0][0], source, loaded_at_field)

snapshotted_at = _utc(table[0][1], source, loaded_at_field)
age = (snapshotted_at - max_loaded_at).total_seconds()
freshness: FreshnessResponse = {
"max_loaded_at": max_loaded_at,
"snapshotted_at": snapshotted_at,
"age": age,
def calculate_freshness_from_custom_sql(
self,
source: BaseRelation,
sql: str,
macro_resolver: Optional[MacroResolverProtocol] = None,
) -> Tuple[Optional[AdapterResponse], FreshnessResponse]:
kwargs = {
"source": source,
"loaded_at_query": sql,
}
return adapter_response, freshness
return self._process_freshness_execution(
CUSTOME_SQL_FRESHNESS_MACRO_NAME, kwargs, macro_resolver
)

def calculate_freshness_from_metadata_batch(
self,
Expand Down
16 changes: 16 additions & 0 deletions dbt/include/global_project/macros/adapters/freshness.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,19 @@
{% endcall %}
{{ return(load_result('collect_freshness')) }}
{% endmacro %}

{% macro collect_freshness_custom_sql(source, loaded_at_query) %}
{{ return(adapter.dispatch('collect_freshness_custom_sql', 'dbt')(source, loaded_at_query))}}
{% endmacro %}

{% macro default__collect_freshness_custom_sql(source, loaded_at_query) %}
{% call statement('collect_freshness_custom_sql', fetch_result=True, auto_begin=False) -%}
with source_query as (
{{ loaded_at_query }}
)
select
(select * from source_query) as max_loaded_at,
{{ current_timestamp() }} as snapshotted_at
{% endcall %}
{{ return(load_result('collect_freshness_custom_sql')) }}
{% endmacro %}

0 comments on commit 32dc0ea

Please sign in to comment.