Skip to content

Commit

Permalink
Allow multiple row-filters and add params
Browse files Browse the repository at this point in the history
  • Loading branch information
bogo96 committed May 16, 2022
1 parent e859d87 commit f8f741c
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 20 deletions.
14 changes: 14 additions & 0 deletions clouddq/classes/dq_row_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from __future__ import annotations

from dataclasses import dataclass
from string import Template

from clouddq.utils import assert_not_none_or_empty

Expand Down Expand Up @@ -84,3 +85,16 @@ def dict_values(self: DqRowFilter) -> dict:
"""

return dict(self.to_dict().get(self.row_filter_id))


def resolve_sql_expr(self: DqRowFilter, arguments: dict | None) -> None:
    """Substitute ``$name`` placeholders in this row filter's SQL expression.

    ``self.filter_sql_expr`` is treated as a ``string.Template`` and is
    overwritten in place with the substituted text. ``safe_substitute`` is
    used, so a placeholder with no matching key in ``arguments`` is left in
    the expression instead of raising.

    Args:
        arguments: Mapping of placeholder name to replacement value. ``None``
            is accepted and treated as an empty mapping (no placeholder is
            replaced).

    Raises:
        ValueError: If the substitution fails for any reason; the original
            exception is chained as ``__cause__``.
    """
    try:
        # ``safe_substitute(None)`` would fail with TypeError on the first
        # placeholder lookup, so normalise a missing mapping to ``{}``.
        self.filter_sql_expr = Template(self.filter_sql_expr).safe_substitute(
            arguments or {}
        )
    except Exception as e:
        # NOTE(review): nothing visible here sets ``rule_binding_id`` on a
        # row filter; read it defensively so the error path cannot itself
        # fail with AttributeError and hide the real cause.
        rule_binding_id = getattr(self, "rule_binding_id", None)
        raise ValueError(
            f"Failed to resolve row_filter_id '{self.row_filter_id}' in "
            f"rule_binding_id '{rule_binding_id}' "
            f"with error:\n{e}"
        ) from e
53 changes: 39 additions & 14 deletions clouddq/classes/dq_rule_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class DqRuleBinding:
entity_id: str | None
entity_uri: EntityUri | None
column_id: str
row_filter_id: str
row_filter_ids: list
incremental_time_filter_column_id: str | None
rule_ids: list
metadata: dict | None
Expand Down Expand Up @@ -92,13 +92,14 @@ def from_dict(
)
if column_id:
column_id.upper()
row_filter_id: str = get_from_dict_and_assert(
row_filter_ids: list[str] = get_from_dict_and_assert(
config_id=rule_binding_id,
kwargs=kwargs,
key="row_filter_id",
key="row_filter_ids",
assertion=lambda x: type(x) == list,
error_msg=f"Rule Binding ID: '{rule_binding_id}' must have defined value "
f"'row_filter_ids' of type 'list'.",
)
if row_filter_id:
row_filter_id.upper()
rule_ids: list[str] = get_from_dict_and_assert(
config_id=rule_binding_id,
kwargs=kwargs,
Expand All @@ -124,7 +125,7 @@ def from_dict(
entity_id=entity_id,
entity_uri=entity_uri,
column_id=column_id,
row_filter_id=row_filter_id,
row_filter_ids=row_filter_ids,
incremental_time_filter_column_id=incremental_time_filter_column_id,
rule_ids=rule_ids,
metadata=metadata,
Expand All @@ -150,7 +151,7 @@ def to_dict(self: DqRuleBinding) -> dict:
"entity_id": self.entity_id,
"entity_uri": entity_uri,
"column_id": self.column_id,
"row_filter_id": self.row_filter_id,
"row_filter_ids": self.row_filter_ids,
"incremental_time_filter_column_id": self.incremental_time_filter_column_id, # noqa: E501
"rule_ids": self.rule_ids,
"metadata": self.metadata,
Expand Down Expand Up @@ -251,12 +252,35 @@ def resolve_rule_config_list(
)
return resolved_rule_config_list

def resolve_row_filter_config_list(
    self: DqRuleBinding,
    configs_cache: dq_configs_cache.DqConfigsCache,
) -> list[DqRowFilter]:
    """Resolve every entry of ``self.row_filter_ids`` to a row filter config.

    Each entry is either a plain row filter id, or a single-key mapping
    ``{row_filter_id: arguments}`` whose value is handed to the filter's
    ``resolve_sql_expr`` for placeholder substitution.

    Args:
        configs_cache: Looked up via ``get_row_filter_id`` with the
            upper-cased row filter id.

    Returns:
        The resolved row filter configs, in the order they appear in
        ``self.row_filter_ids``.

    Raises:
        ValueError: If a nested entry does not contain exactly one
            row filter id.

    The final emptiness check is delegated to ``assert_not_none_or_empty``.
    """
    resolved_row_filter_config_list = []
    for row_filter in self.row_filter_ids:
        if isinstance(row_filter, dict):
            # Exactly one key is required: an empty mapping would otherwise
            # surface as a bare StopIteration from ``next(iter(...))``.
            if len(row_filter) != 1:
                raise ValueError(
                    f"Rule Binding {self.rule_binding_id} has "
                    f"invalid configs in row_filter_ids. "
                    f"Each nested row_filter_id object must "
                    f"have exactly one row_filter_id. "
                    f"Current value: \n {row_filter}"
                )
            row_filter_id, arguments = next(iter(row_filter.items()))
        else:
            row_filter_id, arguments = row_filter, None
        row_filter_config = configs_cache.get_row_filter_id(row_filter_id.upper())
        # NOTE(review): resolve_sql_expr overwrites filter_sql_expr in place.
        # If the cache hands back a shared instance, a later binding would see
        # the already-substituted expression -- confirm the cache returns a
        # fresh object per call.
        # A plain id (or a nested id with no value) carries no arguments;
        # pass an empty mapping rather than None.
        row_filter_config.resolve_sql_expr(arguments or {})
        resolved_row_filter_config_list.append(row_filter_config)
    assert_not_none_or_empty(
        resolved_row_filter_config_list,
        "Rule Binding must have non-empty row_filter list.",
    )
    return resolved_row_filter_config_list

def resolve_all_configs_to_dict(
self: DqRuleBinding,
Expand Down Expand Up @@ -315,7 +339,8 @@ def resolve_all_configs_to_dict(
rule_config["rule_sql_expr"] = rule_sql_expr
rule_configs_dict[rule_id] = rule_config
# Resolve filter configs
row_filter_config = self.resolve_row_filter_config(configs_cache)
row_filters_configs = {row_filter.row_filter_id: row_filter.dict_values()
for row_filter in self.resolve_row_filter_config_list(configs_cache)}
return dict(
{
"rule_binding_id": self.rule_binding_id,
Expand All @@ -325,8 +350,8 @@ def resolve_all_configs_to_dict(
"column_configs": dict(column_configs.dict_values()),
"rule_ids": list(self.rule_ids),
"rule_configs_dict": rule_configs_dict,
"row_filter_id": self.row_filter_id,
"row_filter_configs": dict(row_filter_config.dict_values()),
"row_filter_ids": self.row_filter_ids,
"row_filter_configs": row_filters_configs,
"incremental_time_filter_column": incremental_time_filter_column,
"metadata": self.metadata,
}
Expand Down
16 changes: 10 additions & 6 deletions clouddq/templates/dbt/macros/create_rule_binding_view.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
{%- macro create_rule_binding_view(configs, environment, dq_summary_table_name, metadata, configs_hashsum, progress_watermark, dq_summary_table_exists) -%}
{% set rule_binding_id = configs.get('rule_binding_id') -%}
{% set rule_configs_dict = configs.get('rule_configs_dict') -%}
{% set filter_sql_expr = configs.get('row_filter_configs').get('filter_sql_expr') -%}
{% set row_filter_configs = configs.get('row_filter_configs') -%}
{% set column_name = configs.get('column_configs').get('name') -%}
{% set entity_configs = configs.get('entity_configs') -%}
{% set partition_fields = entity_configs.get('partition_fields')-%}
Expand Down Expand Up @@ -72,16 +72,20 @@ data AS (
CAST(d.{{ time_column_id }} AS TIMESTAMP)
> high_watermark_filter.high_watermark
AND
{{ filter_sql_expr }}
{% else %}
{%- else %}
WHERE
{{ filter_sql_expr }}
{% endif -%}
{%- endif %}
{%- for id, row_filter_config in row_filter_configs.items() %}
{{ row_filter_config.get('filter_sql_expr') -}}
{%- if loop.nextitem is defined %}
AND
{%- endif -%}
{%- endfor %}
{%- if partition_fields %}
{% for field in partition_fields %}
AND {{ field['name'] }} IS NOT NULL
{%- endfor -%}
{% endif -%}
{%- endif -%}
),
last_mod AS (
SELECT
Expand Down

0 comments on commit f8f741c

Please sign in to comment.