diff --git a/clouddq/classes/dq_row_filter.py b/clouddq/classes/dq_row_filter.py index 3d493d8e..351784fa 100644 --- a/clouddq/classes/dq_row_filter.py +++ b/clouddq/classes/dq_row_filter.py @@ -16,6 +16,7 @@ from __future__ import annotations from dataclasses import dataclass +from string import Template from clouddq.utils import assert_not_none_or_empty @@ -84,3 +85,16 @@ def dict_values(self: DqRowFilter) -> dict: """ return dict(self.to_dict().get(self.row_filter_id)) + + + def resolve_sql_expr(self: DqRule, arguments: dict) -> None: + try: + self.filter_sql_expr = Template(self.filter_sql_expr).safe_substitute( + arguments + ) + except Exception as e: + raise ValueError( + f"Failed to resolve row_filter_id '{self.row_filter_id}' in " + f"rule_binding_id '{self.rule_binding_id}' " + f"with error:\n{e}" + ) diff --git a/clouddq/classes/dq_rule_binding.py b/clouddq/classes/dq_rule_binding.py index 6ec11813..48e83c2c 100644 --- a/clouddq/classes/dq_rule_binding.py +++ b/clouddq/classes/dq_rule_binding.py @@ -46,7 +46,7 @@ class DqRuleBinding: entity_id: str | None entity_uri: EntityUri | None column_id: str - row_filter_id: str + row_filter_ids: list incremental_time_filter_column_id: str | None rule_ids: list reference_columns_id: str | None @@ -95,13 +95,22 @@ def from_dict( ) if column_id: column_id.upper() - row_filter_id: str = get_from_dict_and_assert( + row_filter_config: dict = get_keys_from_dict_and_assert_oneof( config_id=rule_binding_id, kwargs=kwargs, - key="row_filter_id", + keys=["row_filter_id", "row_filter_ids"] ) - if row_filter_id: - row_filter_id.upper() + row_filter_ids = [] + if "row_filter_id" in row_filter_config: + row_filter_ids.append(row_filter_config["row_filter_id"].upper()) + if "row_filter_ids" in row_filter_config: + for row_filter in row_filter_config["row_filter_ids"]: + if type(row_filter) == str: + row_filter_ids.append(row_filter.upper()) + if type(row_filter) == dict: + row_filter_ids.extend( + [id.upper() for id in row_filter] + ) rule_ids: list[str] = get_from_dict_and_assert( config_id=rule_binding_id, kwargs=kwargs, @@ -130,7 +139,7 @@ def from_dict( entity_id=entity_id, entity_uri=entity_uri, column_id=column_id, - row_filter_id=row_filter_id, + row_filter_ids=row_filter_ids, incremental_time_filter_column_id=incremental_time_filter_column_id, rule_ids=rule_ids, reference_columns_id=reference_columns_id, @@ -157,7 +166,7 @@ def to_dict(self: DqRuleBinding) -> dict: "entity_id": self.entity_id, "entity_uri": entity_uri, "column_id": self.column_id, - "row_filter_id": self.row_filter_id, + "row_filter_ids": self.row_filter_ids, "incremental_time_filter_column_id": self.incremental_time_filter_column_id, # noqa: E501 "rule_ids": self.rule_ids, "reference_columns_id": self.reference_columns_id, @@ -259,12 +268,35 @@ def resolve_rule_config_list( ) return resolved_rule_config_list - def resolve_row_filter_config( + def resolve_row_filter_config_list( self: DqRuleBinding, configs_cache: dq_configs_cache.DqConfigsCache, - ) -> DqRowFilter: - row_filter = configs_cache.get_row_filter_id(self.row_filter_id.upper()) - return row_filter + ) -> list[DqRowFilter]: + resolved_row_filter_config_list = [] + for row_filter in self.row_filter_ids: + if type(row_filter) == dict: + if len(row_filter) > 1: + raise ValueError( + f"Rule Binding {self.rule_binding_id} has " + f"invalid configs in row_filter_ids. " + f"Each nested row_filter_id objects cannot " + f"have more than one row_filter_id. " + f"Current value: \n {row_filter}" + ) + else: + row_filter_id = next(iter(row_filter)) + arguments = row_filter[row_filter_id] + else: + row_filter_id = row_filter + arguments = None + row_filter_config = configs_cache.get_row_filter_id(row_filter_id) + row_filter_config.resolve_sql_expr(arguments) + resolved_row_filter_config_list.append(row_filter_config) + assert_not_none_or_empty( + resolved_row_filter_config_list, + "Rule Binding must have non-empty row_filter list.", + ) + return resolved_row_filter_config_list def resolve_reference_columns_config( self: DqRuleBinding, @@ -333,7 +365,8 @@ def resolve_all_configs_to_dict( rule_config["rule_sql_expr"] = rule_sql_expr rule_configs_dict[rule_id] = rule_config # Resolve filter configs - row_filter_config = self.resolve_row_filter_config(configs_cache) + row_filters_configs = {row_filter.row_filter_id: row_filter.dict_values() + for row_filter in self.resolve_row_filter_config_list(configs_cache)} # resolve reference columns config if self.reference_columns_id: include_all_reference_columns = False @@ -370,8 +403,8 @@ def resolve_all_configs_to_dict( "column_configs": dict(column_configs.dict_values()), "rule_ids": list(self.rule_ids), "rule_configs_dict": rule_configs_dict, - "row_filter_id": self.row_filter_id, - "row_filter_configs": dict(row_filter_config.dict_values()), + "row_filter_ids": self.row_filter_ids, + "row_filter_configs": row_filters_configs, "incremental_time_filter_column": incremental_time_filter_column, "metadata": self.metadata, } diff --git a/clouddq/templates/dbt/macros/create_rule_binding_view.sql b/clouddq/templates/dbt/macros/create_rule_binding_view.sql index bb89dba9..8e9633d0 100644 --- a/clouddq/templates/dbt/macros/create_rule_binding_view.sql +++ b/clouddq/templates/dbt/macros/create_rule_binding_view.sql @@ -16,7 +16,7 @@ {%- macro create_rule_binding_view(configs, environment, dq_summary_table_name, metadata, configs_hashsum, progress_watermark, dq_summary_table_exists, high_watermark_value, current_timestamp_value) -%} {% set rule_binding_id = configs.get('rule_binding_id') -%} {% set rule_configs_dict = configs.get('rule_configs_dict') -%} -{% set filter_sql_expr = configs.get('row_filter_configs').get('filter_sql_expr') -%} +{% set row_filter_configs = configs.get('row_filter_configs') -%} {% set column_name = configs.get('column_configs').get('name') -%} {% set entity_configs = configs.get('entity_configs') -%} {% set partition_fields = entity_configs.get('partition_fields')-%} @@ -64,16 +64,20 @@ data AS ( CAST(d.{{ time_column_id }} AS TIMESTAMP) BETWEEN CAST('{{ high_watermark_value }}' AS TIMESTAMP) AND CAST('{{ current_timestamp_value }}' AS TIMESTAMP) AND - {{ filter_sql_expr }} -{% else %} +{%- else %} WHERE - {{ filter_sql_expr }} -{% endif -%} +{%- endif %} + {%- for id, row_filter_config in row_filter_configs.items() %} + {{ row_filter_config.get('filter_sql_expr') -}} + {%- if loop.nextitem is defined %} + AND + {%- endif -%} + {%- endfor %} {%- if partition_fields %} {% for field in partition_fields %} AND {{ field['name'] }} IS NOT NULL {%- endfor -%} -{% endif -%} +{%- endif -%} ), last_mod AS ( SELECT diff --git a/configs/row_filters/row-filters.yml b/configs/row_filters/row-filters.yml index d64b3c25..45a044cf 100644 --- a/configs/row_filters/row-filters.yml +++ b/configs/row_filters/row-filters.yml @@ -18,5 +18,7 @@ row_filters: True DATA_TYPE_EMAIL: + params: |- + column filter_sql_expr: |- - contact_type = 'email' + $column = 'email' diff --git a/configs/rule_bindings/team-1-rule-bindings.yml b/configs/rule_bindings/team-1-rule-bindings.yml index a1b530cd..191a8b4d 100644 --- a/configs/rule_bindings/team-1-rule-bindings.yml +++ b/configs/rule_bindings/team-1-rule-bindings.yml @@ -16,7 +16,8 @@ rule_bindings: T1_DQ_1_VALUE_NOT_NULL: entity_id: TEST_TABLE column_id: VALUE - row_filter_id: NONE + row_filter_ids: + - NONE reference_columns_id: CONTACT_DETAILS_REFERENCE_COLUMNS rule_ids: - NOT_NULL_SIMPLE diff --git a/configs/rule_bindings/team-2-rule-bindings.yml b/configs/rule_bindings/team-2-rule-bindings.yml index 29b50e6d..bcbf39a4 100644 --- a/configs/rule_bindings/team-2-rule-bindings.yml +++ b/configs/rule_bindings/team-2-rule-bindings.yml @@ -16,7 +16,9 @@ rule_bindings: T2_DQ_1_EMAIL: entity_id: TEST_TABLE column_id: VALUE - row_filter_id: DATA_TYPE_EMAIL + row_filter_ids: + - DATA_TYPE_EMAIL: + column: "contact_type" reference_columns_id: CONTACT_DETAILS_REFERENCE_COLUMNS rule_ids: - NOT_NULL_SIMPLE diff --git a/configs/rule_bindings/team-3-rule-bindings.yml b/configs/rule_bindings/team-3-rule-bindings.yml index 635c85e8..4b81abc6 100644 --- a/configs/rule_bindings/team-3-rule-bindings.yml +++ b/configs/rule_bindings/team-3-rule-bindings.yml @@ -16,7 +16,9 @@ rule_bindings: T3_DQ_1_EMAIL_DUPLICATE: entity_id: TEST_TABLE column_id: VALUE - row_filter_id: DATA_TYPE_EMAIL + row_filter_ids: + - DATA_TYPE_EMAIL: + column: "contact_type" reference_columns_id: CONTACT_DETAILS_REFERENCE_COLUMNS # incremental_time_filter_column_id: TS rule_ids: